一道任务编排服务面试题解析

2022/08/25 780点热度 0人点赞 0条评论

6月份的时候,群里有一个小伙伴分享了一道面试题,如下图:

趁着下班的功夫,拆解下这道面试题

  • 可以按顺序添加任意节点形成一个任务链
    • 链表且有序 或者数组(考虑到数组的扩容,直接pass调,用有序链表LinkedList)
    • 添加任意节点(可以指定索引添加)
  • 任务需要按顺序执行,同一个节点可能有多个任务(可以并行执行)
    • 线程池执行
    • 同一个节点的任务,广度优先,可以利用Queue
  • 节点完成可以指定任意个数
    • Queue需要包装起来
    • 同时需要计数,直接利用CountDownLatch
  • 每个节点都可能失败,失败后可以通过接口进行重试
    • 持久化保存节点任务执行状态
    • 如果强依赖,必须按顺序再走一遍,成功的可以忽略,只执行失败的
    • 只将失败任务重新串联起来
  • 实现监控功能,可以监控当前任务状态
    • 最简单的日志输出
    • 如果有kibana,直接filebeat抽取日志到es,然后在kibana里配置视图,或者定时从es里抽取失败任务,做钉钉或短信报警
    • 也可以filebeat抽取到kafka,然后消费处理,报警
  • 隐含的,还有一层,任务
    • 可以抽象出来,业务自定义去实现

ok,分析完了,开干。

定义任务并编写执行示例

image-20220825094259296

/**
 * 任务接口
 * @Author: yxkong
 * @Date: 2022/6/1 6:24 PM
 * @version: 1.0
 */
public interface ITask {

    /**
     * 任务执行入口
     * @return
     */
    TaskResult execute();
}

/**
 * 任务示例,具体的任务实现
 * @Author: yxkong
 * @Date: 2022/6/1 6:55 PM
 * @version: 1.0
 */
public class TaskExample implements ITask{
    private Random random = new Random();
    //业务数据
    private String name;
    private int status;
    @Override
    public TaskResult execute() {
        System.out.println(String.format("%s task name:%s starting",Thread.currentThread().getName(), this.name));
        //TODO 执行具体的业务逻辑
        try {
            //添加随机等待,确保同一层级不使用一个线程
            Thread.sleep(Long.valueOf(random.nextInt(500)));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (status == 1){
            System.out.println(String.format("%s task name:%s 任务执行成功",Thread.currentThread().getName(), this.name));
            return new TaskResult(1,"任务执行成功");
        } else {
            System.out.println(String.format("%s task name:%s 任务执行失败",Thread.currentThread().getName(), this.name));
            //模拟失败后,再次要执行的任务是成功的(正常,直接添加即可)
            this.status = 1;
            return new TaskResult(0,"任务执行失败!",this);
        }

    }
    public TaskExample(String name, int status) {
        this.name = name;
        this.status = status;
    }

    @Override
    public String toString() {
        return "ITask{" +
                "name='" + name + '\'' +
                ", status=" + status +
                '}';
    }
}

定义任务编排链路

先定义一个任务包装体

/**
 * 任务包装体
 * @Author: yxkong
 * @Date: 2022/6/1 6:37 PM
 * @version: 1.0
 */
public class TaskQueueWrap {
    private Queue<ITask> queue ;
    //至少完成个数
    private int atLeast;

    private TaskQueueWrap(int atLeast) {
        this.queue = new ArrayDeque<>();
        this.atLeast = atLeast;
    }

    public Queue<ITask> getQueue() {
        return queue;
    }

    public int getAtLeast() {
        return atLeast;
    }
    public boolean add(ITask task){
        return this.queue.add(task);
    }
    public static TaskQueueWrap create(List<ITask> tasks,int atLeast){
        TaskQueueWrap queue = new TaskQueueWrap(atLeast);
        for (ITask task:tasks){
            queue.add(task);
        }
        return queue;
    }
    public static TaskQueueWrap createEmpty(int atLeast){
        return new TaskQueueWrap(atLeast);
    }
}

定义任务链,并暴露各种操作方法,包括

  • addNew(...)添加新层级任务(单个或批量)
  • addLast(...)在最后一个层级上添加任务
  • addIdx(...) 指定层级添加任务
  • execute() 链路执行
/**
 * 任务链
 * @Author: yxkong
 * @Date: 2022/6/1 6:34 PM
 * @version: 1.0
 */
public class TaskChain {
    private LinkedList<TaskQueueWrap> chain;
    private LinkedList<TaskCounter> counters;

    public TaskChain() {
        this.chain = new LinkedList<>();
        this.counters = new LinkedList<>();
    }

    /**
     * 在最后一个层级上添加任务
     * @param task
     * @return
     */
    public boolean addLast(ITask task){
        TaskQueueWrap queue = chain.getLast();
        if (Objects.isNull(task) || Objects.isNull(queue)){
            return Boolean.FALSE;
        }
        queue.add(task);
        return Boolean.TRUE;
    }

    /**
     * 添加新层级任务
     * @param task
     * @return
     */
    public boolean addNew(ITask task, int atLeast){
        if (Objects.isNull(task)){
            return Boolean.FALSE;
        }
        TaskQueueWrap queue =TaskQueueWrap.createEmpty(atLeast);
        queue.add(task);
        this.chain.addLast(queue);
        return Boolean.TRUE;
    }
    /**
     * 添加新层级任务
     * @param tasks
     * @return
     */
    public boolean addNew(List<ITask> tasks, int atLeast){
        if (Objects.isNull(tasks)){
            return Boolean.FALSE;
        }
        TaskQueueWrap queue = TaskQueueWrap.create(tasks,atLeast);
        this.chain.add(queue);
        return Boolean.TRUE;
    }
    private boolean add(TaskQueueWrap taskQueue) {
        if (Objects.isNull(taskQueue)){
            return Boolean.FALSE;
        }
        this.chain.add(taskQueue);
        return Boolean.TRUE;
    }

    /**
     * 指定层级添加任务
     * @param task
     * @param idx
     * @return
     */
    public boolean addIdx(ITask task,int idx){
        if (Objects.isNull(task)){
            return Boolean.FALSE;
        }
        if (idx > this.chain.size()){
            return Boolean.FALSE;
        }
        if (idx == this.chain.size()){
           return  this.addNew(task,1);
        }
        this.chain.get(idx).add(task);
        return Boolean.TRUE;
    }

    public TaskChain execute() {
        //广度优先遍历
        ThreadPoolExecutor executor = ThreadPoolUtils.getThreadPooll();
        TaskChain failChain = new TaskChain();
        //埋点1,任务开始执行
        System.out.println("task starting");
        for (int i = 0; i < this.chain.size(); i++) {
            //层级开始埋点
            System.out.println(String.format("task level:%d starting", (i+1)));
            TaskQueueWrap queueWrap = this.chain.get(i);
            Queue<ITask> queue = queueWrap.getQueue();
            int queueSize = queue.size();
            CountDownLatch count = new CountDownLatch(queueSize);
            List<Future<TaskResult>> list = new ArrayList<>();
            while (queue.size() != 0){
                TaskThread taskThread = new TaskThread(queue.poll(),count);
                list.add(executor.submit(taskThread));
            }
            try {
                count.await(5, TimeUnit.SECONDS);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            List<ITask> failTasks = counterAndFail(queueSize, list);
            failChain.addNew(failTasks,queueWrap.getAtLeast());
            //不满足至少,任务链结束
            TaskCounter taskCounter = this.counters.getLast();
            if (queueWrap.getAtLeast() > (taskCounter.getCounter()- taskCounter.getFailCounter())){
                //将剩余的任务添加进去
                for (int j = i+1; j < this.chain.size(); j++) {
                    failChain.add(this.chain.get(j));
                }
                return failChain;
            }

            //层级结束埋点
            System.out.println(String.format("task level:%d end", (i+1)));
        }

        System.out.println("task end");
        return failChain;
    }

    private List<ITask> counterAndFail(int size, List<Future<TaskResult>> list){
        List<ITask> failTasks = new ArrayList<>();
        try {
            //计数
            Integer failCounter = 0;
            for (Future<TaskResult> f:list){
                TaskResult result = f.get();
                if (!result.isSuc()){
                    failCounter++;
                    ITask task =(ITask) result.getData();
                    failTasks.add(task);
                }
            }
            counters.add(new TaskCounter(size,failCounter));

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return failTasks;
    }
}

同时定义了一个返回体

/**
 * 任务返回状态
 * @Author: yxkong
 * @Date: 2022/6/1 6:18 PM
 * @version: 1.0
 */
public class TaskResult {
    //任务状态,1成功,0失败,失败的时候,data为原任务
    private int status;
    private String msg;
    //任务数据
    private Object data;

    public TaskResult(int status, String msg, Object data) {
        this.status = status;
        this.msg = msg;
        this.data = data;
    }

    public TaskResult(int status, String msg) {
        this.status = status;
        this.msg = msg;
    }

    public int getStatus() {
        return status;
    }
    public boolean isSuc(){
        if (status == 1){
            return Boolean.TRUE;
        }
        return Boolean.FALSE;
    }

    public String getMsg() {
        return msg;
    }

    public Object getData() {
        return data;
    }
}

计数器

/**
 * 任务执行计数器
 *   如果放在执行线程里,需要用AtomicInteger
 *   如果在主线程里,用这个即可
 * @Author: yxkong
 * @Date: 2022/6/2 9:12 AM
 * @version: 1.0
 */
public class TaskCounter {

    private int counter;
    private int failCounter;

    public TaskCounter(int counter, int failCounter) {
        this.counter = counter;
        this.failCounter = failCounter;
    }

    public int getCounter() {
        return counter;
    }

    public void setCounter(int counter) {
        this.counter = counter;
    }

    public int getFailCounter() {
        return failCounter;
    }

    public void setFailCounter(int failCounter) {
        this.failCounter = failCounter;
    }
}

定义任务线程

/**
 * 任务线程
 * @Author: yxkong
 * @Date: 2022/6/1 6:32 PM
 * @version: 1.0
 */
public class TaskThread implements Callable {
    private ITask task;
    private CountDownLatch downLatch;

    public TaskThread(ITask task, CountDownLatch downLatch) {
        this.task = task;
        this.downLatch = downLatch;
    }
    @Override
    public Object call() throws Exception {
        TaskResult rst = new TaskResult(0,"执行失败!");
        try {
            rst = task.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            this.downLatch.countDown();
        }
        return rst;
    }
}

测试

/**
 * 测试类
 * @Author: yxkong
 * @Date: 2022/6/1 6:40 PM
 * @version: 1.0
 */
public class TaskTest {
    public static void main(String[] args) {
         /**
         *  任务链路的构建,可以直接从数据库里读取配置,然后构建成链
         *  设计三张表,
         *    一张表存任务链,描述这个任务链
         *    第二张表,存储对应任务链的层级,以及对应层级的至少执行个数
         *    第三张表,存储具体层级的任务,任务执行结果,执行时间,最后更新时间
         *  可以再加辅助表,用于任务执行记录
         */
        TaskChain chain = new TaskChain();

        //第一层级添加任务
        chain.addNew(new TaskExample("task1001",1),1);
        chain.addLast(new TaskExample("task1002",0));

        //第二层级添加任务
        chain.addNew(new TaskExample("task2001",1),1);
        chain.addLast(new TaskExample("task2002",0));
        chain.addLast(new TaskExample("task2003",0));

        //第三层级添加任务
        chain.addNew(new TaskExample("task3001",0),1);

        //第四层级添加任务
        chain.addNew(new TaskExample("task4001",1),1);
        chain.addLast(new TaskExample("task4002",0));

        //给第一层级添加任务
        chain.addIdx(new TaskExample("task1003",0),0);

        //可以将执行失败的任务链持久化,然后接口调用再次触发
        TaskChain failChain = chain.execute();
        System.out.println("   调用失败再试  ");
        failChain.execute();

    }
}

执行结果

task starting
task level:1 starting
pool-1-thread-2 task name:task1002 starting
pool-1-thread-1 task name:task1001 starting
pool-1-thread-3 task name:task1003 starting
pool-1-thread-2 task name:task1002 任务执行失败
pool-1-thread-1 task name:task1001 任务执行成功
pool-1-thread-3 task name:task1003 任务执行失败
task level:1 end
task level:2 starting
pool-1-thread-4 task name:task2001 starting
pool-1-thread-5 task name:task2002 starting
pool-1-thread-6 task name:task2003 starting
pool-1-thread-5 task name:task2002 任务执行失败
pool-1-thread-6 task name:task2003 任务执行失败
pool-1-thread-4 task name:task2001 任务执行成功
task level:2 end
task level:3 starting
pool-1-thread-7 task name:task3001 starting
pool-1-thread-7 task name:task3001 任务执行失败
   调用失败再试  
task starting
task level:1 starting
pool-1-thread-2 task name:task1003 starting
pool-1-thread-8 task name:task1002 starting
pool-1-thread-8 task name:task1002 任务执行成功
pool-1-thread-2 task name:task1003 任务执行成功
task level:1 end
task level:2 starting
pool-1-thread-1 task name:task2002 starting
pool-1-thread-3 task name:task2003 starting
pool-1-thread-3 task name:task2003 任务执行成功
pool-1-thread-1 task name:task2002 任务执行成功
task level:2 end
task level:3 starting
pool-1-thread-5 task name:task3001 starting
pool-1-thread-5 task name:task3001 任务执行成功
task level:3 end
task level:4 starting
pool-1-thread-6 task name:task4001 starting
pool-1-thread-4 task name:task4002 starting
pool-1-thread-4 task name:task4002 任务执行失败
pool-1-thread-6 task name:task4001 任务执行成功
task level:4 end
task end

yxkong

这个人很懒,什么都没留下

文章评论