当前位置:首页 > 艺术

支付宝:多线程事务怎么回滚?说用@Transactional可以回去等通知了!

(微信小程序):5000+道面试题和选择题,包含Java基础、并发、JVM、线程、MQ系列、Redis、Spring系列、Elasticsearch、Docker、K8s、Flink、Spark、架构设计、大厂真题等,在线随时刷题!背景介绍公用的类和方法示例事务不成功操作1 背景介绍1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。3,下面用一个简单示例演示多线程事务。2 公用的类和方法* 平均拆分list方法.* @param source* @param n* @param* @returnpublic static List> averageAssign(List source,int n){List> result=new ArrayList>();int remaider=source.size()%n;int number=source.size()/n;int offset=0;//偏移量for(int i=0;i List value=null;if(remaider>0){value=source.subList(i*number+offset, (i+1)*number+offset+1);remaider--;offset++;}else{value=source.subList(i*number+offset, (i+1)*number+offset);result.add(value);return result;/** 线程池配置* @version V1.0public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService = newThreadPool();return executorService;private static ExecutorService newThreadPool(){int queueSize = 500;int corePool = Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}}/** 获取sqlSession* @author 86182* @version V1.0*/@Componentpublic class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}}3 示例事务不成功操作/*** 测试多线程事务.* @param employeeDOList*/@Override@Transactionalpublic void saveThread(List employeeDOList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚this.getBaseMapper().delete(null);//获取线程池ExecutorService service = ExecutorConfig.getThreadPool();//拆分数据,拆分5份List> lists=averageAssign(employeeDOList, 5);//执行的线程Thread []threadArray = new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i if (i==lists.size()-1){atomicBoolean.set(false);}List list = lists.get(i);threadArray[i] = new Thread(() -> {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}//批量添加,mybatisPlus中自带的batch方法this.saveBatch(list);}finally {countDownLatch.countDown();}});}for (int i = 0; i service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("添加完毕");}catch (Exception e){log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}}数据库中存在一条数据://测试用例@RunWith(SpringRunner.class)@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})public class ThreadTest01 {@Resourceprivate EmployeeBO employeeBO;/*** 测试多线程事务.* @throws InterruptedException*/@Testpublic void MoreThreadTest2() throws InterruptedException {int size = 10;List employeeDOList = new ArrayList<>(size);for (int i = 0; i EmployeeDO employeeDO = new EmployeeDO();employeeDO.setEmployeeName("lol"+i);employeeDO.setAge(18);employeeDO.setGender(1);employeeDO.setIdNumber(i+"XX");employeeDO.setCreatTime(Calendar.getInstance().getTime());employeeDOList.add(employeeDO);}try {employeeBO.saveThread(employeeDOList);System.out.println("添加成功");}catch (Exception e){e.printStackTrace();}}}测试结果:可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。推荐程序员摸鱼地址: https://www.yoodb.com/slack-off/home.html使用sqlSession控制手动提交事务@ResourceSqlContext sqlContext;* 测试多线程事务.* @param employeeDOList@Overridepublic void saveThread(List employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List> callableList = new ArrayList<>();//拆分listList> lists=averageAssign(employeeDOList, 5);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i if (i==lists.size()-1){atomicBoolean.set(false);List list = lists.get(i);//使用返回结果的callable去执行,Callable callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");return employeeMapper.saveBatch(list);callableList.add(callable);//执行子线程List> futures = service.invokeAll(callableList);for (Future future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()<=0){connection.rollback();return;connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();// sql"saveBatch" parameterType="List">INSERT INTOemployee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)values"list" item="item" index="index" separator=",">#{item.employeeId},#{item.age},#{item.employeeName},#{item.birthDate},#{item.gender},#{item.idNumber},#{item.creatTime},#{item.updateTime},#{item.status}数据库中一条数据:测试结果:抛出异常,删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。成功操作示例:@ResourceSqlContext sqlContext;* 测试多线程事务.* @param employeeDOList@Overridepublic void saveThread(List employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service = ExecutorConfig.getThreadPool();List> callableList = new ArrayList<>();List> lists=averageAssign(employeeDOList, 5);for (int i =0;i List list = lists.get(i);Callable callable = () -> employeeMapper.saveBatch(list);callableList.add(callable);//执行子线程List> futures = service.invokeAll(callableList);for (Future future:futures) {if (future.get()<=0){connection.rollback();return;connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);测试结果:数据库中数据:删除的删除了,添加的添加成功了,测试成功。作者:weixin_43225491 https://blog.csdn.net/weixin_43225491/article/details/117705686公众号“Java精选”所发表内容注明来源的,版权归原出处所有(无法查证版权的或者未注明出处的均来自网络,系转载,转载的目的在于传递更多信息,版权属于原作者。如有侵权,请联系,笔者会第一时间删除处理!最近有很多人问,有没有读者交流群!加入方式很简单,公众号Java精选,回复“加群”,即可入群!(微信小程序):3000+道面试题,包含Java基础、并发、JVM、线程、MQ系列、Redis、Spring系列、Elasticsearch、Docker、K8s、Flink、Spark、架构设计等,在线随时刷题!特别推荐:专注分享最前沿的技术与资讯,为弯道超车做好准备及各种开源项目与高效率软件的公众号,「大咖笔记」,专注挖掘好东西,非常值得大家关注。点击下方公众号卡片关注。文章有帮助的话,点在看,转发吧!

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“号”用户上传并发布,本平台仅提供信息存储服务。

分享到: