JUST DO IT

  • 首页
  • 归档

  • 搜索
极客时间

XXL-JOB在少儿英语中的实践-1

发表于 2020-12-13 | 分类于 原创 | 0 | 阅读次数 691

背景

少儿英语项目组任务调度平台初期采用了Spring的@Schedule注解结合Reids分布式锁的方式实现,通过模板方法(Template)模式抽象出一个任务的执行流程,编写任务时只需要实现具体的方法即可。这种实现方式简单直接,解决了当时服务多节点部署时重复执行任务的问题。随着业务的不断壮大,任务调度平台的任务越来越多,原有的任务调度暴露了一些问题。

  • @Schedule默认是单线程模式,存在大任务执行一直占用资源导致其他任务饿死
  • 任务调度平台缺少监控和报警机制,无法查看一个任务的具体执行情况
  • 测试不方便,自测或者是QA测试需要修改代码来验证任务执行的正确性

基于以上问题,决定对原有的任务调度平台进行改造和升级,引入XXL-JOB分布式任务调度平台

任务调度的一些解决方案

单机版

Timer(JDK1.3)
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        System.out.println("this is a timer task example");
    }
}, 1000, 1000);
ScheduledExecutorService(JDK1.5)
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        System.out.println("this is scheduledExecutorService example");
    }
}, 1, 30, TimeUnit.SECONDS);

一般在实际应用过程中,推荐用Executor框架,而不是JDK1.3就已经存在的Timer,主要原因有两点:

  • Timer是单线程的框架,只有一个线程在执行任务,而Executor是一个多线程的框架,初始化的时候可以指定线程的个数,并发执行提交的任务。
  • 对于任务中未捕获的异常,Timer中线程直接终止,不能继续执行任务,而Executor只是取消当前的任务,还可以执行其他已提交的任务
@Scheduled(Spring)

在集成了SpringBoot的项目中,实现定时任务很简单,通过@EnableScheduling和@Scheduled两个注解就能够非常方便的编写一个单进程的定时任务。通过查看源码我们不难发现其底层实现还是依赖于JUC调度线程池ScheduledThreadPoolExecutor。需要注意的一点是,默认所有的@Scheduled方法由单线程调度,没有同时执行的任务,如果需要需要多线程调度,需要手动配置下。

public class ScheduleConfig implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
    scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
    }
}

集群部署

以下对比了Quartz、elastic-job以及xxl-job这三个支持集群部署的分布式任务调度框架

对比项QuartzElasticJobXXL-JOB(许雪里)
项目背景OpenSymphony当当网开源项目(Apache ShardingSphere)大众点评开源项目
Githubhttps://github.com/quartz-scheduler/quartz(4k)https://github.com/apache/shardingsphere-elasticjob(6.5k)https://github.com/xuxueli/xxl-job(15.9k)
可视化无支持通过Web页面对任务进行CRUD操作、基本用户权限管理等支持通过Web页面对任务进行CRUD操作,支持报表、日志查看等
触发规则时间触发时间触发时间触发、事件触发(手动触发)
依赖JDBC支持的关系型数据库(Mysql、Oracle等)ZookeeperMysql
调度器集群部署支持支持支持
执行器集群部署支持支持支持
日志追溯不支持通过事件订阅的方式处理调度过程中重要的事件支持
报警不支持可定制开发提供邮件报警支持,可扩展短信、钉钉等方式
弹性扩容支持支持支持
依赖任务不支持不支持支持
任务分片不支持支持支持
多种作业类型内置Java支持Simple、Script等任务通过GLUE支持Java以及Shell、Python的脚本任务
Quartz

Quartz作为开源作业调度中的佼佼者,是作业调度的首选,但是同样存在以下问题:

  • 调用API的的方式操作任务,不人性化
  • 需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重
  • 调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐渐增多,同时调度任务逻辑逐渐加重的情况下,此时调度系统的性能将大大受限于业务
  • Quartz底层以“抢占式”获取DB锁并由抢占成功节点负责运行任务,会导致节点负载悬殊非常大

基于以上的问题,ElasticJob和XXL-JOB从两个不同的方向对Quartz进行了优化,下面具体说明下。

ElasticJob

ElasticJob是当当网的开源项目,2017年开始停止更新了一段时间,2020年成为了Apache ShardingSphere下的子项目,目前正在开发3.X的大版本。

ElasticJob基于Quartz演化而来,由于基于DB的方式缺少分布式协调功能,所以ElasticJob将DB换成成Zookeeper,基于Quarts调度组件和Curator实现了全局的作业控制中心,用于注册、控制和协调分布式作业执行。由于引入了Zookeeper,ElasticJob支持弹性扩容和数据分片的功能。

XXL-JOB

XXL-JOB是大众点评的开源项目,社区比较活跃,目前在持续更新中。

同样XXL-JOB早期也是基于Quartz的调度组建进行开发的,由于基于DB调度的一些问题,XXL-JOB最终选择自研调度组件,一方面是为了精简系统降低冗余依赖,另一方面是为了提供系统的可控度与稳定性。

XXL-JOB中“调度模块”和“任务模块”完全解耦,调度模块进行任务调度时,将会解析不同的任务参数发起远程调用,调用各自的远程执行器服务。这种调用模型类似RPC调用,调度中心提供调用代理的功能,而执行器提供远程服务的功能。

XXL-JOB虽然也是基于DB的方式实现任务调度,但是在XXL-JOB中调度和执行分离,XXL-JOB通过执行器实现“协同分配式”运行任务,充分发挥集群优势,负载各节点均衡,这里建议调度器和执行器的分别是独立的数据源,这样调度和执行完全隔离。

基于以上分析,少儿英语项目决定采用XXL-JOB对原有的任务调度平台进行升级和改造,这里选择XXL-JOB还有一个很重要的原因是,原来的任务调度平台直接可以作为执行器模块,只需要基于xxl-job-admin搭建一套调度中心即可,迁移成本较小,如果基于ElasticJob做改造,成本更高。

XXL-JOB在少儿英语的实践

架构图

image-20201102174604569

一次完成的任务调度流程

  • “调度中心”向“执行器”发送http调度请求:“执行器”中接受请求的服务,实际上是一个内嵌的Server,默认端口号9999
  • “执行器”执行任务逻辑
  • “执行器”http回调“调度中心调度结果”

基于Rancher平台的实际部署方案

image-20201102175021401

使用方式

现在开发一个定时任务主要分为两个步骤

  • 在执行器(kids-job)中编写定时任务具体的业务代码
  • 在调度中心(kids-job-ms)中配置定时任务的调度策略
步骤一:编写定时任务
说明
  • 在执行器(kids-job)编写定时任务具体逻辑
  • 一个定时任务以http接口的方式提供给调度中心(kids-job-ms)管理,简单来说一个定时任务就是controller中的一个方法
  • 支持有参和无参两种调用方式
示例

定时任务1(无参)

@RestController
@RequestMapping("/job/example")
public class JobExampleController {

  @PostMapping("/noArgsJob")
  public ResultData<String> noArgsJob() {
    boolean success = false;
    // TODO 具体定时任务逻辑
    // success = exampleJobService.execute();
    if (success) {
      return new ResultData<>(ResultCodeEnum.SUCCESS);
    }
    return new ResultData<>(ResultCodeEnum.PROCESS_ERROR, "错误描述");
  }
}

定时任务2(有参)

@RestController
@RequestMapping("/job/example")
public class JobExampleController {

  @PostMapping("/argsJob")
  public ResultData<String> noArgsJob(@RequestBody JobArgs jobArgs) {
    boolean success = false;
    // TODO 具体定时任务逻辑
    // success = exampleJobService.execute(jobArgs);
    if (success) {
      return new ResultData<>(ResultCodeEnum.SUCCESS);
    }
    return new ResultData<>(ResultCodeEnum.PROCESS_ERROR, "错误描述");
  }

  @Data
  public static class JobArgs {
    private String arg1;
    private Integer arg2;
  }
}
一些约定
  • requestType:POST
  • path:/job/{模块名}/{任务名}
  • 返回值:请使用ResultData作为返回值,retcode为10000,代表任务执行成功(这里对xxl-job的httpJobHandler做了下改造,只有业务处理成功才代表任务执行成功,而不是简单用http status 200做判断)
  • 有参任务的参数格式为application/json,POST方式
步骤二:调度中心设置调度策略
新建无参定时任务

新建有参定时任务

如图所示,新建任务时只需要填写标红框的部分,其中

  • 任务描述:必填,请填写有意义的任务描述,方便搜索
  • cron:必填
  • 负责人:必填
  • 报警邮件:选填,群组邮件,若失败需要报警邮件,请设置
  • 任务参数:必填
    • json格式,{"path":"/job/example/testJob","param":{}}
    • 其中path为job的http路径,param为job的参数,param选填
    • 目前job对应的http接口仅支持POST请求,json入参方式

新建任务完成后,在操作里点击"启动"按钮,任务即可按照cron规则来执行了

关于调度中心系统的其他操作,可以参考https://www.xuxueli.com/xxl-job/

后续的工作

  • 支持分片,目前的部署方式是手动注册的执行器,一个执行器地址对应的是多pod实例的clusterIP地址,通过k8s的内部域名服务以及kube-proxy来进行pod的选择,所以对于调度中心来说,执行器只有一个,无法支持分片。后续调整部署方式,每个pod自动向调度中心注册成为一个执行器来支持分片。
  • 接入有道LDAP服务,原生支持rd登录
  • 接入泡泡报警,目前只支持邮箱报警

XXL-JOB核心源码分析

注:以下源码分析基于v2.2.0版本

调度中心

调度中心负责调度的核心类为JobScheduleHelper,里边的scheduleThread线程,通过while循环不断地从数据库中查找符合执行条件的任务(单线程),然后通过JobTriggerPoolHelper中的线程池并发地将任务发送给执行器去执行(多线程)。

分布式调度的一致性

调度中心通过DB锁保证集群分布式调度的一致性,一次任务调度只会触发一次执行

while (!scheduleThreadToStop) {
    Connection conn = null;
    Boolean connAutoCommit = null;
    PreparedStatement preparedStatement = null;

    boolean preReadSuc = true;
    try {

        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
        connAutoCommit = conn.getAutoCommit();
        // 自动提交设置为false
        conn.setAutoCommit(false);
        // 利用for update加锁
        preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
        preparedStatement.execute();
        
        // 查询可执行的任务,交给执行器去执行
        .....

    } catch (Exception e) {
        if (!scheduleThreadToStop) {
            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
        }
    } finally {

        // commit
        if (conn != null) {
            try {
                // finally提交事物
                conn.commit();
            } catch (SQLException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            try {
                // 撤销之前的操作
                conn.setAutoCommit(connAutoCommit);
            } catch (SQLException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            try {
                conn.close();
            } catch (SQLException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        // close PreparedStatement
        ......
    }
线程池并发调度

调度采用线程池方式实现,避免单线程因阻塞而引起任务调度延迟。同时调度线程池进行隔离拆分,慢任务自动降级进入”Slow”线程池,避免耗尽调度线程,提高系统稳定性。

public class JobTriggerPoolHelper {

    // ---------------------- trigger pool ----------------------

    // fast/slow thread pool,做了基本的线程池隔离
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;

    public void start(){
        // fast线程池,maxSize可配置
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });

				// fast线程池,maxSize可配置
        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }

		......

    // job timeout count
    private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min
    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

    /**
     * add trigger,添加一个调度的触发器
     */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool
        // 默认选择fast pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        // 从全局的jobTimeoutCountMap拿到1分钟超时的数量
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        // 如果大于10,切换为slow pool
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }

        // 具体的触发调度逻辑,通过xxl-rpc将任务信息传递个执行器集群
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {

                long start = System.currentTimeMillis();

                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {

                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500) {       // ob-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }

                }

            }
        });
    }

    ......

}

执行器

执行器负责接收调度请求并执行任务逻辑。

自动注册
public class ExecutorRegistryThread {

    // 单例
    private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
    public static ExecutorRegistryThread getInstance(){
        return instance;
    }

    private Thread registryThread;
    private volatile boolean toStop = false;
    public void start(final String appname, final String address){

        // 一些参数有效性校验
        ......

        registryThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // registry
                while (!toStop) {
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                        // 向配置的每一个调度中心地址发送注册请求
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                // 注册结果的处理
                                ......
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }

                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }

                    try {
                        if (!toStop) {
                            // 周期性注册的心跳间隔,默认30s
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        }
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                        }
                    }
                }

                // registry remove
                ......
            }
        });
        registryThread.setDaemon(true);
        registryThread.setName("xxl-job, executor ExecutorRegistryThread");
        registryThread.start();
    }

    ......

}
GLUE模式任务

XXL-JOB支持GLUE模式的任务,可以在调度中心在线编写任务逻辑代码,动态发布,实时编译生效。

主要分为两类:Script语言任务和JAVA语言任务

Script
// 通过Runtime执行脚本语言命令
final Process process = Runtime.getRuntime().exec(cmdarrayFinal);
Java
private Class<?> getCodeSourceClass(String codeSource){
   try {
      // md5
      byte[] md5 = MessageDigest.getInstance("MD5").digest(codeSource.getBytes());
      String md5Str = new BigInteger(1, md5).toString(16);

      Class<?> clazz = CLASS_CACHE.get(md5Str);
      if(clazz == null){
         // 通过GroovyClassLoader动态加载Java类
         clazz = groovyClassLoader.parseClass(codeSource);
         CLASS_CACHE.putIfAbsent(md5Str, clazz);
      }
      return clazz;
   } catch (Exception e) {
      return groovyClassLoader.parseClass(codeSource);
   }
}

参考

  • https://www.xuxueli.com/xxl-job/
  • 本文作者: jkl_yuiop
  • 本文链接: https://leeshengis.com/archives/x-x-l---j-o-b-zai-shao-er-ying-yu-zhong-de-shi-jian
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 极客时间
Java SPI与Dubbo SPI
动态数据源自定义注解的使用说明
  • 文章目录
  • 站点概览
jkl_yuiop

jkl_yuiop

1,383 日志
37 分类
1 标签
RSS
E-mail
Creative Commons
Links
  • 学习笔记
© 2023 jkl_yuiop

如需同步其他《极客时间》专栏,请邮件leeshengis@163.com(免费)