经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Apache Kafka » 查看文章
Kafka 2.3 Producer (0.9以后版本适用)
来源:cnblogs  作者:独孤风  时间:2019/8/21 10:31:57  对本文有异议

kafka0.9版本以后用java重新编写了producer,废除了原来scala编写的版本。

这里直接使用最新2.3版本,0.9以后的版本都适用。

注意引用的包为:org.apache.kafka.clients.producer

  1. import java.util.Properties;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. public class ProducerDemo {
  5. public static void main(String[] args) {
  6. Properties properties = new Properties();
  7. properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
  8. properties.put("acks", "all");
  9. properties.put("retries", 0);
  10. properties.put("batch.size", 16384);
  11. properties.put("linger.ms", 1);
  12. properties.put("buffer.memory", 33554432);
  13. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  14. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  15. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
  16. kafkaProducer.send(new ProducerRecord<>("topic", "value"));
  17. kafkaProducer.close();
  18. }
  19. }

0.11.0以后增加了事务,事务producer的示例代码如下,需要适用于0.11.0以后的版本:

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.Producer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.common.KafkaException;
  5. import org.apache.kafka.common.errors.AuthorizationException;
  6. import org.apache.kafka.common.errors.OutOfOrderSequenceException;
  7. import org.apache.kafka.common.errors.ProducerFencedException;
  8. import org.apache.kafka.common.serialization.StringSerializer;
  9. import java.util.Properties;
  10. public class TransactionsProducerDemo {
  11. public static void main(String[] args) {
  12. Properties props = new Properties();
  13. props.put("bootstrap.servers", "localhost:9092");
  14. props.put("transactional.id", "my-transactional-id");
  15. Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
  16. producer.initTransactions();
  17. try {
  18. producer.beginTransaction();
  19. for (int i = 0; i < 100; i++)
  20. producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
  21. producer.commitTransaction();
  22. } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  23. // We can't recover from these exceptions, so our only option is to close the producer and exit.
  24. producer.close();
  25. } catch (KafkaException e) {
  26. // For all other exceptions, just abort the transaction and try again.
  27. producer.abortTransaction();
  28. }
  29. producer.close();
  30. }
  31. }

更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算

原文链接:http://www.cnblogs.com/tree1123/p/11386944.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号