1-介绍:
上一篇文章已经介绍了Datax插件开发的完整流程。 针对Hive Reader插件的开发,我们之间从代码开始
2-实现原理
如果想要从hive表中把数据按照一列一列把数据取出来,可以使用官方的hdfsreader. 在某些时候,我们想要使用更灵活的方式,比如使用hive sql查询语句导出sql查询的数据. 实现原理: 根据配置的sql,通过将查询结果保存到一张新的临时hive表中这种方式. 然后获取新表的hdfs文件地址,然后读取文件到缓冲区,最后删除临时的表. 当然还有其他方式,比如使用hive-hcatalog,后面也会分享。
3-参数定义
* hiveSql * 描述:需要执行导出的sql,可以是多个 * 必选:是 * 默认值:无 * defaultFS * 描述:Hadoop hdfs文件系统namenode节点地址。 * 必选:是 * 默认值:无
4-环境准备
* hadoop fs -mkdir /user/datax_tmp 先创建临时目录,否则会报错(临时hive 表使用) TODO 后期在代码中处理 * 执行datax任务的机器要按照hive,并且配置好环境变量
5-代码实现
/**
* @author dalizu on 2018/11/10.
* @version v1.0
* @desc hive reder
*/
public class HiveReader {
/**
* Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。
* <p/>
* 整个 Reader 执行流程是:
* <pre>
* Job类init-->prepare-->split
*
* Task类init-->prepare-->startRead-->post-->destroy
* Task类init-->prepare-->startRead-->post-->destroy
*
* Job类post-->destroy
* </pre>
*/
public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory
.getLogger(Job.class);
private Configuration readerOriginConfig = null;
@Override
public void init() {
LOG.info("init() begin...");
this.readerOriginConfig = super.getPluginJobConf();//获取配置文件信息{parameter 里面的参数}
this.validate();
LOG.info("init() ok and end...");
}
private void validate() {
this.readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS,
HiveReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);
List<String> sqls = this.readerOriginConfig.getList(Key.HIVE_SQL, String.class);
if (null == sqls || sqls.size() == 0) {
throw DataXException.asDataXException(
HiveReaderErrorCode.SQL_NOT_FIND_ERROR,
"您未配置hive sql");
}
}
@Override
public List<Configuration> split(int adviceNumber) {
//按照Hive sql的个数 获取配置文件的个数
LOG.info("split() begin...");
List<String> sqls = this.readerOriginConfig.getList(Key.HIVE_SQL, String.class);
List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
Configuration splitedConfig = null;
for (String querySql : sqls) {
splitedConfig = this.readerOriginConfig.clone();
splitedConfig.set(Key.HIVE_SQL, querySql);
readerSplitConfigs.add(splitedConfig);
}
return readerSplitConfigs;
}
//全局post
@Override
public void post() {
LOG.info("任务执行完毕,hive reader post");
}
@Override
public void destroy() {
}
}
public static class Task extends Reader.Task {
private static final Logger LOG = LoggerFactory
.getLogger(Task.class);
private Configuration taskConfig;
private String hiveSql;
private String tmpPath;
private String tableName;
private DFSUtil dfsUtil = null;
private HashSet<String> sourceFiles;
@Override
public void init() {
//获取配置
this.taskConfig = super.getPluginJobConf();//获取job 分割后的每一个任务单独的配置文件
this.hiveSql = taskConfig.getString(Key.HIVE_SQL);//获取hive sql
taskConfig.set(Key.FIELDDELIMITER, '\001');//设置hive 存储文件 hdfs默认的分隔符
tmpPath = Constant.TMP_PREFIX + KeyUtil.genUniqueKey();//创建临时Hive表 存储地址
LOG.info("配置分隔符后:" + this.taskConfig.toJSON());
this.dfsUtil = new DFSUtil(this.taskConfig);//初始化工具类
}
@Override
public void prepare() {
//创建临时Hive表,指定存储地址
tableName = hiveTableName();
String hiveCmd = "create table " + tableName + " LOCATION '" + tmpPath + "' as " + hiveSql;
LOG.info("hiveCmd ----> :" + hiveCmd);
//执行脚本,创建临时表
if (!ShellUtil.exec(new String[]{"hive", "-e", "\"" + hiveCmd + "\""})) {
throw DataXException.asDataXException(
HiveReaderErrorCode.SHELL_ERROR,
"创建hive临时表脚本执行失败");
}
LOG.info("创建hive 临时表结束 end!!!");
LOG.info("prepare(), start to getAllFiles...");
List<String> path = new ArrayList<String>();
path.add(tmpPath);
this.sourceFiles = dfsUtil.getAllFiles(path, Constant.TEXT);
LOG.info(String.format("您即将读取的文件数为: [%s], 列表为: [%s]",
this.sourceFiles.size(),
StringUtils.join(this.sourceFiles, ",")));
}
@Override
public void startRead(RecordSender recordSender) {
//读取临时hive表的hdfs文件
LOG.info("read start");
for (String sourceFile : this.sourceFiles) {
LOG.info(String.format("reading file : [%s]", sourceFile));
//默认读取的是TEXT文件格式
InputStream inputStream = dfsUtil.getInputStream(sourceFile);
UnstructuredStorageReaderUtil.readFromStream(inputStream, sourceFile, this.taskConfig,
recordSender, this.getTaskPluginCollector());
if (recordSender != null) {
recordSender.flush();
}
}
LOG.info("end read source files...");
}
//只是局部post 属于每个task
@Override
public void post() {
LOG.info("one task hive read post...");
deleteTmpTable();
}
private void deleteTmpTable() {
String hiveCmd = "drop table " + tableName;
LOG.info("hiveCmd ----> :" + hiveCmd);
//执行脚本,创建临时表
if (!ShellUtil.exec(new String[]{"hive", "-e", "\"" + hiveCmd + "\""})) {
throw DataXException.asDataXException(
HiveReaderErrorCode.SHELL_ERROR,
"删除hive临时表脚本执行失败");
}
}
@Override
public void destroy() {
LOG.info("hive read destroy...");
}
//创建hive临时表名称
private String hiveTableName() {
StringBuilder str = new StringBuilder();
FastDateFormat fdf = FastDateFormat.getInstance("yyyyMMdd");
str.append(Constant.TABLE_NAME_PREFIX).append(fdf.format(new Date()))
.append("_").append(KeyUtil.genUniqueKey());
return str.toString();
}
}
}注意: 代码中注释比较详细 代码中使用的DFSUtil 工具类 是copy 官方hdfsreader里面的。 希望大家先理解整个插件的运行流程,代码最上方有。
shell工具类:
/**
* @author dalizu on 2018/11/10.
* @version v1.0
* @desc 执行shell脚本
*/
public class ShellUtil {
private static final int SUCCESS = 0;
private static final Logger LOG = LoggerFactory.getLogger(RetryUtil.class);
public static boolean exec(String [] command){
try {
Process process=Runtime.getRuntime().exec(command);
read(process.getInputStream());
StringBuilder errMsg= read(process.getErrorStream());
// 等待程序执行结束并输出状态
int exitCode = process.waitFor();
if (exitCode == SUCCESS) {
LOG.info("脚本执行成功");
return true;
} else {
LOG.info("脚本执行失败[ERROR]:"+errMsg.toString());
return false;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
private static StringBuilder read(InputStream inputStream) {
StringBuilder resultMsg=new StringBuilder();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
while ((line = reader.readLine()) != null) {
resultMsg.append(line);
resultMsg.append("\r\n");
}
return resultMsg;
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}
}6-测试准备
创建必须的临时目录
hadoop fs -mkdir /user/datax_tmp
创建目标mysql表
CREATE TABLE `user_hive_mysql` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(50), `telephone` varchar(30), `mail` varchar(50), PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
创建hive表
CREATE TABLE `mysql_to_hive`( `id` int, `username` string, `telephone` string, `mail` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
接下来load数据到hive表中。
配置datax任务的json文件
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "hivereader",
"parameter": {
"hiveSql": [
"select username,telephone,mail from mysql_to_hive;"
],
"defaultFS": "hdfs://hadoop001:8020"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "root123",
"column": [
"username",
"telephone",
"mail"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
""
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/datax",
"table": [
"user_hive_mysql"
]
}
]
}
}
}
]
}
}执行脚本:
python bin/datax.py ../../data/dataxJob/hive_to_mysql.json
测试结果
任务启动时刻 : 2018-11-09 23:37:08 任务结束时刻 : 2018-11-09 23:37:39 任务总计耗时 : 31s 任务平均流量 : 8B/s 记录写入速度 : 0rec/s 读出记录总数 : 21 读写失败总数 : 0
hiveSql 也可以写多个sql:
"hiveSql": [ "select username,telephone,mail from mysql_to_hive;", "select name,tel,age from t_orc;" ],
测试结果ok:
任务启动时刻 : 2018-11-09 23:46:13 任务结束时刻 : 2018-11-09 23:47:34 任务总计耗时 : 81s 任务平均流量 : 3B/s 记录写入速度 : 0rec/s 读出记录总数 : 23 读写失败总数 : 0
可以看出不管hive 源表不管是什么格式,都支持,因为我们的中间表是textfile
7-思考:
如果想要一个源数据导入到多个目标源在怎么做? 如果需要完整代码可以留言。
點擊查看更多內容
9人點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦