Heterogeneous Data Sync → Why Should DataX Support Kafka?
Source: cnblogs  Author: 青石路  Published: 2024/8/26 9:16:15

A Moment of Fun

Yesterday I posted on my WeChat Moments: what's so great about bars? Last month I worked part-time as a waiter at one, and a lady took a fancy to me. She said she'd pay me 100,000 a month to go to Shanghai with her. I didn't agree.

A friend commented: if you didn't agree, why are you in Shanghai?

I replied: I didn't agree last month.

"What a stubborn mouth"

Recap

The official site has a very detailed introduction to DataX; for what it's worth, I have also written a few posts about it

Heterogeneous data source sync → retrofitting DataX, quite interesting

Heterogeneous data source sync → retrofitting DataX again, starting to touch the source code

Heterogeneous data source sync → DataX usage details

Heterogeneous data source sync → analyzing how DataX encrypts and decrypts sensitive information, from the source code

If you are not familiar with them, go read them as needed, so I won't introduce DataX any further here. The official project ships a great many plugins covering the vast majority of data sources, which is basically enough for day-to-day needs. But there are so many kinds of data sources that the DataX plugins cannot possibly cover them all; Kafka, for instance, has no official reader or writer plugin. Do you know why? If you know data synchronization fairly well, then the moment you see Kafka your first thought is usually real-time synchronization, whereas DataX targets offline synchronization. So isn't it understandable that DataX doesn't officially provide a Kafka plugin? It's simply not a good fit!

But what if the customer insists that offline synchronization must support Kafka as well?

"They want it, period"

What can you do? Fire right back: it can't be done?

"It can't be done"

So there is no choice; the only option is to develop a set of Kafka plugins for DataX. With the official plugin development guide (DataX插件开发宝典), writing a plugin is actually quite simple

kafkawriter

  1. Programming interface

    Define a custom KafkaWriter that extends DataX's Writer and implements the corresponding Job and Task interfaces:

    /**
     * @author 青石路
     */
    public class KafkaWriter extends Writer {

        public static class Job extends Writer.Job {

            private Configuration conf = null;

            @Override
            public List<Configuration> split(int mandatoryNumber) {
                List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
                for (int i = 0; i < mandatoryNumber; i++) {
                    configurations.add(this.conf.clone());
                }
                return configurations;
            }

            private void validateParameter() {
                this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
                this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
            }

            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                this.validateParameter();
            }

            @Override
            public void destroy() {
            }
        }

        public static class Task extends Writer.Task {

            private static final Logger logger = LoggerFactory.getLogger(Task.class);
            private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

            private Producer<String, String> producer;
            private Configuration conf;
            private Properties props;
            private String fieldDelimiter;
            private List<String> columns;
            private String writeType;

            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                columns = conf.getList(Key.COLUMN, String.class);
                writeType = conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null);
                if (CollUtil.isEmpty(columns)) {
                    throw DataXException.asDataXException(KafkaWriterErrorCode.REQUIRED_VALUE,
                            String.format("Invalid configuration: [%s] is required and must not be empty or blank.", Key.COLUMN));
                }
                props = new Properties();
                props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
                // acks: "0" = no ack, "1" = leader ack only, "all" = the leader waits until every
                // in-sync replica has written the record, which is the strongest durability guarantee
                props.put(ProducerConfig.ACKS_CONFIG, conf.getUnnecessaryValue(Key.ACK, "0", null));
                props.put(CommonClientConfigs.RETRIES_CONFIG, conf.getUnnecessaryValue(Key.RETRIES, "0", null));
                props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
                props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                Configuration saslConf = conf.getConfiguration(Key.SASL);
                if (ObjUtil.isNotNull(saslConf)) {
                    logger.info("SASL authentication is enabled in the configuration");
                    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaWriterErrorCode.REQUIRED_VALUE));
                    props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaWriterErrorCode.REQUIRED_VALUE));
                    String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaWriterErrorCode.REQUIRED_VALUE);
                    String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaWriterErrorCode.REQUIRED_VALUE);
                    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
                }
                producer = new KafkaProducer<String, String>(props);
            }

            @Override
            public void prepare() {
                if (Boolean.parseBoolean(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {
                    String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
                    // create the topic if it does not exist yet
                    try (AdminClient adminClient = AdminClient.create(props)) {
                        ListTopicsResult topicsResult = adminClient.listTopics();
                        if (!topicsResult.names().get().contains(topic)) {
                            NewTopic newTopic = new NewTopic(
                                    topic,
                                    Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                                    Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                            );
                            List<NewTopic> newTopics = new ArrayList<NewTopic>();
                            newTopics.add(newTopic);
                            adminClient.createTopics(newTopics).all().get();
                        }
                    } catch (Exception e) {
                        throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.CREATE_TOPIC.getDescription());
                    }
                }
            }

            @Override
            public void startWrite(RecordReceiver lineReceiver) {
                logger.info("start to write kafka");
                Record record = null;
                // a non-null record means the reader is still producing data or buffered records remain
                while ((record = lineReceiver.getFromReader()) != null) {
                    // take one record, join its columns with the configured delimiter (or serialize to JSON) and send it
                    if (writeType.equalsIgnoreCase(WriteType.TEXT.name())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToString(record))
                        );
                    } else if (writeType.equalsIgnoreCase(WriteType.JSON.name())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToKafkaJson(record))
                        );
                    }
                    producer.flush();
                }
            }

            @Override
            public void destroy() {
                logger.info("producer close");
                if (producer != null) {
                    producer.close();
                }
            }

            /**
             * Format a record as a delimited string
             *
             * @param record
             * @return
             */
            private String recordToString(Record record) {
                int recordLength = record.getColumnNumber();
                if (0 == recordLength) {
                    return NEWLINE_FLAG;
                }
                Column column;
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < recordLength; i++) {
                    column = record.getColumn(i);
                    sb.append(column.asString()).append(fieldDelimiter);
                }
                sb.setLength(sb.length() - 1);
                sb.append(NEWLINE_FLAG);
                return sb.toString();
            }

            private String recordToKafkaJson(Record record) {
                int recordLength = record.getColumnNumber();
                if (recordLength != columns.size()) {
                    throw DataXException.asDataXException(KafkaWriterErrorCode.ILLEGAL_PARAM,
                            String.format("Invalid configuration: column count mismatch [record columns=%d, writer columns=%d]", recordLength, columns.size()));
                }
                List<KafkaColumn> kafkaColumns = new ArrayList<>();
                for (int i = 0; i < recordLength; i++) {
                    KafkaColumn column = new KafkaColumn(record.getColumn(i), columns.get(i));
                    kafkaColumns.add(column);
                }
                return JSONUtil.toJsonStr(kafkaColumns);
            }
        }
    }

    The DataX framework invokes the Job and Task interfaces in the following order

    (figure: job/task interface execution order)

    Focus on the Task implementation

    • init: read the configuration options, then create the Producer instance

    • prepare: check whether the topic exists and create it if it doesn't

    • startWrite: receive Records from the Channel via RecordReceiver and write them to the topic

      Two write formats are supported: text and json; json mode serializes each field as a KafkaColumn (see the sketch below), and the details are in the kafkawriter.md referenced further down

    • destroy: close the Producer instance

    None of this is hard; I'm sure everyone can follow it
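
    One piece the listing above doesn't show is the KafkaColumn class used by recordToKafkaJson (and later by kafkareader's transportJsonToRecord). Judging from how it is constructed, from a DataX Column plus the configured column name, and from the getColumnValue() calls on the reader side, a minimal sketch could look like the following; the field names are my assumption, the real class lives in the qsl-datax repository:

    /**
     * Minimal sketch of the KafkaColumn DTO serialized to / parsed from the Kafka message value
     * in json mode. Field names are assumptions; see the qsl-datax repository for the real class.
     */
    public class KafkaColumn {

        private String columnName;   // name taken from the writer's "column" configuration
        private String columnValue;  // column value rendered as a string

        // no-args constructor so that JSONUtil.toList can deserialize it on the reader side
        public KafkaColumn() {
        }

        public KafkaColumn(Column column, String columnName) {
            this.columnName = columnName;
            this.columnValue = column.asString();
        }

        public String getColumnName() {
            return columnName;
        }

        public void setColumnName(String columnName) {
            this.columnName = columnName;
        }

        public String getColumnValue() {
            return columnValue;
        }

        public void setColumnValue(String columnValue) {
            this.columnValue = columnValue;
        }
    }

    With a class like this, json mode writes each record as a JSON array of {columnName, columnValue} objects, while text mode simply joins the column values with fieldDelimiter.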

  2. Plugin definition

    Add plugin.json under resources:

    {
        "name": "kafkawriter",
        "class": "com.qsl.datax.plugin.writer.kafkawriter.KafkaWriter",
        "description": "write data to kafka",
        "developer": "qsl"
    }

    A note on class: it is the fully qualified class name of KafkaWriter. If you didn't copy my code verbatim, change it to your own.

  3. Configuration file

    Add plugin_job_template.json under resources:

    {
        "name": "kafkawriter",
        "parameter": {
            "bootstrapServers": "",
            "topic": "",
            "ack": "all",
            "batchSize": 1000,
            "retries": 0,
            "fieldDelimiter": ",",
            "writeType": "json",
            "column": [
                "const_id",
                "const_field",
                "const_field_value"
            ],
            "sasl": {
                "securityProtocol": "SASL_PLAINTEXT",
                "mechanism": "PLAIN",
                "username": "",
                "password": ""
            }
        }
    }

    The configuration options are described in kafkawriter.md

  4. Packaging and release

    You can refer to the official assembly configuration and use assembly to package the plugin.
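
    For orientation: once packaged, a DataX plugin is deployed under the DataX installation's plugin directory. Assuming the usual assembly layout, the writer would end up roughly like this (the jar name and version are illustrative):

    ${DATAX_HOME}/plugin/writer/kafkawriter/
    ├── plugin.json
    ├── plugin_job_template.json
    ├── kafkawriter-0.0.1.jar          // illustrative name
    └── libs/                          // third-party dependencies such as kafka-clients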

At this point, kafkawriter is done.

kafkareader

  1. Programming interface

    Define a custom KafkaReader that extends DataX's Reader and implements the corresponding Job and Task interfaces:

    /**
     * @author 青石路
     */
    public class KafkaReader extends Reader {

        public static class Job extends Reader.Job {

            private Configuration originalConfig = null;

            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
                this.validateParameter();
            }

            @Override
            public void destroy() {
            }

            @Override
            public List<Configuration> split(int adviceNumber) {
                List<Configuration> configurations = new ArrayList<>(adviceNumber);
                for (int i = 0; i < adviceNumber; i++) {
                    configurations.add(this.originalConfig.clone());
                }
                return configurations;
            }

            private void validateParameter() {
                this.originalConfig.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);
                this.originalConfig.getNecessaryValue(Key.TOPIC, KafkaReaderErrorCode.REQUIRED_VALUE);
            }
        }

        public static class Task extends Reader.Task {

            private static final Logger logger = LoggerFactory.getLogger(Task.class);

            private Consumer<String, String> consumer;
            private String topic;
            private Configuration conf;
            private int maxPollRecords;
            private String fieldDelimiter;
            private String readType;
            private List<Column.Type> columnTypes;

            @Override
            public void destroy() {
                logger.info("consumer close");
                if (Objects.nonNull(consumer)) {
                    consumer.close();
                }
            }

            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                this.topic = conf.getString(Key.TOPIC);
                this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);
                if (!ReadType.JSON.name().equalsIgnoreCase(readType)
                        && !ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                            String.format("Invalid configuration: unsupported readType [%s]", readType));
                }
                if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                    List<String> columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);
                    if (CollUtil.isEmpty(columnTypeList)) {
                        throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                                String.format("Invalid configuration: [%s] is required when readType is JSON and must not be empty or blank.", Key.COLUMN_TYPE));
                    }
                    convertColumnType(columnTypeList);
                }
                Properties props = new Properties();
                props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
                props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
                Configuration saslConf = conf.getConfiguration(Key.SASL);
                if (ObjUtil.isNotNull(saslConf)) {
                    logger.info("SASL authentication is enabled in the configuration");
                    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
                    props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));
                    String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
                    String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
                    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
                }
                consumer = new KafkaConsumer<>(props);
            }

            @Override
            public void startRead(RecordSender recordSender) {
                consumer.subscribe(CollUtil.newArrayList(topic));
                int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);
                int retries = conf.getInt(Key.RETRIES, 5);
                if (retries < 0) {
                    logger.info("joinGroupSuccessRetries is misconfigured [{}], resetting it to the default [5]", retries);
                    retries = 5;
                }
                /**
                 * The consumer is created from scratch every time; its first poll makes it (re)join the consumer group,
                 * and the join triggers a rebalance, during which none of the consumers in that group can make progress.
                 * So a poll may return nothing even when the topic has data, simply because the consumer is still joining.
                 * kafka-clients offers no dedicated API or event to learn the exact moment the consumer has joined the group,
                 * hence the retries.
                 */
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                int i = 0;
                if (CollUtil.isEmpty(records)) {
                    for (; i < retries; i++) {
                        records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                        logger.info("retry {}: fetched [{}] message(s)", i + 1, records.count());
                        if (!CollUtil.isEmpty(records)) {
                            break;
                        }
                    }
                }
                if (i >= retries) {
                    logger.info("no messages fetched after {} retries; please check that the topic has data and the configuration is correct", retries);
                    return;
                }
                transferRecord(recordSender, records);
                do {
                    records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                    transferRecord(recordSender, records);
                } while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);
            }

            private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {
                if (CollUtil.isEmpty(records)) {
                    return;
                }
                for (ConsumerRecord<String, String> record : records) {
                    Record sendRecord = recordSender.createRecord();
                    String msgValue = record.value();
                    if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                        transportJsonToRecord(sendRecord, msgValue);
                    } else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                        // readType = text: treat every field as a string
                        String[] columnValues = msgValue.split(fieldDelimiter);
                        for (String columnValue : columnValues) {
                            sendRecord.addColumn(new StringColumn(columnValue));
                        }
                    }
                    recordSender.sendToWriter(sendRecord);
                }
                consumer.commitAsync();
            }

            private void convertColumnType(List<String> columnTypeList) {
                columnTypes = new ArrayList<>();
                for (String columnType : columnTypeList) {
                    switch (columnType.toUpperCase()) {
                        case "STRING":
                            columnTypes.add(Column.Type.STRING);
                            break;
                        case "LONG":
                            columnTypes.add(Column.Type.LONG);
                            break;
                        case "DOUBLE":
                            columnTypes.add(Column.Type.DOUBLE);
                            break;
                        case "DATE":
                            columnTypes.add(Column.Type.DATE);
                            break;
                        case "BOOLEAN":
                            columnTypes.add(Column.Type.BOOL);
                            break;
                        case "BYTES":
                            columnTypes.add(Column.Type.BYTES);
                            break;
                        default:
                            throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                    String.format("Invalid configuration: DataX does not support data type [%s]", columnType));
                    }
                }
            }

            private void transportJsonToRecord(Record sendRecord, String msgValue) {
                List<KafkaColumn> kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);
                if (columnTypes.size() != kafkaColumns.size()) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                            String.format("Invalid configuration: when readType is JSON, the number of [%s] entries (%d) must match the number of JSON columns (%d)", Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));
                }
                for (int i = 0; i < columnTypes.size(); i++) {
                    KafkaColumn kafkaColumn = kafkaColumns.get(i);
                    switch (columnTypes.get(i)) {
                        case STRING:
                            sendRecord.setColumn(i, new StringColumn(kafkaColumn.getColumnValue()));
                            break;
                        case LONG:
                            sendRecord.setColumn(i, new LongColumn(kafkaColumn.getColumnValue()));
                            break;
                        case DOUBLE:
                            sendRecord.setColumn(i, new DoubleColumn(kafkaColumn.getColumnValue()));
                            break;
                        case DATE:
                            // only epoch-millisecond timestamps are supported for now
                            sendRecord.setColumn(i, new DateColumn(Long.parseLong(kafkaColumn.getColumnValue())));
                            break;
                        case BOOL:
                            sendRecord.setColumn(i, new BoolColumn(kafkaColumn.getColumnValue()));
                            break;
                        case BYTES:
                            sendRecord.setColumn(i, new BytesColumn(kafkaColumn.getColumnValue().getBytes(StandardCharsets.UTF_8)));
                            break;
                        default:
                            throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                    String.format("Invalid configuration: DataX does not support data type [%s]", columnTypes.get(i)));
                    }
                }
            }
        }
    }

    Again, focus on the Task implementation

    • init: read the configuration options, then create the Consumer instance

    • startRead: pull data from the topic and write it into the Channel via RecordSender

      A few details deserve attention here

      1. The Consumer is created from scratch every time. When it starts pulling, if it has not yet joined the configured consumer group it joins first, which triggers a rebalance, and during a rebalance none of the consumers in that group can make progress; at that moment, even if the topic has messages available, the poll may return nothing. A retry mechanism is therefore added so that, during that particular sync run, the consumer is far more likely to actually pull messages (a complementary approach is sketched after this list)
      2. Once the Consumer has pulled messages, it keeps polling in a loop; as soon as one poll returns fewer records than the maximum poll size (maxPollRecords), the topic is considered drained and the loop ends. This differs from normal usage, where a Consumer keeps pulling (or passively receiving) indefinitely
      3. Two read formats are supported: text and json; see the configuration file notes below for details
      4. To keep the data written into the Channel intact, the column data types (DataX data types) must be configured

    • destroy: close the Consumer instance
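
    A side note on point 1: the plugin copes with the rebalance window by retrying blind polls. If you want an explicit signal instead, kafka-clients lets you pass a ConsumerRebalanceListener to subscribe(); its onPartitionsAssigned callback is invoked from inside poll() once the group rebalance has completed. A rough sketch, not part of the plugin, reusing the consumer, topic and pollTimeoutMs variables from startRead above:

    // Sketch (not from the article): detect group membership with a ConsumerRebalanceListener
    // instead of only counting empty polls.
    AtomicBoolean assigned = new AtomicBoolean(false);
    consumer.subscribe(CollUtil.newArrayList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // nothing to do for a short-lived offline task
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // called from inside poll() once the rebalance has completed and partitions are assigned
            assigned.set(true);
        }
    });

    // poll() still drives the group join, but an empty poll before assignment can now be told apart
    // from an empty poll on a genuinely empty topic; a real implementation would still cap the attempts
    ConsumerRecords<String, String> records = ConsumerRecords.empty();
    int attempts = 0;
    while (!assigned.get() && attempts++ < 10) {
        records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
    }

    Once assigned is true (or the attempt cap is hit), the draining loop from the plugin can proceed as before.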

  2. Plugin definition

    Add plugin.json under resources:

    {
        "name": "kafkareader",
        "class": "com.qsl.datax.plugin.reader.kafkareader.KafkaReader",
        "description": "read data from kafka",
        "developer": "qsl"
    }

    class is the fully qualified class name of KafkaReader

  3. Configuration file

    Add plugin_job_template.json under resources:

    {
        "name": "kafkareader",
        "parameter": {
            "bootstrapServers": "",
            "topic": "test-kafka",
            "groupId": "test1",
            "readType": "json",
            "pollTimeoutMs": 2000,
            "columnType": [
                "LONG",
                "STRING",
                "STRING"
            ],
            "sasl": {
                "securityProtocol": "SASL_PLAINTEXT",
                "mechanism": "PLAIN",
                "username": "",
                "password": "2"
            }
        }
    }

    The configuration options are described in kafkareader.md

  4. Packaging and release

    As with kafkawriter, you can refer to the official assembly configuration and use assembly to package the plugin.

At this point, kafkareader is done as well.

Summary

  1. Full code: qsl-datax
  2. The kafkareader retry mechanism only lowers the probability of pulling no data; it cannot eliminate it. Also, if the upstream keeps sending messages into the topic and every poll returns the maximum poll size, the sync task will run on and on and never stop. Is that still offline synchronization? (One way to bound the run is sketched after this list.)
  3. For offline synchronization I don't recommend going through Kafka; Kafka is a much better fit for real-time synchronization
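
On point 2: one possible way to make the run genuinely bounded, which the plugin does not do, is to snapshot the end offsets once the consumer has joined the group and stop as soon as every assigned partition has caught up with that snapshot, no matter how fast the upstream keeps producing. A rough sketch, reusing consumer, recordSender, transferRecord and pollTimeoutMs from the reader's startRead:

    // Sketch (not part of the plugin): stop at the offsets that existed when the task started,
    // so a continuously producing upstream cannot keep the "offline" task alive forever.
    // Assumes the consumer has already joined the group, i.e. at least one poll() has completed.
    Set<TopicPartition> partitions = consumer.assignment();
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

    boolean caughtUp = false;
    while (!caughtUp) {
        ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(pollTimeoutMs));
        transferRecord(recordSender, batch);
        // finished once every partition's position has reached the snapshot taken above
        caughtUp = true;
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            if (consumer.position(entry.getKey()) < entry.getValue()) {
                caughtUp = false;
                break;
            }
        }
    }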

Original post: https://www.cnblogs.com/youzhibing/p/18378097
