Spring Boot multithreading with @Async

  • Create an AsyncTaskConfig configuration class and enable asynchronous execution with @EnableAsync
  • Create an IAsyncService interface and its implementation class, add a method to it, and annotate that method with @Async
  • In the calling service (ServiceImpl), inject IAsyncService and call the asynchronous method

AsyncTaskConfig

@Configuration@EnableAsync public class AsyncTaskConfig implements AsyncConfigurer {// ThredPoolTaskExcutor When the pool size is less than corePoolSize, create a new thread and process the request // When the pool size is equal to corePoolSize and put the request into the workQueue, the idle thread in the pool will fetch the task from the workQueue and process it // When the workQueue cannot fit the task, With RejectedExecutionHandler, if the pool size reaches MaximumPoolSize, then with RejectedExecutionHandler, then with RejectedExecutionHandler, then with RejectedExecutionHandler, then with RejectedExecutionHandler, then with RejectedExecutionHandler, then with RejectedExecutionHandler, then with RejectedExecutionHandler. Excess threads will wait for keepAliveTime for a long time, Public Executor getAsyncExecutor() {ThreadPoolTaskExecutor ThreadPool = new ThreadPoolTaskExecutor(); / / set the core number of threads threadPool. SetCorePoolSize (10); / / set the maximum number of threads threadPool. SetMaxPoolSize (15); / / the buffer queue used by the thread pool threadPool. SetQueueCapacity (20); / / wait for the task completed in shutdown that all threads waiting for the execution of the threadPool. SetWaitForTasksToCompleteOnShutdown (true); / / wait time (the default is 0, immediately stop) at this time, did not wait for xx seconds later forced to stop threadPool. SetAwaitTerminationSeconds (60); / / thread name prefix threadPool. SetThreadNamePrefix (MDS - "async task -"); Initialize ThreadPool. 
Initialize (); return threadPool; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } // @Bean("doSomethingExecutor") // public Executor doSomethingExecutor() { // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // // Number of core threads: number of threads initialized when the thread pool was created // executor.setCorePoolSize(10); // // Maximum number of threads: Maximum number of threads in the thread pool that will only apply for more than the core number of threads after the buffer queue is full // executor.setMaxPoolSize(20); / / / / the buffer queue: used to buffer the task queue. / / executor setQueueCapacity (500); / / / / 60 seconds to allow threads free time: when more than outside the core thread thread after the arrival of free time will be destroyed / / executor setKeepAliveSeconds (60); / / / / the thread pool name prefix: set up after can facilitate our positioning processing tasks in the thread pool / / executor setThreadNamePrefix (" do something - "); / / / / the buffer queue full of refusal strategies: after processing by the calling thread (typically the main thread) / / executor setRejectedExecutionHandler (new ThreadPoolExecutor. DiscardPolicy ()); // executor.initialize(); // return executor; / /}}

IAsyncService

public interface IAsyncService { Future<Long> saveItemNewInfoAsync(Long shopId) ; } @Slf4j @Service public class AsyncServiceImpl implements IAsyncService { @Value("${url.}") private String Url; @Value("${url.. account}") private String Account; @Value("${url.. appcode}") private String Apppcode; @Autowired private RestTemplate restTemplate; @Autowired private ShopMapper ShopMapper; @Autowired private ShopNewInfoMapper ShopNewInfoMapper; @Autowired private ItemNewInfoMapper ItemNewInfoMapper; @Async @Override public Future<Long> saveItemNewInfoAsync(Long shopId) { String url=Url+"/item-new-info? account="+Account+"&appcode="+Apppcode; int pageNo=1; int pageSize=200; while (pageNo>0){ String reqUrl=url+"&shopId="+shopId+"&pageNo="+pageNo+"&pageSize="+pageSize; ShopId ={}, current page number ={}, number of pages ={}",shopId,pageNo,pageSize); ParameterizedTypeReference<ItemNewInfoDto> reference = new ParameterizedTypeReference<ItemNewInfoDto>() {}; ResponseEntity<ItemNewInfoDto> entity = restTemplate.exchange(reqUrl, HttpMethod.GET,null,reference); ItemNewInfoDto rsp = entity.getBody(); List<ItemNewInfoWithBLOBs> rspList=new ArrayList<>(); if( rsp.getResult()==null|| CollectionUtils.isEmpty(rsp.getResult().getResultList())){ pageNo=0; }else { pageNo++; rspList=rsp.getResult().getResultList(); //if(rsp.getResult() instanceof ArrayList<? >){ List<Object> ids= rspList.stream().map(t->t.getId()).collect(Collectors.toList()); Mapper.markFail("_item_new_info","id",ids); ItemNewInfoMapper.insertBatch(rspList); } the info (" end of 2, goods yesterday's latest data shopId = {}, the current page number = {}, each page number = {}, the result = {} ", shopId, pageNo, pageSize, rspList. The size ()); } return new AsyncResult<>(shopId); }}

ServiceImpl

@Autowired private IAsyncService AsyncService; private final Integer SELECT_LIMIT=1000; Public Integer SaveItemNewInfo (){String url= url +"/item-new-info? account="+Account+"&appcode="+Apppcode; int count=1; while (count>0){ List<Shop> findList = this.findShopPage(SELECT_LIMIT,(count-1)*SELECT_LIMIT); if(findList.size()==0){ count=0; }else { count++; } Integer threadCount=5; Int times = (int) Math.ceil(findList.size() / 5.0); for(int i=0; i<times; i++){ List<Long> shopIds= findList.subList(i*threadCount,Math.min ((i+1)*threadCount,findList.size())) .stream() .map(t->t.getShopId()).collect(Collectors.toList()); this.doSaveItemNewInfo(shopIds); } } return 1; } @SneakyThrows private Integer doSaveItemNewInfo(List<Long> shopIds){ List<Future> futureList=new ArrayList<>(); for(Long shopId :shopIds){ futureList.add(AsyncService.saveItemNewInfoAsync(shopId)) ; } for (Future<Long> Future: FutureList) {while (true) {//CPU high-speed polling: Each future is concurrently rotated, and the completion status is determined and the result is obtained. This line is the essence of this implementation. If (future.isDone() &&!) if (future.isDone() &&!) if (future.isDone() &&! Future.isCancelled ()) {// Get the Future completed status. If you want to limit the timeout for each task, +future.get(1000*1, TimeUnit.milliseconds)+catch timeout Long shopId = future.get(); Log. Info (" 1, shopId=" + future.get()); // futureList.remove(future); break; // Thread.sleep(1); // Thread.sleep(1); // Thread.sleep(1); } } } return 1; }