kafka-python is the Python client for Apache Kafka. This article walks through its basic usage.

1. Installing Kafka and ZooKeeper

Installation is not covered here; see the Apache Kafka documentation.


2. Installing kafka-python

pip3 install kafka-python


3. Basic usage of kafka-python

A minimal example

1. Consumer

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
for msg in consumer:
    print(msg)

The first argument is the topic name.
group_id : the consumer group this instance belongs to; optional.
bootstrap_servers : the Kafka server(s) to connect to.

2. Producer

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
result = future.get(timeout=10)
print(result)

producer.send sends a message.

The first argument is the topic name and is required.
key : the key, a byte string; optional (but at least one of key and value must be given); defaults to None.
value : the value, a byte string; optional (but at least one of key and value must be given); defaults to None.
partition : the partition to send to; Kafka configures a single partition per topic by default, so this is 0.

future.get blocks until the message is sent or the timeout expires. send is asynchronous and only enqueues the message in a local buffer, so without blocking on the future (or flushing, or a time.sleep) a short-lived script may exit before anything is actually delivered.


3. Anatomy of a sent/received message

The consumer receives messages like this:

ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)

topic : the topic name
partition : the partition the message came from
offset : the offset of this message within the partition
timestamp : the timestamp
timestamp_type : the timestamp type
key : the key, as bytes
value : the value, as bytes
checksum : the message checksum
serialized_key_size : size of the serialized key
serialized_value_size : size of the serialized value; note that when value=None the size is -1
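kafka-python's ConsumerRecord is a namedtuple, so the fields above can be read as plain attributes inside a consumer loop. A broker-free sketch, using a hypothetical stand-in namedtuple with the same field names:

```python
from collections import namedtuple

# Hypothetical stand-in mirroring the fields of kafka-python's
# ConsumerRecord; the real class offers the same attribute access.
Record = namedtuple('Record', ['topic', 'partition', 'offset', 'timestamp',
                               'timestamp_type', 'key', 'value', 'checksum',
                               'serialized_key_size', 'serialized_value_size'])

msg = Record('my_topic', 0, 4, 1529569531392, 0,
             b'my_key', b'my_value', None, 6, 8)

# Fields are read as plain attributes.
print(msg.topic)   # 'my_topic'
print(msg.offset)  # 4
print(msg.value)   # b'my_value'
```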


KafkaConsumer

Manually assigning partitions

from kafka import KafkaConsumer
from kafka import TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.assign([TopicPartition(topic='my_topic', partition=0)])
for msg in consumer:
    print(msg)


Timeout handling

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'], consumer_timeout_ms=1000)
for msg in consumer:
    print(msg)

If consumer_timeout_ms is not specified, the loop blocks forever waiting for new messages; if it is specified, iteration stops once the timeout expires instead of waiting.

consumer_timeout_ms : timeout in milliseconds


Subscribing to multiple topics

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)

This receives messages from several topics at once.

You can also subscribe to a whole family of topics with a regular expression:

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)
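subscribe(pattern=...) matches topic names against a regular expression. A quick broker-free check of which names '^my.*' would cover:

```python
import re

# The same regex passed to consumer.subscribe(pattern='^my.*').
pattern = re.compile('^my.*')

topics = ['my_topic', 'my_topic2', 'topic_1']
matched = [t for t in topics if pattern.match(t)]
print(matched)  # ['my_topic', 'my_topic2']
```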


Encoding/decoding JSON data

Encoding (producer): value_serializer

Decoding (consumer): value_deserializer

1. The producer sends JSON data:

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
future = producer.send('my_topic', value={'value_1': 'value_2'}, partition=0)
future.get(timeout=10)

2. Without a deserializer, the consumer receives the raw data:

ConsumerRecord(topic='my_topic', partition=0, offset=22, timestamp=1529575016310, timestamp_type=0, key=None, value=b'{"value_1": "value_2"}', checksum=None, serialized_key_size=-1, serialized_value_size=22)

The value is the raw JSON byte string; decoding it would be an extra step.

3. The consumer decodes automatically:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)

The received result:

ConsumerRecord(topic='my_topic', partition=0, offset=23, timestamp=1529575235994, timestamp_type=0, key=None, value={'value_1': 'value_2'}, checksum=None, serialized_key_size=-1, serialized_value_size=22)
In this result, value has been deserialized automatically into a Python dict. JSON handling is not limited to the value: the key works the same way, via key_serializer and key_deserializer.
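The serializer/deserializer pair used above can be checked without a broker; they are ordinary functions that must round-trip:

```python
import json

# The same lambdas as passed to KafkaProducer / KafkaConsumer above.
value_serializer = lambda m: json.dumps(m).encode('ascii')
value_deserializer = lambda m: json.loads(m.decode('ascii'))

raw = value_serializer({'value_1': 'value_2'})
print(raw)                      # b'{"value_1": "value_2"}'
print(value_deserializer(raw))  # {'value_1': 'value_2'}
```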


KafkaProducer

Sending str keys and values

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], key_serializer=str.encode, value_serializer=str.encode)
future = producer.send('my_topic', key='key_3', value='value_3', partition=0)
future.get(timeout=10)

With key_serializer and value_serializer set to str.encode, the producer accepts plain str arguments, but the consumer still receives byte strings.
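The encode/decode pairing is easy to see in isolation: str.encode is what the producer applies before the bytes go on the wire, and bytes.decode is what the consumer must apply to get str back:

```python
# What key_serializer=str.encode does on the producer side:
wire_key = str.encode('key_3')
print(wire_key)                # b'key_3'

# Without key_deserializer the consumer sees these raw bytes;
# bytes.decode recovers the original str:
print(bytes.decode(wire_key))  # 'key_3'
```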

For the consumer to get str values back, it needs the matching decode step, e.g. key_deserializer=bytes.decode:

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], key_deserializer=bytes.decode, value_deserializer=bytes.decode)
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)


Sending compressed messages

compression_type='gzip'

If messages are large, they can be compressed before sending; valid values are 'gzip', 'snappy', 'lz4', or None.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
future = producer.send('my_topic', key=b'key_3', value=b'value_3', partition=0)
future.get(timeout=10)
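Kafka applies the compression internally when compression_type='gzip' is set; this standalone sketch (using Python's gzip module, not kafka-python) just illustrates the size saving on a large, repetitive payload:

```python
import gzip

# A large, repetitive payload of the kind worth compressing.
payload = b'value_3' * 1000
compressed = gzip.compress(payload)

print(len(payload))  # 7000
assert len(compressed) < len(payload)
```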


Sending msgpack messages

msgpack is short for MessagePack, an efficient binary serialization library, more compact than JSON.

from kafka import KafkaProducer
import msgpack

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

