chenjiahe
2024-04-15 c2bb89da722d5b5db5f7151546369eb84b775a05
提交 | 用户 | age
d4fa52 1 package com.hx.util.thread;
C 2
3 import lombok.Setter;
4 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory;
6
7 import java.util.concurrent.LinkedBlockingDeque;
8 import java.util.concurrent.LinkedBlockingQueue;
9 import java.util.concurrent.ThreadPoolExecutor;
10 import java.util.concurrent.TimeUnit;
11
12
13 /**
14  * 创建线程池的,主要避免线程队列超,导致服务器爆了
15  */
16 public class ExecutorServiceTool {
17
18     /**log4j日志*/
19     private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTool.class.getName());
20
21     /**线程池-获取队列的*/
22     private ThreadPoolExecutor threadPool;
23     /**最大队列*/
24     private int maxQueue;
25     /**最大线程数*/
26     private int maxTread;
27     /**创建队列*/
28     private ThreadTaskLinkedBlockingDeque<Runnable> queue = new ThreadTaskLinkedBlockingDeque<>();
29
30     public ExecutorServiceTool() {
31     }
32
33     public ExecutorServiceTool(int initTread, int maxQueue) {
34         createThread(initTread,maxQueue);
35     }
36
37     public ExecutorServiceTool(int initTread, int maxTread, int maxQueue) {
38         this.maxTread = maxTread;
39         createThread(initTread,maxTread,maxQueue);
40     }
41
42
43     /**创建线程池
44      * @param initTread 初始化线程池
45      * @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对
46      *                  线程池限制队列,因为限制了会报错,导致数据丢失
47      * @return 线程池
48      */
49     public ThreadPoolExecutor createThread(int initTread,int maxQueue){
50         this.threadPool = new ThreadPoolExecutor(initTread, initTread,
51                 60L, TimeUnit.SECONDS,
52                 new LinkedBlockingQueue<Runnable>());
53         this.maxQueue = maxQueue;
54         return this.threadPool;
55     }
56
57     /**创建线程池
58      * @param initTread 初始化线程池
59      * @param initTread 最大线程数
60      * @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对
61      *                  线程池限制队列,因为限制了会报错,导致数据丢失
62      * @return 线程池
63      */
64     public ThreadPoolExecutor createThread(int initTread,int maxTread,int maxQueue){
65         this.threadPool = new ThreadPoolExecutor(initTread,maxTread,60L,TimeUnit.SECONDS,queue);
66         this.maxQueue = maxQueue;
67         return this.threadPool;
68     }
69
c2bb89 70     /**循环校验队列数量模式,针对最大的队列,返回true就是可以加入队列
C 71      * 避免死循环,设置了循环校验次数
72      * @param sleepMillisecond 睡眠,毫秒秒,如果是空的,那么直接返回
73      * @param frequency 循环校验次数,空值默认100次数
74      * @return 最后都是返回true
75      */
76     public boolean whileCheckQueue(Integer sleepMillisecond,Integer frequency){
77         if(frequency == null){
78             frequency = 100;
79         }
80         while (frequency>0){
81             if(!noRund(sleepMillisecond)){
82                 return true;
83             }
84             frequency--;
85         }
86         return true;
87     }
88
d4fa52 89     /**针对最大的队列,如果没有超过返回是false的,超过就返回是true的
C 90      * 单返回true的时候,就不要传入队列了
91      * @param sleepMillisecond 睡眠,毫秒秒,如果是空的,那么直接返回
92      * @return 如果没有超过返回是false的,超过就返回是true的
93      */
94     public boolean noRund(Integer sleepMillisecond){
95         if(threadPool.getQueue().size() > maxQueue){
96             if(sleepMillisecond != null && sleepMillisecond > 0){
97                 try{
98                     Thread.sleep(sleepMillisecond);
99                 }catch (Exception e){
100                     logger.error("线程池开启睡眠失败!");
101                 }
102             }
103             return true;
104         }else{
105             return false;
106         }
107     }
108
109     /**关闭*/
110     public void shutdown(){
111         this.threadPool.shutdown();
112     }
113
114     /**
115      * 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理.
116      * 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。
117      *
118      * @author chenjiahe
119      */
120     @Setter
121     private class ThreadTaskLinkedBlockingDeque<E> extends LinkedBlockingDeque<E> {
122         @Override
123         public boolean offer(E e) {
124             int activeThreadNum = threadPool.getActiveCount();
125             if (activeThreadNum < maxTread) {
126                 return false;
127             }
128             return offerLast(e);
129         }
130     }
131
132     /****************************************************************************************/
133
134     public int getMaxQueue() {
135         return maxQueue;
136     }
137
138     public void setMaxQueue(int maxQueue) {
139         this.maxQueue = maxQueue;
140     }
141
142     public ThreadPoolExecutor getThreadPool() {
143         return threadPool;
144     }
145
146     public void setThreadPool(ThreadPoolExecutor threadPool) {
147         this.threadPool = threadPool;
148     }
149
150 }