5ycode
被管理耽误的架构师。工作、学习过程中的知识总结与分享,jvm,多线程,架构设计,经验分享等。
公众号
-
线程池异步处理,消费速度过慢,导致任务积压,响应过慢,或者队列有限,导致提交被拒绝;
-
使用线程池做并行请求的时候,请求量过大,处理积压,导致响应变慢;
-
业务评估不准确,导致线程池资源设置的合理;
/**
* 创建可监控的线程池
* @author yxkong
* @version 1.0
* @date 2021/3/22 13:29
*/
public class ThreadPoolExecutorMonitor extends ThreadPoolExecutor {
public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
public void shutdown() {
//获取执行任务
this.getCompletedTaskCount();
//获取正在运行的线程数
this.getActiveCount();
//获取任务数
this.getTaskCount();
//队列剩余个数
this.getQueue().size();
super.shutdown();
}
public List<Runnable> shutdownNow() {
return super.shutdownNow();
}
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
//获取线程执行结果
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
//处理异常
System.out.println(t);
}
//记录线程执行时间
}
}
- ThreadFactory:是了为了线程的命名,方便统一管理;
- BlockingQueue:是为能动态调整队列的长度(数组扩缩容时,需要考虑锁以及性能,链表不用考虑)
- RejectedExecutionHandler: 队列满了如何处理(可以动态扩容,小心把jvm撑爆,或者无法创建队列)
public class NamedThreadFactory implements ThreadFactory, Serializable {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public NamedThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = name +poolNumber.getAndIncrement() +"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()){
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
//自定义 LinkedBlockingQueue,将队列长度对外暴露可修改
public class CustomLinkedBlockingQueue <E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
}
public class MyRejectPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//自定义处理逻辑 ,比如监控报警,队列满了
}
}
/**
* 自定义业务线程池
* @return
*/
"bizThreadPool") (
public ThreadPoolExecutor bizThreadPool(){
return new ThreadPoolExecutor(5,
10,
200,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
new NamedThreadFactory("bizThreadPool"));
}
/**
* 自定义log线程池
* @return
*/
"logThreadPool") (
public ThreadPoolExecutor logThreadPool(){
return new ThreadPoolExecutor(5,
10,
200,
TimeUnit.SECONDS,
new CustomLinkedBlockingQueue<>(10),
new NamedThreadFactory("bizThreadPool"));
public class ThreadPoolController {
/**
* 收集所有的线程池,线程池建议自己手动实现,不要用spring默认的
* 这里是偷懒了,用了spring的特性,如果是java项目,实现后自己注册
*/
public Map<String, ThreadPoolExecutor> map;
/**
* 获取所有的线程池
* @return
*/
public ResultBean<Map<String,ThreadPoolExecutor>> list(){
return ResultBeanUtil.success("获取所有线程池成功!",map);
}
public ResultBean<ThreadPoolExecutor> getThreadPool(String threadPool){
ThreadPoolExecutor executor = map.get(threadPool);
if(executor == null){
return ResultBeanUtil.noData("未找到对应的线程池");
}
return ResultBeanUtil.success("获取线程池成功!",executor);
}
public ResultBean<ThreadPoolExecutor> modifyThreadPool(String threadPool,Integer coreSize,Integer maximumPoolSize,Integer capacity){
ThreadPoolExecutor executor = map.get(threadPool);
if(executor == null){
return ResultBeanUtil.noData("未找到对应的线程池");
}
executor.setCorePoolSize(coreSize);
executor.setMaximumPoolSize(maximumPoolSize);
//启动所有的核心线程数,getTask中不会根据核心线程数修改workers,如果再有新线程,会动态调整
executor.prestartAllCoreThreads();
//如果将线程池改小,设置下,默认核心线程数是不会回收的
executor.allowCoreThreadTimeOut(true);
BlockingQueue<Runnable> queue = executor.getQueue();
if(queue instanceof CustomLinkedBlockingQueue){
CustomLinkedBlockingQueue customQueue = (CustomLinkedBlockingQueue) queue;
customQueue.setCapacity(capacity);
}
return ResultBeanUtil.success("获取线程池成功!",executor);
}
public ResultBean<Void> test(String threadPool,Integer size){
if (size == null || size ==0){
return ResultBeanUtil.paramEmpty("size不能为空");
}
ThreadPoolExecutor executor = map.get(threadPool);
if(executor == null){
return ResultBeanUtil.noData("未找到对应的线程池");
}
for (int i = 0; i < size; i++) {
int finalI = i;
executor.submit(new Runnable() {
public void run() {
log.info("任务{}执行",Integer.valueOf(finalI));
}
});
}
return ResultBeanUtil.success();
}
}
/**
* 针对threadPoolExecutor 的增强
* @param instrumentation
*/
private static void threadPoolExecutor(Instrumentation instrumentation){
new AgentBuilder.Default()
.disableClassFormatChanges()
//默认是不对bootstrap类加载器加载的对象instrumentation,忽略某个type后,就可以了
.ignore(ElementMatchers.noneOf(ThreadPoolExecutor.class))
//
.with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
//
.with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
.with(AgentBuilder.TypeStrategy.Default.REDEFINE)
.with(AgentBuilder.InjectionStrategy.UsingUnsafe.INSTANCE)
.type(ElementMatchers.is(ThreadPoolExecutor.class))
//.or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.Executor")))
//.or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.ExecutorService")))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder.visit(Advice.to(ThreadPoolExecutorFinalizeAdvice.class).on(ElementMatchers.named("finalize")))
.visit(Advice.to(ThreadPoolExecutorExecuteAdvice.class).on(ElementMatchers.named("execute")))
)
.installOn(instrumentation);
}
public class MonitorTest {
public void test(){
System.out.println(ThreadPoolMonitorData.class.getClassLoader());
System.out.println(ThreadPoolMonitorData.alls());
System.out.println(ThreadPoolMonitor.class.getClassLoader());
ThreadPoolExecutor pool= threadpool();
pool.submit(()->{
System.out.println("线程池pool执行中1:"+Thread.currentThread().getName());
});
pool.submit(()->{
System.out.println("线程池pool执行中2:"+Thread.currentThread().getName());
});
pool.submit(()->{
System.out.println("线程池pool执行中3:"+Thread.currentThread().getName());
});
ExecutorService executorService = threadpool1();
executorService.submit(()->{
System.out.println("线程池executorService执行中1:"+Thread.currentThread().getName());
});
ThreadPoolMonitorData.alls().forEach((key,val) ->{
System.out.println("ThreadPoolMonitorData key="+key+" val:"+val);
});
ThreadPoolMonitor monitor = new ThreadPoolMonitor();
monitor.alls().forEach((key,val)->{
System.out.println("ThreadPoolMonitor key="+key+" val:"+val);
});
try {
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}
}
private ThreadPoolExecutor threadpool(){
ThreadPoolExecutor pool = new ThreadPoolExecutor(5,
10,
200,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10));
return pool;
}
private ExecutorService threadpool1(){
return Executors.newCachedThreadPool();
}
}
public class ThreadPoolExecutorExecuteAdvice {
/**
* 对所有的线程的execute 进入方法进行监听
* byteBuddy不支持对constructor
* @Advice.OnMethodEnter 必须作用与static方法
* @param obj
* @param abc
*/
.OnMethodEnter
public static void executeBefore( .This Object obj, .Argument(0) Object abc){
try{
ThreadPoolExecutor executor = (ThreadPoolExecutor) obj;
ThreadPoolMonitorData.add(executor.hashCode()+"",(ThreadPoolExecutor) obj);
}catch (Exception e){
e.printStackTrace();
}
}
}
null BootstrapClassLoader 输出是null
{}
sun.misc.Launcher$AppClassLoader@18b4aac2
线程池pool执行中1:pool-3-thread-1
线程池pool执行中2:pool-3-thread-2
线程池pool执行中3:pool-3-thread-3
线程池executorService执行中1:pool-4-thread-1
ThreadPoolMonitorData key=1564698139 val:java.util.concurrent.ThreadPoolExecutor@5d43661b[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 3]
ThreadPoolMonitorData key=171421438 val:java.util.concurrent.ThreadPoolExecutor@a37aefe[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
文章评论