【Flume】flume中Avro Source到Avro Sink之间通过SSL传输数据的实现分析及使用

首先你需要了解JAVA KEYSTORE

该SSL用于Avro Sink到Avro Source之间的数据传输
该场景主要用于分布式Flume之间的数据传输,从分散的各个flume agent到中心汇集节点的flume agent

下面来看下如何实现的?

Avro Sink SSL

在这个传输过程中,sink其实就相当于socket的client端了
flume源码中有个类NettyAvroRpcClient,该类中还有个内部类SSLCompressionChannelFactory
其中定义了如下属性:

private final boolean enableCompression;
private final int compressionLevel;
private final boolean enableSsl;
private final boolean trustAllCerts;
private final String truststore;
private final String truststorePassword;
private final String truststoreType;
private final List excludeProtocols;
技术分享
1、要使用SSL进行数据传输,首先要将ssl开关打开,true
2、truststore指定生成的keystore文件
3、truststorepassword指定密码(这里注意生成的keypass和storepass一定相同,否则报错)

KeyStore keystore = null;

            if (truststore != null) {
              if (truststorePassword == null) {
                throw new NullPointerException("truststore password is null");
              }
              InputStream truststoreStream = new FileInputStream(truststore);
              keystore = KeyStore.getInstance(truststoreType);
              keystore.load(truststoreStream, truststorePassword.toCharArray());
            }

            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
            // null keystore is OK, with SunX509 it defaults to system CA Certs
            // see http://docs.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#X509TrustManager
            tmf.init(keystore);
            managers = tmf.getTrustManagers();

该段代码就去加载了keystore文件
TrustManagerFactory是JDK原生的一个信任管理器工厂,每个新人管理器管理特定类型的由安全套接字使用的信任材料。信任材料是基于keystore或提供者特定源。
init方法通过证书授权源和相关的信任材料初始化此工厂
最后为此信任材料返回一个信任管理器

SSLContext sslContext = SSLContext.getInstance("TLS");
          sslContext.init(null, managers, null);
          SSLEngine sslEngine = sslContext.createSSLEngine();
          sslEngine.setUseClientMode(true);
          List<String> enabledProtocols = new ArrayList<String>();
          for (String protocol : sslEngine.getEnabledProtocols()) {
            if (!excludeProtocols.contains(protocol)) {
              enabledProtocols.add(protocol);
            }
          }
          sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
          logger.info("SSLEngine protocols enabled: " +
              Arrays.asList(sslEngine.getEnabledProtocols()));
          // addFirst() will make SSL handling the first stage of decoding
          // and the last stage of encoding this must be added after
          // adding compression handling above
          pipeline.addFirst("ssl", new SslHandler(sslEngine));

1、返回指定协议的SSLContext对象
TLS安全传输层协议
2、初始化此上下文,初始化参数只有信任管理器
3、初始化SSLEngine,并指定引擎在握手时使用客户端模式

最终这个安全的Socket就建立起来了

Avro Source SSL

source我们可以认为是socket的server端,打开连接后,等待客户端的连接

private static final String PORT_KEY = “port”;
private static final String BIND_KEY = “bind”;
private static final String COMPRESSION_TYPE = “compression-type”;
private static final String SSL_KEY = “ssl”;
private static final String IP_FILTER_KEY = “ipFilter”;
private static final String IP_FILTER_RULES_KEY = “ipFilterRules”;
private static final String KEYSTORE_KEY = “keystore”;
private static final String KEYSTORE_PASSWORD_KEY = “keystore-password”;
private static final String KEYSTORE_TYPE_KEY = “keystore-type”;
private static final String EXCLUDE_PROTOCOLS = “exclude-protocols”;
技术分享
以上Avro Source的一些配置属性

 try {
        KeyStore ks = KeyStore.getInstance(keystoreType);
        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
      } catch (Exception ex) {
        throw new FlumeException(
            "Avro source configured with invalid keystore: " + keystore, ex);
      }

从上面代码可以看出,source端在configure方法执行的时候就会load该keystore

 if (enableSsl) {
        SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
        sslEngine.setUseClientMode(false);
        List<String> enabledProtocols = new ArrayList<String>();
        for (String protocol : sslEngine.getEnabledProtocols()) {
          if (!excludeProtocols.contains(protocol)) {
            enabledProtocols.add(protocol);
          }
        }
        sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
        logger.info("SSLEngine protocols enabled: " +
            Arrays.asList(sslEngine.getEnabledProtocols()));
        // addFirst() will make SSL handling the first stage of decoding
        // and the last stage of encoding this must be added after
        // adding compression handling above
        pipeline.addFirst("ssl", new SslHandler(sslEngine));
      }

注意这里的SSLEngine就配置了引擎在握手时使用的服务器模式
最终返回对象ChannelPipeline

以上所有内容可能理解起来比较费劲,大家不妨先来看看这篇文章
Channel与Pipeline这里写链接内容

SSL在flume中的使用

首先准备一个keystore文件
Sink配置

a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.11.177
a1.sinks.k1.port=9520
a1.sinks.k1.channel=c1
a1.sinks.k1.ssl=true
a1.sinks.k1.truststore=/home/flume/keystore/chiwei.keystore
a1.sinks.k1.truststore-type=JKS
a1.sinks.k1.truststore-password=123456

Source端配置

a1.sources.r1.type = avro
a1.sources.r1.channels=c1
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9520
a1.sources.r1.ssl=true
a1.sources.r1.keystore=/home/flume/keystore/chiwei.keystore
a1.sources.r1.keystore-password=123456
a1.sources.r1.keystore-type=JKS
a1.sources.r1.ipFilter=true
a1.sources.r1.ipFilterRules=allow:ip:192.168.11.176

望各位网友不吝指教!!

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。