Datax 很方便扩展插件,我们通过实现一个简单的kafka writer 来看一下如何新增插件
1-扩展插件之前,推荐大家先阅读Datax的官方文档
https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
我们会看到Datax几个核心的概念:
Job: Job是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。 Task: Task是为最大化而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,用若干个并发执行。 TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。
简而言之, Job拆分成Task,在分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。
物理执行模型
框架为插件提供物理上的执行能力(线程)。DataX框架有三种运行模式: Standalone: 单进程运行,没有外部依赖。 Local: 单进程运行,统计信息、错误信息汇报到集中存储。 Distrubuted: 分布式多进程运行,依赖DataX Service服务。 当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误, 插件就能够在单机/分布式之间无缝切换了。 当JobContainer和TaskGroupContainer运行在同一个进程内时,就是单机模式(Standalone和Local); 当它们分布在不同的进程中执行时,就是分布式(Distributed)模式。 备注:开源的DATAX是没有分布式功能的,大家可以按照上述的思想,自己研究开发。
2 -创建kafkawriter模块
<module>kafkawriter</module>
引入kafkawriter所需要的依赖
因为kafka版本比较多,这里选用0.9.0.1这个版本。
<kafka.version>0.9.0.1</kafka.version>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>当然插件还要引入框架核心的一些依赖
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
接下来要配置Datax打包的一些配置
首先我们要了解Datax打包的方式:Maven-assembly-plugin 1、作用:要想将写的程序和它本身所依赖的jar包一起build到一个包里,是maven中针对打包任务而提供的标准插件。 2、其他作用: 1)提供一个把工程依赖元素、模块、网站文档等其他文件存放到单个归档文件里。 2)打包成指定格式分发包,支持各种主流的格式如zip、tar.gz、jar和war等,具体打包哪些文件是高度可控的。 3)能够自定义包含/排除指定的目录或文件。 总体来说,实现插件maven-assembly-plugin需要两个步骤: 第1步骤:pom.xml文件里配置maven-assembly-plugin,指定描述文件 第2步骤:描述文件配置具体参数
在kafkawriter的pom文件中需要新增
<plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptors> <!--描述文件路径--> <descriptor>src/main/assembly/package.xml</descriptor> </descriptors> <finalName>datax</finalName> </configuration> <executions> <execution> <id>dwzip</id> <phase>package</phase> <!-- 绑定到package生命周期阶段上 --> <goals> <goal>single</goal> <!-- 只运行一次 --> </goals> </execution> </executions> </plugin>
kafkawriter中的package.xml我们的配置:
<id></id> <formats> <format>dir</format> </formats> <includeBaseDirectory>false</includeBaseDirectory> <fileSets> <fileSet> <directory>src/main/resources</directory> <includes> <include>plugin.json</include> </includes> <outputDirectory>plugin/writer/kafkawriter</outputDirectory> </fileSet> <fileSet> <directory>target/</directory> <includes> <include>kafkawriter-0.0.1-SNAPSHOT.jar</include> </includes> <outputDirectory>plugin/writer/kafkawriter</outputDirectory> </fileSet> </fileSets> <dependencySets> <dependencySet> <useProjectArtifact>false</useProjectArtifact> <outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory> <scope>runtime</scope> </dependencySet> </dependencySets>
1-formats标签 maven-assembly-plugin 支持的打包格式有zip、tar、tar.gz (or tgz)、tar.bz2 (or tbz2)、jar、dir、war,可以同时指定多个打包格式 <formats> <format>dir</format> ...可配置多个 </formats> 2-dependencySets标签: 用来定制工程依赖 jar 包的打包方式 3-fileSets 管理一组文件的存放位置
3-模块创建和配置完成后,接下来进入代码的开发
开发之前我们要考虑,如果做一个kafkawriter需要哪些参数?
1-topic 主题,我们要往哪个主题写入消息 2-kafka broker 地址 3-分隔符,每行消息的分隔符
代码
package com.alibaba.datax.plugin.writer.kafkawriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* @author dalizu on 2018/11/7.
* @version v1.0
* @desc
*/
public class KafkaWriter extends Writer {
public static class Job extends Writer.Job {
private static final Logger log = LoggerFactory.getLogger(Job.class);
private Configuration conf = null;
@Override
public void init() {
this.conf = super.getPluginJobConf();//获取配置文件信息{parameter 里面的参数}
log.info("kafka writer params:{}", conf.toJSON());
//校验 参数配置
this.validateParameter();
}
private void validateParameter() {
//toipc 必须填
this.conf
.getNecessaryValue(
Key.TOPIC,
KafkaWriterErrorCode.REQUIRED_VALUE);
this.conf
.getNecessaryValue(
Key.BOOTSTRAP_SERVERS,
KafkaWriterErrorCode.REQUIRED_VALUE);
}
@Override
public void prepare() {
}
@Override
public List<Configuration> split(int mandatoryNumber) {
//按照reader 配置文件的格式 来 组织相同个数的writer配置文件
List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(conf);
}
return configurations;
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private static final Logger log = LoggerFactory.getLogger(Task.class);
private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
private Producer<String, String> producer;
private String fieldDelimiter;
private Configuration conf;
@Override
public void init() {
this.conf = super.getPluginJobConf();
fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
//初始化kafka
Properties props = new Properties();
props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
props.put("acks", "all");//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
props.put("retries", 0);
// Controls how much bytes sender would wait to batch up before publishing to Kafka.
//控制发送者在发布到kafka之前等待批处理的字节数。
//控制发送者在发布到kafka之前等待批处理的字节数。 满足batch.size和ling.ms之一,producer便开始发送消息
//默认16384 16kb
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer(props);
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
log.info("start to writer kafka");
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {//说明还在读取数据,或者读取的数据没处理完
//获取一行数据,按照指定分隔符 拼成字符串 发送出去
producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
recordToString(record), recordToString(record)));
}
}
@Override
public void destroy() {
if (producer != null) {
producer.close();
}
}
private String recordToString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
return NEWLINE_FLAG;
}
Column column;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
sb.append(column.asString()).append(fieldDelimiter);
}
sb.setLength(sb.length() - 1);
sb.append(NEWLINE_FLAG);
return sb.toString();
}
}
}4-代码开发完成后需要配置plugin.json
{
"name": "kafkawriter",
"class": "com.alibaba.datax.plugin.writer.kafkawriter.KafkaWriter",
"description": "简单插件,有待测试验证. 原理: TODO",
"developer": "lizu"
}让框架可以加载此插件
还需要在Datax目录下的package.xml下面新增我们新开发的插件
<fileSet> <directory>kafkawriter/target/datax/</directory> <includes> <include>**/*.*</include> </includes> <outputDirectory>datax</outputDirectory> </fileSet> 打包准备测试:mvn -U clean package assembly:assembly -Dmaven.test.skip=true
5-测试Mysql-->kafka
配置文件:
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root123",
"connection": [
{
"querySql": [
"select * from user;"
],
"jdbcUrl": [
"jdbc:mysql://localhost:3306/datax"
]
}
]
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"topic": "test-topic",
"bootstrapServers": "192.168.88.129:9092",
"fieldDelimiter":","
}
}
}
]
}
}可以去kafka 命令行 启动一个消费者 查看是否有消息。
DATAX日志: 任务启动时刻 : 2018-11-08 11:38:37 任务结束时刻 : 2018-11-08 11:38:57 任务总计耗时 : 20s 任务平均流量 : 470.20KB/s 记录写入速度 : 5867rec/s 读出记录总数 : 117358 读写失败总数 : 0
我是用代码写的消费者去消费,并且把offset保存到了本地文件
117358
和日志传输数量完全一样
消费端控制台:
... offset = 58538, key = 224928,99991285,99991285,99991285,991285,null,991285,哈哈哈哈哈哈哈哈哈哈哈哈哈哈,2018-09-20 15:30:44,10.10.10.1 , value = 224928,99991285,99991285,99991285,991285,null,991285,哈哈哈哈哈哈哈哈哈哈哈哈哈哈,2018-09-20 15:30:44,10.10.10.1 offset = 58539, key = 224929,99991311,99991311,99991311,991311,null,991311,哈哈哈哈哈哈哈哈哈哈哈哈哈哈,2018-09-20 15:30:44,10.10.10.1 , value = 224929,99991311,99991311,99991311,991311,null,991311,哈哈哈哈哈哈哈哈哈哈哈哈哈哈,2018-09-20 15:30:44,10.10.10.1 ......
到此一个简单的kafkawriter开发完成
如果需要完整的插件代码或者消费者代码,可以留言。
點擊查看更多內容
5人點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦