Spark Streaming and Kafka with the Spark JDBC External Data Source: A Worked Example

Scenario: Spark Streaming receives data sent from Kafka and queries it together with a table in a relational database.

 

Each record sent from Kafka has three fields, id, name, and cityId, separated by tabs:

1       zhangsan        1
2       lisi    1
3       wangwu  2
4       zhaoliu 3
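The tab-separated format above can be parsed defensively before it reaches Spark SQL. The following is a minimal sketch, not part of the original job: the `StudentParser` object and its `parse` method are hypothetical names introduced here, and the choice to drop malformed lines (rather than fail the batch) is an assumption.

```scala
import scala.util.Try

// Mirrors the Student case class used by the streaming job.
case class Student(id: Int, name: String, cityId: Int)

// Hypothetical helper, not part of the original code.
object StudentParser {
  // Returns None when the line does not have exactly three
  // tab-separated fields, or when id / cityId are not integers.
  def parse(line: String): Option[Student] =
    line.split("\t", -1) match {
      case Array(id, name, cityId) =>
        Try(Student(id.trim.toInt, name.trim, cityId.trim.toInt)).toOption
      case _ => None
    }
}
```

Inside the streaming job, a helper like this could replace the direct `x(0).toInt` calls, for example `rdd.flatMap(line => StudentParser.parse(line))`, so that a single bad record does not throw a `NumberFormatException` in the batch.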

 

The MySQL table city has the structure id int, name varchar and holds the following rows:

1    bj
2    sz
3    sh

 

The result of this example is the output of: select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id;
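To make the expected result concrete, the same inner join can be sketched with plain Scala collections over the sample rows shown earlier. This is only an illustration of the query semantics, not how Spark SQL executes it; `JoinSketch` and `City` are hypothetical names, and the lookup map assumes that city.id is unique.

```scala
case class Student(id: Int, name: String, cityId: Int)
case class City(id: Int, name: String)

// Hypothetical illustration of the join; not part of the original job.
object JoinSketch {
  val students = Seq(
    Student(1, "zhangsan", 1),
    Student(2, "lisi", 1),
    Student(3, "wangwu", 2),
    Student(4, "zhaoliu", 3)
  )
  val cities = Seq(City(1, "bj"), City(2, "sz"), City(3, "sh"))

  // Inner join on s.cityId = c.id, assuming city ids are unique.
  // Students whose cityId has no matching city are dropped.
  def join(ss: Seq[Student], cs: Seq[City]): Seq[(Int, String, Int, String)] = {
    val cityNameById = cs.map(c => c.id -> c.name).toMap
    for {
      s <- ss
      cityName <- cityNameById.get(s.cityId)
    } yield (s.id, s.name, s.cityId, cityName)
  }
}
```

For the sample data, `JoinSketch.join(JoinSketch.students, JoinSketch.cities)` yields the rows (1,zhangsan,1,bj), (2,lisi,1,bj), (3,wangwu,2,sz), and (4,zhaoliu,3,sh).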

 

For Kafka installation, see: Kafka Single-Node Environment Setup

 

Start ZooKeeper and Kafka, create the topic, and open a console producer:

zkServer.sh start
kafka-server-start.sh  $KAFKA_HOME/config/server.properties  &
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1  --topic luogankun_topic
kafka-console-producer.sh --broker-list hadoop000:9092 --topic luogankun_topic

 

Example code:

package com.asiainfo.ocdc

// One record from the Kafka stream: id, name, cityId
case class Student(id: Int, name: String, cityId: Int)


import org.apache.spark.streaming._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka._

/**
 * Processes Kafka data with Spark Streaming and joins it with a MySQL table
 * loaded through the Spark JDBC external data source
 *
 * @author luogankun
 */
object KafkaStreaming {
  def main(args: Array[String]): Unit = {

    if (args.length < 4) {
      System.err.println("Usage: KafkaStreaming <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val sqlContext = new HiveContext(sc)
    import sqlContext._

    import com.luogankun.spark.jdbc._
    // Load the MySQL data through External Data Sources
    val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root", "root", "select id, name from city")
    // Register the cities RDD as the temporary table city
    cities.registerTempTable("city")

    // Map each topic to the number of consumer threads used to read it
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val inputs = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    inputs.foreachRDD(rdd => {
      if (rdd.partitions.length > 0) {
        // Register the data received in this batch as the temporary table student
        rdd.map(_.split("\t")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student")
        // Join the streaming data with the MySQL table and print the result
        sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id").collect().foreach(println)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
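The job turns its `<topics>` and `<numThreads>` arguments into the `Map[String, Int]` that `KafkaUtils.createStream` takes, where each value is the number of consumer threads for that topic. A minimal sketch of that step in isolation follows; `TopicMapSketch` and `build` are hypothetical names, and trimming whitespace and skipping empty entries are additions that the original code does not perform.

```scala
// Hypothetical helper showing how the topic map is derived from the
// command-line arguments; not part of the original job.
object TopicMapSketch {
  // topics: comma-separated topic names, e.g. "luogankun_topic"
  // numThreads: consumer threads per topic, passed as a string argument
  def build(topics: String, numThreads: String): Map[String, Int] = {
    val threads = numThreads.toInt // throws NumberFormatException if not an integer
    topics.split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(topic => topic -> threads)
      .toMap
  }
}
```

With the arguments used in the submit script, `TopicMapSketch.build("luogankun_topic", "1")` gives `Map("luogankun_topic" -> 1)`; every listed topic receives the same thread count.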

 

Script for submitting the job to the cluster: sparkstreaming_kafka_jdbc.sh

#!/bin/sh
. /etc/profile
set -x

cd $SPARK_HOME/bin

spark-submit --name KafkaStreaming --class com.asiainfo.ocdc.KafkaStreaming --master spark://hadoop000:7077 \
--executor-memory 1G --total-executor-cores 1 /home/spark/software/source/streaming-app/target/streaming-app-V00B01C00-SNAPSHOT-jar-with-dependencies.jar hadoop000:2181 test-consumer-group luogankun_topic 1

 
