Java 任务处理

最近梳理其他同事以前写的 job 后有点想法,记录下。

一、业务场景

在大多数的系统都有类似这样的逻辑,比如下单了给用户赠送积分,用户在论坛上发表了帖子,给用户增加积分等等。

下单赠送积分,那么一个订单肯定不能重复赠送积分,所以需要一些状态来比较来哪些是已赠送的,哪些是没有赠送的。或许可以在订单表里加个字段来标记是否赠送了积分。

有时候,业务人员出于营销的需要,可能要搞个某某时间段内下单返券的活动。难道又在订单表里加个字段?肯定不能,谁知道还要搞多少活动呢。

二、实现

为了使核心的业务流程尽可能简单高效,赠送积分、返券(后面简称为task)之类的逻辑应该通过异步的job来处理。

因为 task 的处理状态不能放在核心的业务表里,所以,可以另外建一个表示异步任务的 async_task 表,结构如下:

-- 业务job处理 任务
create table async_task (
  id number(11) primary key,
  key_work  varchar2(32),  --  不同业务逻辑的task用不同的keyword
  biz_id char(32),         --  业务数据 ID,比如订单号
  biz_data varchar2(256),  --  核心的业务数据,用于避免关联业务表;具体结构取决于keyword
  status number,           --  任务的处理状态; -2:未处理, -1:处理中, 0:已处理, 大于 0 的数字表示失败次数
  create_tm date,          --  任务的创建时间
  modify_tm date           --  任务的修改时间
);

处于性能考虑,可以在 key_work 字段上建立分区,在 biz_id 上建立索引。

当业务表有需要处理的数据时,就往 async_task 插入一条相应的记录(可以异步插入),异步 job 再从 async_task 表里取数据来处理。

注意:处理 task 时,要保证数据的一致性。所在的项目组曾出现过,下单返券的活动里,送券与更新状态的操作没有放在同一个事务里,出现券送了,状态没更新,重复送券的问题。一定要注意事务的正确处理。

三、单线程、多线程处理 task

不管是用单线程还是多线程,都要考虑有大量 task 的情况,所以不能一次把所有符合条件的 task 都读取到内存里,一定要分页。

单机单线程

不用考虑数据被其他线程重复处理的情况,顺序处理即可:取一批数据处理,处理完了再取下一批,直到所有的都处理完了。

单机多线程

数据量大了,就不能用单线程慢慢地处理了。可以采用一个线程去读取未处理的 task,然后提交到线程池去处理,等这批 task 处理完后再去读取下一批,主流程如下:


// 直接使用 ThreadPoolExecutor 是为了使线程池的线程有特定的名字,任务队列有边界。 ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000), // 有界队列 new ThreadFactory() { // 使用定制的 private AtomicInteger counter = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("task-handler-" + counter.incrementAndGet()); return null; } }, new CallerRunsPolicy()); do { List<AsyncTask> tasks = getUnhandleTask(); if (tasks.isEmpty()) { break; } List<Callable<Object>> callables = convert2callables(tasks); executorService.invokeAll(callables); } while (true); executorService.shutdown();

线程池采用 CallerRunsPolicy 策略是为了在线程池处理不完任务,线程池的任务队列满的时候,读取 task 的线程可以直接处理 task,这样既减缓了 task 的读取速度,又可以加快 task 的处理速度。

多机处理

数据量实在太多,一台机器处理不完,可以用多台。

在多机处理的时候,上面的代码就有问题了,task 可能在不同的机器上被重复处理。

任务被 getUnhandleTask() 方法读取处理后、处理完成前,另一台机器上的线程也读取到了这个任务,发现是未处理的,它也会进行处理,这样就出现重复处理了。正确的主流程如下:

  public class AsyncTask {
    private long id;
    private int status;

    public static enum STATUS {
        UNHANDLE, HANDLING
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }
}

public class TestTaskHandle {

    private Callable<Object> convert(final AsyncTask task) {
        return new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                return doWithTask(task);
            }
        };
    }

    private Object doWithTask(AsyncTask task) {
        return null;
    }

    private List<AsyncTask> getUnhandleTask() {
        return null;
    }

    public void multiMachine() {

        ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    private AtomicInteger counter = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("task-handler-"
                                + counter.incrementAndGet());
                        return null;
                    }

                }, new CallerRunsPolicy());

        do {
            List<AsyncTask> tasks = getUnhandleTask();
            if (tasks.isEmpty()) {
                break;
            }

            for (AsyncTask asyncTask : tasks) {
                // 把 RDBMS 的 update 操作当作一个 CAS 命令
                boolean isSuccess = updateStatus(asyncTask.getId(),
                        AsyncTask.STATUS.UNHANDLE, AsyncTask.STATUS.HANDLING);
                if (isSuccess) {
                    // 把 task 更新为处理中,成功表示抢占到了这个任务,可以继续处理

                    executorService.submit(convert(asyncTask));

                } // else 被其他线程处理了
            }

        } while (true);

        executorService.shutdown();
    }

    public boolean updateStatus(long id, AsyncTask.STATUS oldStatus,
            AsyncTask.STATUS newStatus) {
        return true;
    }
} 

在上面的实现中,每一条记录都需要通过一个数据库的 update 操作来判断是否可以继续处理,开销不小。一个改进的做法是:在 async_task 表增加一个 owner 字段,每个线程使用一个唯一的标识 tid(比如 UUID)。当 task 读取线程要读取任务时,先对 async_task 表里的未处理 task 执行 update,把状态更新为处理中, owner 更新为自己的 tid。如果这个 update 的影响行数大于 0,表示抢占到了任务,然后根据 tid 去读取任务,再分发给线程池去处理。

在存在并发竞争的情景下,很重要的就是借助数据库事务的ACID来达到一种 CAS 的效果;正确处理并发问题总是需要基础的 CAS 操作或锁。


欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据