package com.hx.util.thread; import lombok.Setter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 创建线程池的,主要避免线程队列超,导致服务器爆了 */ public class ExecutorServiceTool { /**log4j日志*/ private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTool.class.getName()); /**线程池-获取队列的*/ private ThreadPoolExecutor threadPool; /**最大队列*/ private int maxQueue; /**最大线程数*/ private int maxTread; /**创建队列*/ private ThreadTaskLinkedBlockingDeque queue = new ThreadTaskLinkedBlockingDeque<>(); public ExecutorServiceTool() { } public ExecutorServiceTool(int initTread, int maxQueue) { createThread(initTread,maxQueue); } public ExecutorServiceTool(int initTread, int maxTread, int maxQueue) { this.maxTread = maxTread; createThread(initTread,maxTread,maxQueue); } /**创建线程池 * @param initTread 初始化线程池 * @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对 * 线程池限制队列,因为限制了会报错,导致数据丢失 * @return 线程池 */ public ThreadPoolExecutor createThread(int initTread,int maxQueue){ this.threadPool = new ThreadPoolExecutor(initTread, initTread, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue()); this.maxQueue = maxQueue; return this.threadPool; } /**创建线程池 * @param initTread 初始化线程池 * @param initTread 最大线程数 * @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对 * 线程池限制队列,因为限制了会报错,导致数据丢失 * @return 线程池 */ public ThreadPoolExecutor createThread(int initTread,int maxTread,int maxQueue){ this.threadPool = new ThreadPoolExecutor(initTread,maxTread,60L,TimeUnit.SECONDS,queue); this.maxQueue = maxQueue; return this.threadPool; } /**循环校验队列数量模式,针对最大的队列,返回true就是可以加入队列 * 避免死循环,设置了循环校验次数 * @param sleepMillisecond 睡眠,毫秒秒,如果是空的,那么直接返回 * @param frequency 循环校验次数,空值默认100次数 * @return 最后都是返回true */ public boolean whileCheckQueue(Integer sleepMillisecond,Integer frequency){ if(frequency == null){ frequency = 100; } while (frequency>0){ if(!noRund(sleepMillisecond)){ return true; } frequency--; } return true; } /**针对最大的队列,如果没有超过返回是false的,超过就返回是true的 * 单返回true的时候,就不要传入队列了 * @param sleepMillisecond 睡眠,毫秒秒,如果是空的,那么直接返回 * @return 如果没有超过返回是false的,超过就返回是true的 */ public boolean noRund(Integer sleepMillisecond){ if(threadPool.getQueue().size() > maxQueue){ if(sleepMillisecond != null && sleepMillisecond > 0){ try{ Thread.sleep(sleepMillisecond); }catch (Exception e){ logger.error("线程池开启睡眠失败!"); } } return true; }else{ return false; } } /**关闭*/ public void shutdown(){ this.threadPool.shutdown(); } /** * 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理. * 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。 * * @author chenjiahe */ @Setter private class ThreadTaskLinkedBlockingDeque extends LinkedBlockingDeque { @Override public boolean offer(E e) { int activeThreadNum = threadPool.getActiveCount(); if (activeThreadNum < maxTread) { return false; } return offerLast(e); } } /****************************************************************************************/ public int getMaxQueue() { return maxQueue; } public void setMaxQueue(int maxQueue) { this.maxQueue = maxQueue; } public ThreadPoolExecutor getThreadPool() { return threadPool; } public void setThreadPool(ThreadPoolExecutor threadPool) { this.threadPool = threadPool; } }