Kafka数据加载
从 Kafka 读取数据
下面是从Kafka读取增量数据的示例:
df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
如果从Kafka读取全量数据可以使用:
df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
将数据写入 Kafka
下面示例将数据写入 Kafka:
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
流式读取kafka数据并且流式写入表中
val dataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.option("subscribe", topicName)
.load()
val parsedDataFrame = dataFrame
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
parsedDataFrame.writeStream
.outputMode("append")
.format("rain")
.toTable("rain.rain_test")
.awaitTermination()
使用工具导入kafka数据
使用Rain-kafka-consumer工具可以将您kafka中某个topic的数据同步到算场表中
该文档将为您演示如何把本地的kafka数据同步到算场中
支持2中导入模式 jsonMode / rawMode
jsonMode模式: kafka生产的数据为json格式,将json数据解析为表格,实现kafka数据的导入。
rawMode模式: kafka生产的数据不限制格式,最终的表结构为uuid,offest,partition,rawData,recodeMeta,addTime
实现kafka数据的导入。

下面演示jsonMode的导入方式
演示环境
- Windows 10 专业版 22H2
- java version "1.8.0_431"
Java(TM) SE Runtime Environment (build 1.8.0_431-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.431-b10, mixed mode) - Docker version 27.2.0, build 3ab4256 (用于创建测试kafka环境)
- git version 2.47.0.windows.2 (用于在windows上执行shell)
安装Rain-kafka-consumer
- 点击链接将Rain-kafka-consumer下载到本地

- 该工具为一个jar包所以需要有java环境java8安装包
下载后双击打开该安装包一直点击下一步即可完成java8安装,使用Wim+R
快捷键输出cmd
打开windows命令窗口 输入以下命令出现同样的结果即为安装成功
java -version

- 为了在windows系统上执行脚本,需要使用git工具,安装流程一直点击下一步即可
下载地址:https://git-scm.com/

安装完毕后在第一部下载位置的文件夹中单机鼠标右键 看到open git bash here 即为安装成功

4. 该教程需要用一个本地的kafka环境,因此使用到了docker,安装流程一直点击下一步即可
下载地址:https://www.docker.com/ 选择适合您处理器的版本

安装完成以后点击桌面左下角的图标会在菜单中看到docker desktop,单机即可启动docker

- 准备本地的kafka环境
获取本机ip并保存,在gitBash命令窗口中输入查看本机ip地址
ipconfig
如教程中这里的192.168.123.35
即为本机ip

接下来在gitbash中执行下列命令(将命令中的【本机ip】
替换为上面的ip地址)
#启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper
#启动kafka
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=【本机ip】:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://【本机ip】:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

此时在docker中可以看到这两个个容器在运行即为启动成功

这时我们就可以准备测试数据,通过一下命令,进入kafka容器
docker exec -it kafka bash
在打开的命令行中创建topic demo
cd /opt/kafka_2.13-2.8.1/bin
./kafka-topics.sh --create --zookeeper [本机ip]:2181 --replication-factor 1 --partitions 1 --topic demo

接下来使用命令打开生产者程序
./kafka-console-producer.sh --broker-list [本机ip]:9092 --topic demo
在出现的>
后进行数据生产
{"id":1,"name":"John Doe","age":25,"city":"San Francisco","is_student":true}
如图生产了3条json数据

- 使用Rain-kafka-consumer将这三条数据同步到算场表中
接下来将指导您完成这一过程
首先登录您的账号到算场中,获取到独属于您的认证凭证(请妥善保管,泄露造成的任何问题需要您个人承担责任)
6.1. 获取凭证
在浏览器中打开算场数据平台官网 输入账号密码进行登录

点击 跳过指引

创建新的密钥

查看密钥的accessKey和secretKey



这个accessKey和secretKey 可保存在任意地方 后续将会使用

6.2. 添加配置文件
在Rain-kafka-consumer工具目录下创建配置文件RainKafkaConsumer.properties
使用记事本打开后输入以下内容
#Kafka Properties
kafka.bootstrap.servers=【本机ip】:9092
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.auto.offset.reset=earliest
kafka.max.poll.interval.ms=10000
#kafka.group.id 默认 随机提供一个
#kafka.group.id=RainHub
#rainConsumer properties
rain.loadMode=jsonMode
rain.accessKey=【用户accessKey】
rain.secretKey=【用户secretKey】
rain.metaEndpoint=【metaEndpoint】
rain.topic=test
rain.topic-table=test:rainhub.test
#rain.consumer.nums 默认 每个分区一个消费者
#rain.consumer.nums=5

6.3. 启动Rain-kafka-consumer
经过上面的操作,可以看到在目录下存在这样2个文件,一个是kafka消费程序一个是配置文件,右键空白处,打开gitbash窗口

输入启动命令
java -jar rain-kafka-consumer-1.0.0.jar ./RainKafkaConsumer.properties

此时查看算场数据,即可看到数据已经同步到算场

至此,您已成功将kafka数据导入了算场中
更多信息请查看详细文档
Rain-kafka-consumer详细文档