大数据基础系列之kafka011生产者缓存超时,幂等性和事务实现

 浪尖 浪尖聊大数据

一,demo及相关类

1,基本介绍

KafkaProducer是线程安全的,多线程间共享一个实例比共享多个实例更加高效。首先搞一个demo

Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("acks""all");
props.put("retries"0);
props.put("batch.size"16384);
props.put("linger.ms"1);
props.put("buffer.memory"33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

Producer<StringString> producer = new KafkaProducer<>(props);
for (int i = 0i < 100i++)
producer.send(new ProducerRecord<StringString>("my-topic",Integer.toString(i)Integer.toString(i)));

producer.close();

2ProducerRecord

发往kafkakey/value对。由topic,分区id(可选)key(可选)timestamp(可选)value组成。

如果一个有效的分区ID被指定,Record就会被发到指定的分区。如果,没指定分区id,只指定了key,就会按照keyhash后对分区数取余得到的数值作为分区的id。如果分区id,和key都没有指定,就会以轮训的形式发送Records

Record还有一个timestamp属性。如果用户没有提供timestamp,生产者将会使用当前时间作为RecordtimestampKafka最终使用的时间戳取决于topic配置的时间类型。

1),如果topic配置使用了CreateTimeBroker就会使用生产者生产Record时带的时间戳。

2),如果topic配置使用了LogAppendTimeRecord追加到log的时候,Broker会有本地时间代替Producer生产时带的时间戳。

无论是采用的上文中的哪种形式,timestamp都会被包含在RecordMetadata中返回。

ProducerRecord(String topicInteger partitionK keyV value)
Creates a record to be sent to a specified topic and partition
  ProducerRecord(String topicInteger partitionK keyV value,Iterable<Header> headers)
Creates a record to be sent to a specified topic and partition
  ProducerRecord(String topicInteger partition, Long timestampK keyV value)
Creates a record with a specified timestamp to be sent to a specified topic and partition
  ProducerRecord(String topicInteger partition, Long timestampK keyV valueIterable<Header> headers)
Creates a record with a specified timestamp to be sent to a specified topic and partition
  ProducerRecord(String topicK keyV value)
Create a record to be sent to Kafka
  ProducerRecord(String topicV value)
Create a record with no key

二,缓存和超时

生产者内部有一个buffer,用来缓存Record,同时内部有一个后台线程负责将Record转化为请求,然后将请求发给kafka集群。使用生产者后未关闭,会导致这些资源泄漏。

send方法是异步的。调用他实际上是将Record添加到Buffer中,然后立即返回。这使得生产者可以批量提交消息来提升性能。

acks配置控制发送请求完成的标准。如果设置成all,将会导致生产阻塞,等待所有副本提交日志成功后才算发送完成,超级低效但是可以最大限度的容错。

如果请求失败,生产者会自动尝试,前提是不要设置retries为零。当然,开启失败尝试也就意味着带来了数据重复发送的风险。

生产者为每个分区维护一个buffer,这个buffer的大小由batch.size指定,该值越大表示批量发送的消息数越多,也意味着需要更大的内存。内存数可以估计的。

默认情况下,即使buffer还有剩余的空间没有填充,消息也会被立即发送。如果你想减少请求的次数,可以设置linger.ms参数为大于0的某一值。使生产者发送消息前等待linger.ms指定的时间,这样就可以有更多的消息加入到该batch来。这很像TCP中的Nagle原理。例如,在上面的代码片段中,由于我们设置linger.ms1ms100条消息可能在一次请求中全部发送到了Server端。然而,这也意味着加入消息一直不能填充满buffer,我们要延迟一毫秒。

buffer.memory决定者生产者所能用于buffer的总内存大小。如果,消息发送的速度比传输到Server端的速度快,这个buffer空间就会耗尽。当buffer空间耗尽,send调用就会阻塞,超过max.block.ms设置的超时时间后会抛出TimeoutException

三,序列化

Key.serializervalue.serialize决定者如何将keyvalue对象转化为字节数组。你可以使用包括bytearrayserializerstringserializer简单的字符串或字节类型。也可以实现自定义的序列化方式。

四,幂等性

kafka0.11版本开始,Kafka支持两种额外的模式:幂等性生产者和事务生产者。幂等性强化消息的传递语义,从至少一次到仅仅一次。特别是生产者重试将不再导致消息重复发送。事务生产者允许应用程序将消息原子的发送到多个分区(和主题!)。

设置enable.idempotencetrue来开启幂等性,如果设置了这个参数retries配置将会被设置为默认值,也即Integer.MAX_VALUEmax.inflight.requests.per.connection会被设置为1acks会被设置为all。幂等性生产者不需要修改API,所以现有的应用程序不需要修改就可以使用该特性。

为了利用幂等生产者,必须避免应用程序级重新发送,因为这些不能被去重。例如,如果应用程序运行幂等性,建议不要设置retries,因为他会被设置为默认值(Integer.MAX_VALUE).此外,如果sendproducerrecord)返回一个错误甚至无限重试(例如,如果消息送前缓冲区满了),建议关闭生产和检查最后产生消息的内容以确保不重复。

五,事务

为了使用事务生产者和相关的APIs,必须要设置transactional.id属性.如果设置了transactional.id幂等性会自动被启用。支持事务的topic必须要进行容错配置。特别的replication.factor应该设置为3topicmin.insync.replicas配置必须设置为2.最后,为了从端到端实现事务性保证,必须配置消费者只读取committed 的消息。

transactional.id目的是单生产者实例能从多会话中恢复。该特性就是分区的,状态的应用程序程序中的一个碎片标识符。transactional.id值在一个分区的应用中每个消费者实例必须是唯一的。

所有新的事务性API都会被阻塞,将在失败时抛出异常。举一个简单的例子,一次事务中提交100条消息。

Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("transactional.id""my-transactional-id");
Producer<StringString> producer = new KafkaProducer<>(propsnewStringSerializer()new StringSerializer());

producer.initTransactions();

try {
  producer.beginTransaction();
  for (int i = 0i < 100i++)
  producer.send(new ProducerRecord<>("my-topic"Integer.toString(i),Integer.toString(i)));
  producer.commitTransaction();
catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  // We can't recover from these exceptions, so our only option is to close the producer and exit.
  producer.close();
catch (KafkaException e) {
  // For all other exceptions, just abort the transaction and try again.
  producer.abortTransaction();
}
producer.close();

就如例子一样,每个消费者只能有一个事务开启。在beginTransaction() commitTransaction()中间发送的所有消息,都是一次事务的一部分。

事务生产者使用execeptions进行错误状态交流。特别之处,我们不需要为producer.send指定回调函数。任何在事务中不可恢复的错误发生都会抛出一个KafkaException异常(http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord))

在接受到一个kafkaexection异常之后,通过调用producer.abortTransaction(),可以保证所有的已经写入成功的消息会被标记为aborted,因此保证事务传输。

六,总结

本文主要是阐述缓存和超时机制,序列化及反序列化,幂等性生产者,事务生产者。大家可以根据需要进行选择.


©著作权归作者所有:来自51CTO博客作者mob604756ed02fe的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. Kafka源码系列之kafka如何实现高性能读写的
  2. Kafka原理详解
  3. FlinkSQL演进过程,解析原理及一些优化策略
  4. 一文读懂数据湖及企业中的架构特点
  5. kafka|使用Interceptors实现消息端到端跟踪
  6. 干货--部署RocketMQ
  7. 消息处理
  8. Kafka评传——从kafka的消息生命周期引出的沉思
  9. kafka架构

随机推荐

  1. Android图像开源视图:SmartImageView
  2. android ImageView android:adjustViewBo
  3. 系出名门Android(7) - 控件(View)之ZoomC
  4. Android中Parcelable接口的使用
  5. Android gdb调试
  6. ndk下使用sqlite
  7. Android Handler 消息机制原理解析
  8. android 2.3 r1 中文 api (58) —— TabH
  9. Android Handler机制1--ThreadLocal
  10. [入门八]Android的应用程序框架