Programming Interface
Custom KafkaReader
Extend DataX's Reader and implement the corresponding Job and Task interfaces:
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

// Key, ReadType, KafkaColumn and KafkaReaderErrorCode are plugin-internal helper classes
// in the same package (a sketch of them follows the code below).
/**
 * @author 青石路
 */
public class KafkaReader extends Reader {
public static class Job extends Reader.Job {
private Configuration originalConfig = null;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.validateParameter();
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int adviceNumber) {
List<Configuration> configurations = new ArrayList<>(adviceNumber);
for (int i=0; i<adviceNumber; i++) {
configurations.add(this.originalConfig.clone());
}
return configurations;
}
private void validateParameter() {
this.originalConfig.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);
this.originalConfig.getNecessaryValue(Key.TOPIC, KafkaReaderErrorCode.REQUIRED_VALUE);
}
}
public static class Task extends Reader.Task {
private static final Logger logger = LoggerFactory.getLogger(Task.class);
private Consumer<String, String> consumer;
private String topic;
private Configuration conf;
private int maxPollRecords;
private String fieldDelimiter;
private String readType;
private List<Column.Type> columnTypes;
@Override
public void destroy() {
logger.info("consumer close");
if (Objects.nonNull(consumer)) {
consumer.close();
}
}
@Override
public void init() {
this.conf = super.getPluginJobConf();
this.topic = conf.getString(Key.TOPIC);
this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);
fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);
if (!ReadType.JSON.name().equalsIgnoreCase(readType)
&& !ReadType.TEXT.name().equalsIgnoreCase(readType)) {
throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
String.format("您提供配置文件有误,不支持的readType[%s]", readType));
}
if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
List<String> columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);
if (CollUtil.isEmpty(columnTypeList)) {
throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
String.format("您提供配置文件有误,readType是JSON时[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN_TYPE));
}
convertColumnType(columnTypeList);
}
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
Configuration saslConf = conf.getConfiguration(Key.SASL);
if (ObjUtil.isNotNull(saslConf)) {
logger.info("配置启用了SASL认证");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));
String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
}
consumer = new KafkaConsumer<>(props);
}
@Override
public void startRead(RecordSender recordSender) {
consumer.subscribe(CollUtil.newArrayList(topic));
int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);
int retries = conf.getInt(Key.RETRIES, 5);
if (retries < 0) {
logger.info("joinGroupSuccessRetries 配置有误[{}], 重置成默认值[5]", retries);
retries = 5;
}
/*
 * The consumer is created fresh each time, so the first poll re-joins the consumer group, which triggers a rebalance,
 * and while a rebalance is in progress none of the consumers in the group can make progress.
 * A poll may therefore return nothing even when the topic has data, because the consumer is still joining the group.
 * kafka-clients offers no API or event to tell exactly when the consumer has successfully joined the group,
 * hence the retries below.
 */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
int i = 0;
if (CollUtil.isEmpty(records)) {
for (; i < retries; i++) {
records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
logger.info("第 {} 次重试,获取消息记录数[{}]", i + 1, records.count());
if (!CollUtil.isEmpty(records)) {
break;
}
}
}
if (i >= retries) {
logger.info("重试 {} 次后,仍未获取到消息,请确认是否有数据、配置是否正确", retries);
return;
}
transferRecord(recordSender, records);
do {
records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
transferRecord(recordSender, records);
} while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);
}
private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {
if (CollUtil.isEmpty(records)) {
return;
}
for (ConsumerRecord<String, String> record : records) {
Record sendRecord = recordSender.createRecord();
String msgValue = record.value();
if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
transportJsonToRecord(sendRecord, msgValue);
} else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {
// readType = text: treat every field as a string
String[] columnValues = msgValue.split(fieldDelimiter);
for (String columnValue : columnValues) {
sendRecord.addColumn(new StringColumn(columnValue));
}
}
recordSender.sendToWriter(sendRecord);
}
consumer.commitAsync();
}
private void convertColumnType(List<String> columnTypeList) {
columnTypes = new ArrayList<>();
for (String columnType : columnTypeList) {
switch (columnType.toUpperCase()) {
case "STRING":
columnTypes.add(Column.Type.STRING);
break;
case "LONG":
columnTypes.add(Column.Type.LONG);
break;
case "DOUBLE":
columnTypes.add(Column.Type.DOUBLE);
case "DATE":
columnTypes.add(Column.Type.DATE);
break;
case "BOOLEAN":
columnTypes.add(Column.Type.BOOL);
break;
case "BYTES":
columnTypes.add(Column.Type.BYTES);
break;
default:
throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnType));
}
}
}
private void transportJsonToRecord(Record sendRecord, String msgValue) {
List<KafkaColumn> kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);
if (columnTypes.size() != kafkaColumns.size()) {
throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
String.format("您提供的配置文件有误,readType是JSON时[%s列数=%d]与[json列数=%d]的数量不匹配", Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));
}
for (int i=0; i<columnTypes.size(); i++) {
KafkaColumn kafkaColumn = kafkaColumns.get(i);
switch (columnTypes.get(i)) {
case STRING:
sendRecord.setColumn(i, new StringColumn(kafkaColumn.getColumnValue()));
break;
case LONG:
sendRecord.setColumn(i, new LongColumn(kafkaColumn.getColumnValue()));
break;
case DOUBLE:
sendRecord.setColumn(i, new DoubleColumn(kafkaColumn.getColumnValue()));
break;
case DATE:
// only timestamps (long values) are supported for now
sendRecord.setColumn(i, new DateColumn(Long.parseLong(kafkaColumn.getColumnValue())));
break;
case BOOL:
sendRecord.setColumn(i, new BoolColumn(kafkaColumn.getColumnValue()));
break;
case BYTES:
sendRecord.setColumn(i, new BytesColumn(kafkaColumn.getColumnValue().getBytes(StandardCharsets.UTF_8)));
break;
default:
throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnTypes.get(i)));
}
}
}
}
}
The key part is the Task interface implementation.
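The code also references four plugin-internal helper classes that are not shown: Key, ReadType, KafkaColumn and KafkaReaderErrorCode. Below is a minimal sketch of what they might look like, reconstructed from how they are used above; the constant values that also appear in plugin_job_template.json (bootstrapServers, topic, groupId, pollTimeoutMs, columnType and the sasl sub-keys) are taken from that template, while the remaining key names, the error codes and the KafkaColumn field names are assumptions.
// Sketch only: each class would live in its own file in the kafkareader plugin package.
public final class Key {
    public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
    public static final String TOPIC = "topic";
    public static final String GROUP_ID = "groupId";
    public static final String POLL_TIMEOUT_MS = "pollTimeoutMs";
    public static final String COLUMN_TYPE = "columnType";
    public static final String SASL = "sasl";
    public static final String SASL_SECURITY_PROTOCOL = "securityProtocol";
    public static final String SASL_MECHANISM = "mechanism";
    public static final String SASL_USERNAME = "username";
    public static final String SASL_PASSWORD = "password";
    // The following key names are assumptions (they do not appear in the template):
    public static final String MAX_POLL_RECORDS = "maxPollRecords";
    public static final String FIELD_DELIMITER = "fieldDelimiter";
    public static final String READ_TYPE = "readType";
    public static final String RETRIES = "retries"; // the log message above suggests it may actually be "joinGroupSuccessRetries"
    public static final String KEY_DESERIALIZER = "keyDeserializer";
    public static final String VALUE_DESERIALIZER = "valueDeserializer";
}

// The two supported read modes.
public enum ReadType {
    JSON, TEXT
}

// One column of a JSON message; when readType = json the message value is expected to be a JSON array such as
// [{"columnName":"id","columnValue":"1"},{"columnName":"name","columnValue":"zhangsan"}] (field names assumed).
public class KafkaColumn {
    private String columnName;
    private String columnValue;

    public String getColumnName() { return columnName; }
    public void setColumnName(String columnName) { this.columnName = columnName; }
    public String getColumnValue() { return columnValue; }
    public void setColumnValue(String columnValue) { this.columnValue = columnValue; }
}

// Error codes used by the reader (implements com.alibaba.datax.common.spi.ErrorCode);
// the code strings and descriptions here are illustrative.
public enum KafkaReaderErrorCode implements ErrorCode {
    REQUIRED_VALUE("KafkaReader-00", "A required parameter is missing."),
    ILLEGAL_PARAM("KafkaReader-01", "A parameter value is illegal.");

    private final String code;
    private final String description;

    KafkaReaderErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() { return code; }

    @Override
    public String getDescription() { return description; }
}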
Configuration File
Add plugin_job_template.json under the resources directory:
{
"name": "kafkareader",
"parameter": {
"bootstrapServers": "",
"topic": "test-kafka",
"groupId": "test1",
"writeType": "json",
"pollTimeoutMs": 2000,
"columnType": [
"LONG",
"STRING",
"STRING"
],
"sasl": {
"securityProtocol": "SASL_PLAINTEXT",
"mechanism": "PLAIN",
"username": "",
"password": "2"
}
}
}
Configuration item descriptions: see kafkareader.md
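The template above only covers the reader's parameter block. In an actual DataX job file it sits under job.content[].reader next to a writer; a minimal sketch of such a job, paired with DataX's built-in streamwriter (the broker address, channel count and writer choice are illustrative, not part of the original template):
{
  "job": {
    "setting": {
      "speed": { "channel": 1 }
    },
    "content": [
      {
        "reader": {
          "name": "kafkareader",
          "parameter": {
            "bootstrapServers": "localhost:9092",
            "topic": "test-kafka",
            "groupId": "test1",
            "readType": "json",
            "pollTimeoutMs": 2000,
            "columnType": ["LONG", "STRING", "STRING"]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": { "print": true }
        }
      }
    ]
  }
}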