首先希望大家健康平安。 对于互联网公司的大数据,一般都会需要一个数据开发平台。 incubator-dolphinscheduler再适合不过了。今天我也开始进行学习一下,然后记录一下学习的内容。 通过阅读源码从简单的地方入手学习,来看一下具体的实现逻辑,对后续的安装部署和使用中遇到的问题肯定 会有帮助。
1-代码下载和查看
mvn -U clean package -Prelease -Dmaven.test.skip=true mvn install
2-官网的定义
看源码之前先了解这些细节.会对源码中相关的概念有帮助 DAG: 全称Directed Acyclic Graph,简称DAG。工作流中的Task任务以有向无环图的形式组装起来, 从入度为零的节点进行拓扑遍历,直到无后继节点为止。 流程定义:通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG 流程实例:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次, 产生一个流程实例 任务实例:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态 DolphinScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心, 并使用Zookeeper分布式锁来选举其中的一台Master或Worker为“管理者”来执行任务
dolphinscheduler-api
dolphinscheduler-api模块主要提供了和前端交互的接口。 为了了解整个系统的运行情况,我们只要先关心几个接口就可以。
1-ProcessDefinitionController.createProcessDefinition: 流程定义,保存我们页面定义的流程。 2-ExecutorController.startProcessInstance: 直接运行的任务,不用调度 产生一条Command到数据库中:processDao.createCommand(command); 具体代码就不详细列出。到这里我们知道需要进行调度的流程已经产生了一条command记录到我们的数据库中。 3-SchedulerController.createSchedule: 需要定时调度的任务 创建流程的调度信息到表中t_ds_schedules 4-SchedulerController.online 这里添加任务到调度中 QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,schedule.getCrontab(), dataMap);
不管任务是否需要定时调度,都需要先进行流程定义
dolphinscheduler-server
这里我们主要看两个功能: master和work,一个负责调度任务流,一个负责执行具体的任务。 先通过学习master来了解它是如何对任务流进行调度的: MasterServer中的main函数会启动我们的MasterServer Spring实例化该Bean之后马上执行@PostConstruct注解下的run()方法
1-初始化zk相关的目录,注册master的时候会获取master的cpu,内存等信息 zkMasterClient.init(); 保存master的机器信息 /masters/hosts -> heartbeatZKInfo
2-提供了心跳,定时去更新master的机器信息
Runnable heartBeatThread = heartBeatThread(); heartbeatMasterService.scheduleAtFixedRate(heartBeatThread, 5, masterConfig.getMasterHeartbeatInterval(), TimeUnit.SECONDS);
3-QuartzExecutors.getInstance().start(); 启动quartz调度.才能添加调度任务。
4-MasterSchedulerThread:扫描线程,定时扫描数据库中的 command 表,根据不同的命令类型进行不同的 业务操作 既然需要去数据库中获取command,我们就先来看看command是从哪里插入的吧。上面dolphinscheduler-api 已经写了不用定时调度的command是从哪里进行插入的。 现在我们来看看需要定时调度的command是从哪里插入的。 在api的接口SchedulerController.online中我们看到了
QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,schedule.getCrontab(), dataMap);
它会将任务加入到调度中,定时执行 Quartz具体用法可以自己去看一下,我们只要知道真正执行任务的代码在ProcessScheduleJob这个类里面:
//1-根据scheduleId获取当前调度任务的详细调度信息 Schedule schedule = processDao.querySchedule(scheduleId); //2-从schedule信息中获取ProcessDefinitionId,获取流程定义的信息 ProcessDefinition processDefinition = processDao.findProcessDefineById(schedule.getProcessDefinitionId()); //3-创建调度的command Command command = new Command(); command.setCommandType(CommandType.SCHEDULER); ...... processDao.createCommand(command);
到这里我们就知道定时调度流程的command在这里被插入了。
5-回到MasterSchedulerThread源码分析: MasterSchedulerThread implements Runnable 我们之间看它的run方法
while (Stopper.isRunning()){
// process instance
ProcessInstance processInstance = null;
InterProcessMutex mutex = null;
try {
//1-检查当前master的cpu和内存情况是否符合要求,可以直接配置cpu和内存的阀值,使用过程中如果遇到类似日志可以检查这两个参数的配置:load or availablePhysicalMemorySize(G) is too high...
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
if(!runCheckFlag) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
//2-使用ZooKeeper分布式锁来实现同一时刻只有一台Master执行Scheduler
String znodeLock = zkMasterClient.getMasterLockPath();
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
mutex.acquire();
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
int activeCount = poolExecutor.getActiveCount();
//3-获取一条command,对应之前运行流程保存的command
/**
* select command.* from t_ds_command command
join t_ds_process_definition definition on command.process_definition_id = definition.id
where definition.release_state = 1 AND definition.flag = 1
order by command.update_time asc
limit 1
*
* */
// make sure to scan and delete command table in one transaction
Command command = processDao.findOneCommand();
if (command != null) {
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
try{
processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance,processDao));
}
}catch (Exception e){
logger.error("scan command error ", e);
processDao.moveToErrorCommand(command, e.toString());
}
} else{
//indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}catch (Exception e){
logger.error("master scheduler thread exception",e);
}finally{
AbstractZKClient.releaseMutex(mutex);
}
}
@Transactional(rollbackFor = Exception.class)
public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
//1-根据getProcessDefinitionId构造一条流程实例。
ProcessInstance processInstance = constructProcessInstance(command, host);
//cannot construct process instance, return null;
if(processInstance == null){
logger.error("scan command, command parameter is error: %s", command.toString());
moveToErrorCommand(command, "process instance is null");
return null;
}
if(!checkThreadNum(command, validThreadNum)){
logger.info("there is not enough thread for this command: {}",command.toString() );
return setWaitingThreadProcess(command, processInstance);
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
//2-保存流程实例信息
saveProcessInstance(processInstance);
this.setSubProcessParam(processInstance);
//3-删除当前的command
delCommandByid(command.getId());
return processInstance;
}具体逻辑都写在了代码的注释里面。
6-MasterExecThread:负责DAG任务切分、任务提交监控、各种不同命令类型的逻辑处理一样看run方法 这里只关注任务流的执行: // execute flow executeProcess(); 主要关注runProcess()方法.里面涉及到了解析前端的dag图的方法,这里也还没仔细研究。但是不影响 我们看整体的流程。 a-首先肯定会获取任务流中起始的任务节点先开始运行
// submit start node,提交任务流的头结点 submitPostNode(null); task会添加到readyToSubmitTaskList中
b-提交task到zk队列,具体实现在后面的MasterTaskExecThread中
if(canSubmitTaskToQueue()){
submitStandByTask();
}从readyToSubmitTaskList中获取待提交的task 提交的任务信息会保存在activeTaskNode中
c-对已经提交的任务进行监控 activeTaskNode
//查看正在运行的task任务。
for(Map.Entry<MasterBaseTaskExecThread,Future<Boolean>> entry: activeTaskNode.entrySet()) {
Future<Boolean> future = entry.getValue();
TaskInstance task = entry.getKey().getTaskInstance();
if(!future.isDone()){
continue;
}
// node monitor thread complete
activeTaskNode.remove(entry.getKey());
if(task == null){
this.taskFailedSubmit = true;
continue;
}
logger.info("task :{}, id:{} complete, state is {} ",
task.getName(), task.getId(), task.getState().toString());
// node success , post node submit
if(task.getState() == ExecutionStatus.SUCCESS){
completeTaskList.put(task.getName(), task);
//2.1继续提交任务流中当前任务的后置任务
submitPostNode(task.getName());
continue;
}
// node fails, retry first, and then execute the failure process
if(task.getState().typeIsFailure()){
if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
this.recoverToleranceFaultTaskList.add(task);
}
if(task.taskCanRetry()){
addTaskToStandByList(task);
}else{
// node failure, based on failure strategy
errorTaskList.put(task.getName(), task);
completeTaskList.put(task.getName(), task);
if(processInstance.getFailureStrategy() == FailureStrategy.END){
killTheOtherTasks();
}
}
continue;
}
// other status stop/pause
completeTaskList.put(task.getName(), task);
}7-MasterTaskExecThread:负责任务的提交持久化,work拉取的任务从这里进行提交
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
MasterBaseTaskExecThread abstractExecThread = null;
if(taskInstance.isSubProcess()){
abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
}else {
abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
}
//提交task到zk队列
Future<Boolean> future = taskExecService.submit(abstractExecThread);
//提交完成将任务加入到activeTaskNode,方便对已经提交的任务进行监控判断其是否已经完成
//abstractExecThread对应当前提交的task,future,可以获取执行的结果
activeTaskNode.putIfAbsent(abstractExecThread, future);
return abstractExecThread.getTaskInstance();
}
taskExecService.submit(abstractExecThread);提交的线程任务会执行下面call方法,线程池submit提交的 任务通过Future可以获取返回的结果
@Override
public Boolean call() throws Exception {
return submitWaitComplete();
}
//submit task instance and wait complete,提交task并且等待其完成
@Override
public Boolean submitWaitComplete() {
Boolean result = false;
//提交task到db和zk队列,具体实现在MasterBaseTaskExecThread中,可以自己去看:submitQueue = processDao.submitTaskToQueue(task);
this.taskInstance = submit();
if(this.taskInstance == null){
logger.error("submit task instance to mysql and queue failed , please check and fix it");
return result;
}
if(!this.taskInstance.getState().typeIsFinished()) {
result = waitTaskQuit();
}
taskInstance.setEndTime(new Date());
processDao.updateTaskInstance(taskInstance);
logger.info("task :{} id:{}, process id:{}, exec thread completed ",
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
return result;
}
waitTaskQuit()方法中会通过while循环不停的去数据库检查taskInstance的状态,
判断当前task是否已经执行完成。
// task instance finished
if (taskInstance.getState().typeIsFinished()){
break;
}如果执行成功就会返回Boolean true. 这样在MasterExecThread类中的 runProcess()方法里面就可以监控当前task是否已经完成
Future<Boolean> future = entry.getValue();
TaskInstance task = entry.getKey().getTaskInstance();
//如果没有完成就continue
if(!future.isDone()){
continue;
}
//执行完成就可以提交当前task的后置任务了
......
今天我的学习就到这里了,关于Dolphin Scheduler中的work实现后面继续学习,尽量也把zk队列和日志模 块也早点学习完成。争取后面在使用过程中可以一切顺利。
點擊查看更多內容
4人點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦