Kafka Connector
1. Kafka
Kafka is an open-source stream-processing platform developed by the Apache Software Foundation. By publishing and subscribing to streams of records and storing them efficiently in the order they are generated, Kafka enables real-time streaming data pipelines and applications. It combines messaging, storage, and stream processing, and can hold both historical and real-time data.
2. Rain-kafka-consumer
Rain-kafka-consumer is a tool provided by Suanchang Technology (算场科技) for loading data from Kafka into the Suanchang platform, enabling seamless transfer of Kafka data.
3. Installing Rain-kafka-consumer
3.1 Installing from source
Before installing, make sure Maven and JRE 8 are available.
3.1.1 Get the source code
git clone git@code.suan-chang.com:rain-connect/rain-kafka-consumer.git
3.1.2 Build the package
cd rain-kafka-consumer
mvn clean package -DskipTests
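If the build succeeds, the runnable jar (named as in the run step below) should appear under target/; a quick check:
ls target/rain-kafka-consumer-1.0.0.jar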
3.1.3 Fill in the configuration and run
See Appendix 1 for a configuration example.
cd target
vim RainKafkaConsumer.properties
java -jar rain-kafka-consumer-1.0.0.jar ./RainKafkaConsumer.properties
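If the consumer should keep running after the terminal closes, one common option is to start it in the background with standard shell tools:
nohup java -jar rain-kafka-consumer-1.0.0.jar ./RainKafkaConsumer.properties > consumer.log 2>&1 &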
3.2 Installing with Docker
Before installing, make sure Docker and Docker Compose are available.
3.2.1 Create the configuration
vim RainKafkaConsumer.properties
3.2.2 Create the Docker Compose file
vim docker-compose.yml
The contents of docker-compose.yml:
version: '2.1'
services:
  rain-kafka-consumer:
    image: registry.cn-beijing.aliyuncs.com/suanchang/rain-kafka-consumer
    volumes:
      - $PWD/RainKafkaConsumer.properties:/rain-kafka-consumer/RainKafkaConsumer.properties
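The volumes entry mounts the local RainKafkaConsumer.properties into the container at /rain-kafka-consumer/RainKafkaConsumer.properties, evidently the path where the image expects its configuration, so edits to the local file take effect on the next container start.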
3.2.3 Start the Docker container
docker-compose up -d
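To confirm the service came up, the standard Docker Compose status check can be used:
docker-compose ps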
4. Usage example
4.1 Install Kafka
# Start ZooKeeper
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime zookeeper:latest
# Start Kafka
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=【host IP】:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://【host IP】:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
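Both containers should now be running; this can be verified with:
docker ps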
4.2 Prepare a Kafka topic
# Enter the Kafka container
docker exec -it kafka /bin/bash
# Create the topic test
cd /opt/kafka_2.13-2.8.1/
bin/kafka-topics.sh --create --zookeeper 【host IP】:2181 --replication-factor 1 --partitions 1 --topic test
# Check that the topic was created successfully
bin/kafka-topics.sh --list --zookeeper 【host IP】:2181
# Send messages to test
bin/kafka-console-producer.sh --broker-list 【host IP】:9092 --topic test
4.3 Produce test data
In the producer window opened for test, send the message:
{"id":1,"name":"John Doe","age":25,"city":"San Francisco","is_student":true}
4.4 Consume data with rain-kafka-consumer
# Press Ctrl+D to exit the Kafka container, then create the configuration file
vim RainKafkaConsumer.properties
Fill RainKafkaConsumer.properties with the following:
#Kafka Properties
kafka.bootstrap.servers=【host IP】:9092
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.auto.offset.reset=earliest
kafka.max.poll.interval.ms=10000
# kafka.group.id defaults to a randomly generated id
#kafka.group.id=RainHub
#rainConsumer properties
rain.loadMode=jsonMode
rain.accessKey=【user accessKey】
rain.secretKey=【user secretKey】
rain.metaEndpoint=【metaEndpoint】
rain.topic=test
rain.topic-table=test:rainhub.test
# rain.consumer.nums defaults to one consumer per partition
#rain.consumer.nums=5
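Here rain.topic-table=test:rainhub.test directs messages from topic test into table test of database rainhub; judging from the examples in this guide, the mapping format is topic:database.table.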
Save the configuration and exit, then create the Docker Compose file
vim docker-compose.yml
The contents of docker-compose.yml:
version: '2.1'
services:
  rain-kafka-consumer:
    image: registry.cn-beijing.aliyuncs.com/suanchang/rain-kafka-consumer
    volumes:
      - $PWD/RainKafkaConsumer.properties:/rain-kafka-consumer/RainKafkaConsumer.properties
Save and exit, then start the consumer:
docker-compose up -d
The logs show the consumption progress; at this point the data can be viewed in Suanchang, and the service will keep consuming data from Kafka.
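A convenient way to follow the consumer logs with Docker Compose:
docker-compose logs -f rain-kafka-consumer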
Appendix 1: Configuration example
#Kafka Properties
kafka.bootstrap.servers=127.0.0.1:9092
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.auto.offset.reset=earliest
kafka.max.poll.interval.ms=10000
# kafka.group.id defaults to a randomly generated id
#kafka.group.id=RainHub
#rainConsumer properties
rain.metaEndpoint=127.0.0.1:4002
rain.accessKey=yourAccessKey
rain.secretKey=yourSecretKey
rain.topic=user1
rain.topic-table=user1:test.user1
# rain.consumer.nums defaults to one consumer per partition
#rain.consumer.nums=5
Appendix 2: Configuration reference
Config | Default | Description |
---|---|---|
kafka.bootstrap.servers | None | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster |
kafka.key.deserializer | None | The class name of the deserializer used to convert the message key from bytes to the desired type. |
kafka.value.deserializer | None | The class name of the deserializer used to convert the message value from bytes to the desired type. |
kafka.auto.offset.reset | latest | What to do when there is no initial offset in Kafka or if the current offset does not exist anymore. |
kafka.max.poll.interval.ms | 300000 | The maximum delay, in milliseconds, between invocations of the poll() method. |
kafka.group.id | RandomId | The unique identifier of the consumer group this consumer belongs to. |
rain.accessKey | None | The user's accessKey |
rain.secretKey | None | The user's secretKey |
rain.metaEndpoint | None | The Rainhub metaEndpoint |
rain.topic | None | The topic to subscribe to and consume |
rain.topic-table | None | The Rainhub table into which data from the corresponding topic is written, in the format topic:database.table |
rain.consumer.nums | topicNum*partitionNums | The total number of consumer threads |
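For example, by the default formula above, a single subscribed topic with three partitions would run 1*3 = 3 consumer threads, and a mapping such as user1:test.user1 (Appendix 1) writes messages from topic user1 into table user1 of database test.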