记一次多线程异常
问题描述
有一个抽数据库数据写入文件的定时任务,发现有些文件数据不全,导入一半就结束了。
实现说明
service是共用的同一个,在Quartz类中定义多个@Scheduled注解的方法,并分别调用service方法
问题代码
// FileService.java
@Service
public class FileService {
private FileInfo fileInfo = new FileInfo();
private int batchSize = 500;
public boolean export2File(Integer type) {
fileInfo = dao.getList(type);
batchSize = fileInfo.getPageNum();
push(fileInfo.getName());
return result;
}
private boolean push(Integer fileName) {
// create file with name fileName
for(int pageNo = 1;;i++) {
List<Row> data = getPageData(pageNo, fileName);
// write to file with data
if (data == null && data.size() < batchSize) {
break;
}
}
return result;
}
private List<Row> getPageData(int pageNo, String fileName) {
IPage<LinkedHashMap<String, Object>> page = new Page<>(pageNo, pageSize);
list = dataDao.select(page, fileInfo);
return list;
}
}
// FileQuartz.java
@Component
public class FileQuartz {
@Autowired
private FileService fileService;
@Scheduled(cron = "0 0 5 * * ?")
public boolean executeType1() {
return execute(1, null);
}
@Scheduled(cron = "5 0 5 * * ?")
public boolean executeType2() {
return execute(2, null);
}
@Scheduled(cron = "10 0 5 * * ?")
public boolean executeType3() {
return execute(3, null);
}
public boolean execute(Integer type) {
return fileService.export2File(type);
}
}
问题分析
问题出在FileService是单例的,运行时里面的成员变量fileInfo和batchSize会被三个线程同时操作,最后启动的线程中赋值的就会起作用,导致导出错误数据,push方法里的for循环的break条件也会很快跳出
问题解决
使用ThreadLocal包装fileInfo和batchSize
ThreadLocal的内存泄漏问题
最佳实践是每次使用完后都调用remove()方法清除对象
try {
threadlocal.set(value);
...
} finally {
threadlocal.remove();
}
使用线程池时的ThreadLocal复用问题
- 可以线程处理结束时手动清理
- 重载ThreadPoolExecutor的afterExecute方法执行清理
Bing AI生成的示例代码
public class MyTask implements Runnable {
private static final ThreadLocal<String> local = new ThreadLocal<>();
@Override
public void run() {
local.set("some value");
// do something
// method 1
// local.remove();
}
public void clean() {
local.remove();
}
}
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (r instanceof MyTask) {
((MyTask) r).clean();
}
}
}
reference
Updated: 2023-07-05 23:48
Created: 2023-02-18 20:00