Thread Pool Monitoring Approaches

2021/04/20

5ycode

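After reading "Java Thread Pool Implementation Principles and Their Practice in Meituan's Business" (Java线程池实现原理及其在美团业务中的实践), I kept coming back to one question: if I had to build the monitoring for such a thread pool, how would I do it?
Before monitoring a thread pool, we first need to be clear about why we are monitoring it.
Monitoring exists to catch problems early and prevent production incidents, or at least to let us step in before an incident actually happens.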
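Symptoms of a thread pool in trouble:
  • The pool processes tasks asynchronously but consumes them too slowly, so tasks pile up and responses slow down; or the queue is bounded, so submissions are rejected;

  • The pool is used to issue requests in parallel, the request volume is too high, and the backlog slows responses down;

  • The workload was estimated inaccurately, so the pool's resources are sized incorrectly;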

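The metrics to monitor on a thread pool are:
1. Queue saturation;
2. Whether tasks are submitted much faster than they are consumed per unit of time;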
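Both metrics can be derived from getters that ThreadPoolExecutor already exposes. The sketch below is only an illustration under stated assumptions, not the implementation used in this post: the class name PoolSampler and its two methods are hypothetical. Saturation is computed as size / (size + remainingCapacity), and the submit-versus-consume gap is estimated by comparing getTaskCount() and getCompletedTaskCount() between two samples (both are approximate while the pool is running).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical helper that samples one pool and derives the two metrics. */
class PoolSampler {
    private final ThreadPoolExecutor pool;
    private long lastSubmitted;
    private long lastCompleted;

    PoolSampler(ThreadPoolExecutor pool) {
        this.pool = pool;
        this.lastSubmitted = pool.getTaskCount();
        this.lastCompleted = pool.getCompletedTaskCount();
    }

    /** Queue saturation in [0, 1]; returns 0 for a zero-capacity queue such as SynchronousQueue. */
    static double saturation(BlockingQueue<?> queue) {
        int size = queue.size();
        long capacity = (long) size + queue.remainingCapacity();
        return capacity == 0 ? 0.0 : (double) size / capacity;
    }

    /**
     * Tasks submitted minus tasks completed since the previous call.
     * A value that stays positive across samples means submission outpaces consumption.
     */
    synchronized long backlogGrowth() {
        long submitted = pool.getTaskCount();
        long completed = pool.getCompletedTaskCount();
        long growth = (submitted - lastSubmitted) - (completed - lastCompleted);
        lastSubmitted = submitted;
        lastCompleted = completed;
        return growth;
    }
}
```

Calling backlogGrowth() from a scheduled task at a fixed interval turns the difference into a rate; the alert thresholds are left to the reader.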
Monitoring approaches:
Approach 1: extend ThreadPoolExecutor and override some of its methods
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A thread pool that can be monitored.
 * @author yxkong
 * @version 1.0
 * @date 2021/3/22 13:29
 */
public class ThreadPoolExecutorMonitor extends ThreadPoolExecutor {

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void shutdown() {
        // Report the final statistics before shutting down (the original read these values and discarded them).
        System.out.println("completed tasks=" + this.getCompletedTaskCount()  // tasks that have finished
                + ", active threads=" + this.getActiveCount()                 // threads currently running a task
                + ", total tasks=" + this.getTaskCount()                      // tasks ever scheduled (approximate)
                + ", queued tasks=" + this.getQueue().size());                // tasks still waiting in the queue
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                // Fetch the task's result. Tasks passed to submit() are wrapped in a Future that
                // captures any exception, so get() is the only way to see a failure here.
                Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null) {
            // Handle the exception
            System.out.println(t);
        }
        // Record the task's execution time here
    }
}
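The listing above ends with a "record the execution time" comment and leaves that step out. One common way to fill it in, sketched here as an assumption rather than the author's code, is to store a start timestamp in a ThreadLocal in beforeExecute and read it back in afterExecute; both hooks run on the worker thread that executes the task. The class name TimingThreadPoolExecutor and its accessors are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical pool that accumulates task run time through the before/after hooks. */
class TimingThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final LongAdder totalNanos = new LongAdder();
    private final LongAdder finished = new LongAdder();

    TimingThreadPoolExecutor(int core, int max, int queueCapacity) {
        super(core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs on the worker thread, just before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            Long start = startNanos.get(); // same worker thread as beforeExecute
            if (start != null) {
                totalNanos.add(System.nanoTime() - start);
                finished.increment();
            }
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    /** Number of tasks whose run time has been recorded. */
    long finishedTasks() {
        return finished.sum();
    }

    /** Mean task run time in milliseconds, or 0 if nothing has run yet. */
    double averageMillis() {
        long n = finished.sum();
        return n == 0 ? 0.0 : totalNanos.sum() / 1_000_000.0 / n;
    }
}
```

This measures run time only; time spent waiting in the queue would need a timestamp taken at submission.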
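Approach 2: custom ThreadFactory, BlockingQueue, and RejectedExecutionHandler
  • ThreadFactory: gives threads meaningful names so they are easier to manage in one place;
  • BlockingQueue: lets the queue length be adjusted at runtime (resizing an array-backed queue means dealing with locking and performance; a linked queue does not have that problem)
  • RejectedExecutionHandler: decides what happens when the queue is full (it could grow the queue dynamically, but be careful not to exhaust the JVM's memory or end up unable to allocate the queue)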
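import java.io.Serializable;
import java.util.AbstractQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory, Serializable {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public NamedThreadFactory(String name) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = name + poolNumber.getAndIncrement() + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        // Normalize daemon status and priority, as Executors.defaultThreadFactory() does.
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}

// A custom LinkedBlockingQueue whose capacity is exposed and can be changed at runtime.
public class CustomLinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    // Body omitted in the original post; it exposes setCapacity(int), which the controller later calls.
}

public class MyRejectPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Custom handling goes here, for example raising a monitoring alert that the queue is full.
    }
}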
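The post omits the body of CustomLinkedBlockingQueue. For readers who only need the capacity to be adjustable, here is a much simpler alternative, and it is explicitly not the author's class: a hypothetical SoftBoundedQueue that extends an unbounded LinkedBlockingQueue and enforces a volatile limit in offer(E), which is the method ThreadPoolExecutor.execute() uses to enqueue work. The limit is soft: the size check and the insert are not atomic, so concurrent producers can briefly overshoot it, and put() and the timed offer() are left unbounded.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical queue with a runtime-adjustable soft capacity. */
class SoftBoundedQueue<E> extends LinkedBlockingQueue<E> {
    private volatile int capacity;

    SoftBoundedQueue(int capacity) {
        super(); // the underlying queue is effectively unbounded
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Changes the limit; tasks that are already queued are never dropped. */
    void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    int getCapacity() {
        return capacity;
    }

    /** Rejects the element once the soft limit is reached (check-then-act, so not strict). */
    @Override
    public boolean offer(E e) {
        return size() < capacity && super.offer(e);
    }

    /** Reported against the soft limit so that saturation calculations stay meaningful. */
    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size());
    }
}
```

Shrinking the limit below the current size simply makes offer() return false until the backlog drains, at which point the pool's RejectedExecutionHandler takes over as usual.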
Custom thread pools
    /**
     * Custom thread pool for business tasks.
     * @return the business thread pool
     */
    @Bean("bizThreadPool")
    public ThreadPoolExecutor bizThreadPool() {
        return new ThreadPoolExecutor(5, 10, 200, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                new NamedThreadFactory("bizThreadPool"));
    }

    /**
     * Custom thread pool for logging tasks.
     * @return the log thread pool
     */
    @Bean("logThreadPool")
    public ThreadPoolExecutor logThreadPool() {
        return new ThreadPoolExecutor(5, 10, 200, TimeUnit.SECONDS,
                new CustomLinkedBlockingQueue<>(10),
                // The original passed "bizThreadPool" here, which gave both pools the same thread-name prefix.
                new NamedThreadFactory("logThreadPool"));
    }
Monitoring the thread pools and adjusting them dynamically
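import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
// ResultBean and ResultBeanUtil are the author's own response wrappers (not shown in the post).

@RestController
@RequestMapping("/threadpool")
@Slf4j
public class ThreadPoolController {

    /**
     * Collects every thread pool. Create the pools yourself rather than using Spring's defaults.
     * This is a shortcut that relies on Spring injecting all ThreadPoolExecutor beans by name;
     * in a plain Java project, register each pool yourself after creating it.
     */
    @Autowired
    public Map<String, ThreadPoolExecutor> map;

    /**
     * Lists all thread pools.
     * @return every registered pool, keyed by bean name
     */
    @GetMapping("/list")
    public ResultBean<Map<String, ThreadPoolExecutor>> list() {
        return ResultBeanUtil.success("All thread pools retrieved.", map);
    }

    @GetMapping("/get")
    public ResultBean<ThreadPoolExecutor> getThreadPool(String threadPool) {
        ThreadPoolExecutor executor = map.get(threadPool);
        if (executor == null) {
            return ResultBeanUtil.noData("No matching thread pool found");
        }
        return ResultBeanUtil.success("Thread pool retrieved.", executor);
    }

    @PostMapping("/modify")
    public ResultBean<ThreadPoolExecutor> modifyThreadPool(String threadPool, Integer coreSize,
            Integer maximumPoolSize, Integer capacity) {
        ThreadPoolExecutor executor = map.get(threadPool);
        if (executor == null) {
            return ResultBeanUtil.noData("No matching thread pool found");
        }
        if (coreSize == null || maximumPoolSize == null || coreSize > maximumPoolSize) {
            return ResultBeanUtil.paramEmpty("coreSize and maximumPoolSize are required, and coreSize must not exceed maximumPoolSize");
        }
        // On JDK 9+ each setter rejects a value that would make core > max,
        // so raise the maximum first when growing and lower it last when shrinking.
        if (maximumPoolSize >= executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(maximumPoolSize);
            executor.setCorePoolSize(coreSize);
        } else {
            executor.setCorePoolSize(coreSize);
            executor.setMaximumPoolSize(maximumPoolSize);
        }
        // Start all core threads now. getTask() does not adjust the workers to the new core size;
        // the pool only adapts when new tasks arrive.
        executor.prestartAllCoreThreads();
        // Needed when the pool is made smaller: by default core threads are never reclaimed.
        executor.allowCoreThreadTimeOut(true);
        BlockingQueue<Runnable> queue = executor.getQueue();
        if (capacity != null && queue instanceof CustomLinkedBlockingQueue) {
            CustomLinkedBlockingQueue<Runnable> customQueue = (CustomLinkedBlockingQueue<Runnable>) queue;
            customQueue.setCapacity(capacity);
        }
        return ResultBeanUtil.success("Thread pool modified.", executor);
    }

    @PostMapping("test")
    public ResultBean<Void> test(String threadPool, Integer size) {
        if (size == null || size <= 0) {
            return ResultBeanUtil.paramEmpty("size must be a positive number");
        }
        ThreadPoolExecutor executor = map.get(threadPool);
        if (executor == null) {
            return ResultBeanUtil.noData("No matching thread pool found");
        }
        for (int i = 0; i < size; i++) {
            int finalI = i;
            executor.submit(() -> log.info("Task {} executed", finalI));
        }
        return ResultBeanUtil.success();
    }
}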
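Approach 3: monitor through an agent and expose an HTTP service
A few points to watch out for:
1. ThreadPoolExecutor is loaded by the Bootstrap ClassLoader, so the class that holds the collected thread pools must also be loaded by the Bootstrap ClassLoader; otherwise the class definition cannot be found at runtime;
2. If you use your own Executor class that extends ThreadPoolExecutor, class loading is not a concern;
Solutions to point 1:
1. Use -Xbootclasspath/a:../a.jar so that the holder class is loaded by the Bootstrap ClassLoader;
2. Use byte-buddy to enhance the class and force the Bootstrap ClassLoader to load it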
    /**
     * Instruments ThreadPoolExecutor.
     * @param instrumentation the Instrumentation instance handed to the agent
     */
    private static void threadPoolExecutor(Instrumentation instrumentation) {
        new AgentBuilder.Default()
                .disableClassFormatChanges()
                // By default, classes loaded by the bootstrap class loader are not instrumented.
                // Replacing the ignore matcher with one that ignores everything except
                // ThreadPoolExecutor lifts that restriction for this type.
                .ignore(ElementMatchers.noneOf(ThreadPoolExecutor.class))
                // .with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
                // .with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
                .with(AgentBuilder.TypeStrategy.Default.REDEFINE)
                .with(AgentBuilder.InjectionStrategy.UsingUnsafe.INSTANCE)
                .type(ElementMatchers.is(ThreadPoolExecutor.class))
                // .or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.Executor")))
                // .or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.ExecutorService")))
                .transform((builder, typeDescription, classLoader, javaModule) ->
                        builder.visit(Advice.to(ThreadPoolExecutorFinalizeAdvice.class).on(ElementMatchers.named("finalize")))
                                .visit(Advice.to(ThreadPoolExecutorExecuteAdvice.class).on(ElementMatchers.named("execute")))
                )
                .installOn(instrumentation);
    }
Expose one unified interface, so that individual projects do not each have to implement it.
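The post does not show how the agent exposes its HTTP service. As a rough sketch under stated assumptions, the JDK's built-in com.sun.net.httpserver.HttpServer is enough to publish a plain-text view of the collected pools without any framework. The class name PoolMetricsHttp, the text format, and the reuse of the /threadpool/list path are all hypothetical choices made for this example.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical endpoint that serves one line of plain text per pool. */
class PoolMetricsHttp {

    /** Renders the current state of every pool as "name core=.. max=.. ..." lines. */
    static String render(Map<String, ThreadPoolExecutor> pools) {
        StringBuilder sb = new StringBuilder();
        pools.forEach((name, p) -> sb.append(name)
                .append(" core=").append(p.getCorePoolSize())
                .append(" max=").append(p.getMaximumPoolSize())
                .append(" active=").append(p.getActiveCount())
                .append(" queued=").append(p.getQueue().size())
                .append(" completed=").append(p.getCompletedTaskCount())
                .append('\n'));
        return sb.toString();
    }

    /** Starts the server on the given port; the caller is responsible for calling stop() on it. */
    static HttpServer start(int port, Map<String, ThreadPoolExecutor> pools) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/threadpool/list", exchange -> {
            byte[] body = render(pools).getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

An agent would call start(...) once from its entry point and pass in whatever map it uses to collect the pools; authentication and access control are deliberately left out of this sketch.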
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.bytebuddy.asm.Advice;
import org.junit.Test; // JUnit 4 is assumed; the post does not say which version it uses

public class MonitorTest {

    @Test
    public void test() {
        System.out.println(ThreadPoolMonitorData.class.getClassLoader());
        System.out.println(ThreadPoolMonitorData.alls());
        System.out.println(ThreadPoolMonitor.class.getClassLoader());

        ThreadPoolExecutor pool = threadpool();
        pool.submit(() -> {
            System.out.println("pool running 1: " + Thread.currentThread().getName());
        });
        pool.submit(() -> {
            System.out.println("pool running 2: " + Thread.currentThread().getName());
        });
        pool.submit(() -> {
            System.out.println("pool running 3: " + Thread.currentThread().getName());
        });

        ExecutorService executorService = threadpool1();
        executorService.submit(() -> {
            System.out.println("executorService running 1: " + Thread.currentThread().getName());
        });

        ThreadPoolMonitorData.alls().forEach((key, val) -> {
            System.out.println("ThreadPoolMonitorData key=" + key + " val:" + val);
        });

        ThreadPoolMonitor monitor = new ThreadPoolMonitor();
        monitor.alls().forEach((key, val) -> {
            System.out.println("ThreadPoolMonitor key=" + key + " val:" + val);
        });

        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private ThreadPoolExecutor threadpool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 200, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10));
        return pool;
    }

    private ExecutorService threadpool1() {
        return Executors.newCachedThreadPool();
    }
}

public class ThreadPoolExecutorExecuteAdvice {

    /**
     * Intercepts entry into execute() on every thread pool.
     * Constructors are not advised (according to the author, byte-buddy does not support it).
     * {@code @Advice.OnMethodEnter} must be applied to a static method.
     * @param obj the ThreadPoolExecutor instance whose execute() is being called
     * @param abc the first argument of execute(), i.e. the submitted task
     */
    @Advice.OnMethodEnter
    public static void executeBefore(@Advice.This Object obj, @Advice.Argument(0) Object abc) {
        try {
            ThreadPoolExecutor executor = (ThreadPoolExecutor) obj;
            // Register the pool under its hash code the first time it executes a task.
            ThreadPoolMonitorData.add(String.valueOf(executor.hashCode()), executor);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Output:
null        (the Bootstrap ClassLoader prints as null)
{}
sun.misc.Launcher$AppClassLoader@18b4aac2
pool running 1: pool-3-thread-1
pool running 2: pool-3-thread-2
pool running 3: pool-3-thread-3
executorService running 1: pool-4-thread-1
ThreadPoolMonitorData key=1564698139 val:java.util.concurrent.ThreadPoolExecutor@5d43661b[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 3]
ThreadPoolMonitorData key=171421438 val:java.util.concurrent.ThreadPoolExecutor@a37aefe[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
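ThreadPoolMonitorData itself is not shown in the post; only its add(...) and alls() calls are visible. A minimal sketch of what such a registry might look like follows, with every detail an assumption: the class is named PoolRegistry here to make clear that it is not the author's code, it holds pools through WeakReference so that registering a pool does not keep it alive, and it adds a remove(...) method that the post never mentions.

```java
import java.lang.ref.WeakReference;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical stand-in for the post's ThreadPoolMonitorData registry. */
final class PoolRegistry {
    private static final Map<String, WeakReference<ThreadPoolExecutor>> POOLS =
            new ConcurrentHashMap<>();

    private PoolRegistry() {
    }

    /** Registers the pool under the key unless a live pool is already registered there. */
    static void add(String key, ThreadPoolExecutor pool) {
        POOLS.compute(key, (k, old) ->
                (old != null && old.get() != null) ? old : new WeakReference<>(pool));
    }

    /** Drops a pool explicitly, for example once it has been shut down. */
    static void remove(String key) {
        POOLS.remove(key);
    }

    /** Returns a snapshot of the pools that are still reachable, pruning collected ones. */
    static Map<String, ThreadPoolExecutor> alls() {
        Map<String, ThreadPoolExecutor> live = new LinkedHashMap<>();
        POOLS.forEach((key, ref) -> {
            ThreadPoolExecutor pool = ref.get();
            if (pool == null) {
                POOLS.remove(key, ref); // the pool was garbage collected
            } else {
                live.put(key, pool);
            }
        });
        return live;
    }
}
```

Because hashCode() values can collide, a production registry would probably want a more robust key, such as a pool name taken from the thread factory; that choice is outside what the post specifies.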
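The data obtained through monitoring needs to be collected centrally in one place.
The recommended approach is a unified standard combined with agent-based collection: gather whatever data your situation calls for, then use it for monitoring and dynamic adjustment.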

yxkong
