HA

Notes on a multithreading bug

Problem description

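A scheduled job pulls data from the database and writes it to files. Some of the files turned out to be incomplete: the export stopped partway through.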

Implementation

All jobs share one service instance. The Quartz class defines several methods annotated with @Scheduled, and each of them calls the same service method.

Problematic code

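// FileService.java
// Buggy version: fileInfo and batchSize are fields of a singleton bean,
// so every scheduled job reads and overwrites the same two fields.
import java.util.LinkedHashMap;
import java.util.List;

import org.springframework.stereotype.Service;

// IPage/Page are assumed to be the MyBatis-Plus pagination types
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;

@Service
public class FileService {

    // dao and dataDao are injected DAOs (declarations omitted)

    private FileInfo fileInfo = new FileInfo();   // shared by all jobs: not thread-safe

    private int batchSize = 500;                  // shared by all jobs: not thread-safe

    public boolean export2File(Integer type) {
        fileInfo = dao.getList(type);             // a concurrent job may overwrite this at any time

        batchSize = fileInfo.getPageNum();
        return push(fileInfo.getName());
    }

    private boolean push(String fileName) {
        // create the file named fileName

        for (int pageNo = 1; ; pageNo++) {
            List<Row> data = getPageData(pageNo, fileName);
            // write data to the file

            // a page shorter than batchSize is the last one; if another job has
            // changed batchSize in the meantime, this exits early and truncates the file
            if (data == null || data.size() < batchSize) {
                break;
            }
        }

        return true; // result handling omitted
    }

    private List<Row> getPageData(int pageNo, String fileName) {
        IPage<LinkedHashMap<String, Object>> page = new Page<>(pageNo, batchSize);

        // fileInfo may already hold another job's settings at this point
        return dataDao.select(page, fileInfo);
    }

}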

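// FileQuartz.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class FileQuartz {

    // a single FileService instance (singleton bean) shared by all three jobs
    @Autowired
    private FileService fileService;

    // the three jobs fire 5 seconds apart, so with a multi-threaded scheduler their exports overlap
    @Scheduled(cron = "0 0 5 * * ?")
    public boolean executeType1() {
        return execute(1);
    }

    @Scheduled(cron = "5 0 5 * * ?")
    public boolean executeType2() {
        return execute(2);
    }

    @Scheduled(cron = "10 0 5 * * ?")
    public boolean executeType3() {
        return execute(3);
    }

    public boolean execute(Integer type) {
        return fileService.export2File(type);
    }
}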

Analysis

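FileService is a singleton, so all three job threads read and write its member variables fileInfo and batchSize at the same time. The values assigned by the last thread to start take effect for every job. As a result, the jobs export the wrong data, and the break condition of the for loop in push fires too early, which cuts the file short. The jobs can only overlap if the scheduler has more than one thread. By default, Spring runs all @Scheduled methods on a single thread.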
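The interleaving can be reproduced without Spring or a database. Below is a minimal sketch. SharedStateService and RaceDemo are hypothetical names, and the single batchSize field stands in for both shared fields of FileService. Two latches force the unlucky timing: job B runs its whole export while job A is still working. Job A therefore reads back B's value instead of its own.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the singleton FileService: one shared instance, one mutable field.
class SharedStateService {
    private int batchSize = 500; // shared by every caller, like FileService.batchSize

    int export(int mySize, Runnable work) {
        batchSize = mySize; // like "batchSize = fileInfo.getPageNum();"
        work.run();         // stands in for the time spent paging through the data
        return batchSize;   // the value the loop's break check would compare against
    }
}

class RaceDemo {
    // Forces the bad interleaving: job B runs completely while job A is still "working".
    static int batchSizeSeenByJobA() {
        SharedStateService service = new SharedStateService(); // one instance, like a singleton bean
        CountDownLatch aStarted = new CountDownLatch(1);
        CountDownLatch bFinished = new CountDownLatch(1);
        int[] seenByA = new int[1];

        Thread jobA = new Thread(() -> seenByA[0] = service.export(500, () -> {
            aStarted.countDown();    // A has assigned batchSize = 500
            awaitQuietly(bFinished); // A is mid-export while B runs
        }));
        Thread jobB = new Thread(() -> {
            awaitQuietly(aStarted);
            service.export(1000, () -> { }); // B overwrites the shared field
            bFinished.countDown();
        });
        jobA.start();
        jobB.start();
        try {
            jobA.join();
            jobB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenByA[0];
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Here batchSizeSeenByJobA() returns 1000, not the 500 that job A assigned. In the real service nothing forces this ordering. That fits the symptom: only some of the files were truncated.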

Solution

Wrap fileInfo and batchSize in ThreadLocal so that each thread gets its own copy.
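Here is a minimal sketch of that fix. It assumes a plain Java class rather than the real Spring bean, and ThreadLocalExportService and its methods are hypothetical names. batchSize becomes a ThreadLocal; fileInfo would be wrapped the same way. The sketch forces the same kind of interleaving as the bug, with job B running completely while job A is mid-export. This time job A still reads its own value.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: per-thread export state held in a ThreadLocal instead of a plain field.
class ThreadLocalExportService {
    // Each thread gets its own copy; 500 mirrors the original field's default.
    private static final ThreadLocal<Integer> BATCH_SIZE = ThreadLocal.withInitial(() -> 500);

    int export(int mySize, Runnable work) {
        BATCH_SIZE.set(mySize);      // visible only to the current thread
        try {
            work.run();              // other jobs may run their own export meanwhile
            return BATCH_SIZE.get(); // still this thread's value
        } finally {
            BATCH_SIZE.remove();     // drop the value before the thread is reused
        }
    }

    // Forces the interleaving: job B runs completely while job A is still "working".
    static int batchSizeSeenByJobA() {
        ThreadLocalExportService service = new ThreadLocalExportService();
        CountDownLatch aStarted = new CountDownLatch(1);
        CountDownLatch bFinished = new CountDownLatch(1);
        int[] seenByA = new int[1];

        Thread jobA = new Thread(() -> seenByA[0] = service.export(500, () -> {
            aStarted.countDown();
            awaitQuietly(bFinished);
        }));
        Thread jobB = new Thread(() -> {
            awaitQuietly(aStarted);
            service.export(1000, () -> { }); // sets only job B's own copy
            bFinished.countDown();
        });
        jobA.start();
        jobB.start();
        try {
            jobA.join();
            jobB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenByA[0];
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

If the code allows it, a simpler alternative is to keep fileInfo and batchSize as local variables in export2File and pass them to push and getPageData as parameters. Local variables are confined to the calling thread, so nothing is shared and nothing needs removing afterwards.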

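ThreadLocal memory leaks

A value stored in a ThreadLocal lives in the owning thread's ThreadLocalMap. On a long-lived thread, such as a pool or container thread, it stays reachable until it is removed or the thread exits. Best practice is to call remove() every time you are finished with the value: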

try {
    threadLocal.set(value);
    // ... use the value ...
} finally {
    threadLocal.remove();
}

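ThreadLocal reuse in thread pools

Pool threads are reused across tasks. A value that one task sets and never removes is still there when the next task runs on the same thread. There are two ways to clean up:

  • Clean up manually when the task finishes
  • Override ThreadPoolExecutor's afterExecute method and clean up there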
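Here is a minimal sketch of the reuse problem and of the manual cleanup, using only the JDK. PoolReuseDemo and CURRENT_FILE are hypothetical names. The pool has exactly one thread, which guarantees that the two tasks run on the same thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolReuseDemo {
    // Hypothetical per-task state, e.g. the name of the file being exported.
    private static final ThreadLocal<String> CURRENT_FILE = new ThreadLocal<>();

    // Runs one task that sets CURRENT_FILE, then a second task, and returns what the
    // second task sees. The pool has a single thread, so both tasks run on it.
    static String valueSeenByNextTask(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> {
                CURRENT_FILE.set("type1.csv");
                try {
                    // ... the export work would run here ...
                } finally {
                    if (cleanUp) {
                        CURRENT_FILE.remove(); // manual cleanup when the task finishes
                    }
                }
            }).get();
            return pool.submit(() -> CURRENT_FILE.get()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

valueSeenByNextTask(false) returns "type1.csv", which is a value left behind by the previous task. valueSeenByNextTask(true) returns null.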

Sample code generated by Bing AI

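// MyTask.java
public class MyTask implements Runnable {
    private static final ThreadLocal<String> local = new ThreadLocal<>();

    @Override
    public void run() {
        local.set("some value");
        // do something
        // Option 1: clean up at the end of the task itself
        // local.remove();
    }

    // Option 2: let the pool call this after the task has run
    public void clean() {
        local.remove();
    }
}

// MyThreadPoolExecutor.java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // afterExecute runs on the worker thread that just ran r, so clean() clears that thread's value.
        // This check only matches tasks passed to execute(); submit() hands afterExecute a FutureTask wrapper.
        if (r instanceof MyTask) {
            ((MyTask) r).clean();
        }
    }
}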
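Below is a sketch of the afterExecute approach in use. It assumes a single known ThreadLocal, and CleaningThreadPoolExecutor and LOCAL are hypothetical names. Unlike MyThreadPoolExecutor, it removes the value after every task instead of checking r instanceof MyTask. That way it also covers tasks passed to submit(), which reach afterExecute wrapped in a FutureTask.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of MyThreadPoolExecutor: clears one known ThreadLocal after every task.
class CleaningThreadPoolExecutor extends ThreadPoolExecutor {
    static final ThreadLocal<String> LOCAL = new ThreadLocal<>();

    CleaningThreadPoolExecutor() {
        // a single worker thread, so consecutive tasks are guaranteed to share it
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        LOCAL.remove(); // runs on the same worker thread that just ran the task
    }

    // The first task "forgets" to clean up; returns what the next task on the same thread sees.
    static String valueSeenByNextTask() {
        CleaningThreadPoolExecutor pool = new CleaningThreadPoolExecutor();
        try {
            pool.submit(() -> LOCAL.set("some value")).get();
            return pool.submit(() -> LOCAL.get()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

valueSeenByNextTask() returns null even though the first task never called remove(). The trade-off is that the pool now has to know which ThreadLocals to clear. MyTask.clean(), by contrast, keeps that knowledge inside the task.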
