java线程池案例(Java中线程池),本文通过数据整理汇集了java线程池案例(Java中线程池)相关信息,下面一起看看。

在日常的开发工作中,线程池往往承载着一个应用中最重要的业务逻辑,所以我们有必要更加关注线程池的执行情况,包括异常处理和分析。本文主要讨论如何正确使用线程池,并提供一些实用的建议。关于线程池实现原理的一些知识,本文会涉及到,但不会展开太多。

线程UncaughtExceptionHandler的异常处理

我们都知道Runnable接口中的run方法是不允许抛出异常的,所以派生这个线程的主线程在执行过程中可能无法直接获得这个线程的异常信息。示例:

public static void main(String[]args)抛出异常{ Thread Thread=new Thread(()-{ un interrupted ibilities . sleep in interruptible(2,TimeUnit。秒);system . out . println(1/0);//这一行会导致错误!});thread . setuncaughtexceptionhandler((t,e)-{ e . printstacktrace();//如果注释掉这一行,这个程序不会抛出任何异常。});thread . start();}为什么会这样?其实我们看看Thread中的源代码就会发现,如果Thread在执行过程中遇到异常,我们会先判断当前线程是否设置了UncaughtExceptionHandler,如果没有,我们会从线程所在的ThreadGroup中获取。

注意:每个线程都有自己的ThreadGroup,即使你没有指定,它实现了UncaughtExceptionHandler接口。

让我们来看看ThreadGroup中的UncaughtExceptionHandler接口的默认实现:

public void uncautchexception(Thread t,Throwable e) { if (parent!=null){ parent . uncaughtexception(t,e);} else {线程。UncaughtExceptionHandler ueh=thread . getdefaultuncaughtexceptionhandler();如果(ueh!=null) { ueh.uncaughtException(t,e);} else if(!(ThreadDeath的instance)){ system . err . print(' thread ' ' t . getname()' ' '中的异常);e . printstacktrace(system . err);}}}如果这个ThreadGroup有父ThreadGroup,调用父ThreadGroup的uncaughtException否则,调用全局默认线程。DefaultuncaughtException处理程序。如果没有设置全局处理程序,那么只需在System.err中定位异常信息,这就是为什么我们在创建线程时要实现它的UncaughtExceptionHandler接口。这样做可以让您更容易解决问题。

通过执行将任务提交给线程池

回到线程池的话题,如果我们提交给线程池的任务没有尝试.捕捉异常,运行时出现异常,会对线程池产生什么影响?答案是没有效果,线程池还能正常工作,只是异常已经被吞了。这通常不是一件好事,因为我们需要得到原始的异常对象来分析问题。

那么如何才能得到原来的异常对象呢?我们从线程池的源代码开始研究这个问题。当然,网上有很多关于线程池的源代码分析的文章。这里,由于篇幅有限,直接给出了代码中最相关的部分:

final void run Worker(Worker w){ Thread wt=Thread . current Thread();Runnable task=w.firstTaskw.firstTask=nullw . unlock();//allow interrupts boolean completed abruptly=true;尝试{ while (task!=null || (task=getTask())!=null){ w . lock();//如果池正在停止,请确保线程被中断;//如果没有,请确保线程没有中断。这//需要在第二种情况下重新检查,以处理//shutdownNow竞争,同时清除中断if((runstateanow(CTL . get(),STOP))| |(thread . interrupted()runstateanow(CTL . get(),STOP))!wt . is interrupted())wt . interrupt();try { beforeExecute(wt,task);Throwable thrown=null请尝试{ task . run();} catch(runtime exception x){ thrown=x;扔x;} catch(错误x){ thrown=x;扔x;} catch(Throwable x){ throwed=x;抛出新错误(x);} finally { afterExecute(task,抛出);} }最后{ task=nullw0 . 577285520829796 . com 0 . 24818427完整任务;w . unlock();} } completedAbruptly=false}最后{ processWorkerExit(w,completedAbruptly);}}这个方法是实际执行提交给线程池的任务的代码。

这里我们省略了无关的逻辑,重点关注第19行到第32行的逻辑,第23行实际上开始执行提交给线程池的任务。那么20号线是干什么用的呢?事实上,在执行提交给线程池的任务之前,可以做一些前期工作。同样,我们看到第31行,这是提交的任务执行后,可以做一些后期工作。

在Execute之前,我们暂时把这个放在一边,把重点放在方法afterExecute上。我们可以看到,在执行任务的过程中,一旦抛出任何一种异常,都会提交给afterExecute方法。但是,查看线程池的源代码,可以发现默认的afterExecute是一个空的实现。所以我们有必要继承ThreadPoolExecutor来实现这个afterExecute方法。

查看源代码,我们可以发现这个afterExecute方法是受保护的类型。从官方的评论可以看出,这种方法是推荐子类实现的。

当然,这种方法不能随意实施,需要遵循一定的步骤。还提到了具体的官方笔记,这里摘抄如下:

* pre { @ code * class extended executor扩展ThreadPoolExecutor { * //.* protected void after execute(Runnable r,Throwable t){ * super . after execute(r,t);* if (t==null r未来的实例?){ * try { * Object result=((未来?)r)。get();* } catch(cancellation exception ce){ * t=ce;* } catch(execution exception ee){ * t=ee . get cause();* } catch(interrupted exception ie){ * thread . current thread()。中断();//忽略/重置* } * } * if (t!=null)* system . out . println(t);*} * }}/pre然后,通过这种方式,可以成功地捕获可能已经被线程池吞噬的异常,从而方便故障排除。

但是这里有一个小问题。我们注意到,在runWorker方法中,task.run()被执行;语句之后抛出了各种异常,那么这些抛出的异常去了哪里?实际上,这里的异常对象最终会被传入Thread的dispatchUncaughtException方法。源代码如下:

private void dispatchunchauthexception(Throwable e){ getuncauthexception处理程序().uncaughtException(this,e);} 可以看到它会去获取UncaughtExceptionHandler的实现类,然后调用其中的uncaughtException方法,这也就回到了我们上一小节所分析的UncaughtExceptionHandler实现的具体逻辑。那么为了拿到最原始的异常对象,除了实现UncaughtExceptionHandler接口之外,也可以考虑实现执行后方法。

通过使服从提交任务到线程池

这个同样很简单,我们还是先回到使服从方法的源码:

public T FutureT submit(CallableT task){ if(task==null)throw new NullPointerException();RunnableFutureT ftask=的新任务(task);执行(ftask);返回ftask}这里的执行方法调用的是ThreadPoolExecutor中的执行方法,执行逻辑跟通过执行提交任务到线程池是一样的。我们先重点关注这里的新任务方法,其源码如下:

protected T runnable future et new task for(callable let callable){ return new future task(callable);} 可以看到提交的请求即付的对象用未来任务封装起来了。我们知道最终会执行到上述跑步工人这个方法中,并且最核心的执行逻辑就是task.run()。这行代码。我们知道这里的工作其实是未来任务类型,因此我们有必要看一下未来任务中的奔跑方法的实现:

public void run() { if (state!=NEW ||!不安全0.5773285520829796。com 0.8743222324818427 pareandswapobject(this,runnerOffset,null,Thread.currentThread()))返回;试试可调用的如果(c!=null state==NEW){ V result;布尔冉;请尝试{ result=c . call();ran=true } catch(Throwable ex){ result=null;ran=falseset异常(ex);} if(ran)set(result);} }最后{ //跑步者必须为非空,直到状态被设置为//防止并发调用run()runner=null;//置空跑步者后必须重新读取状态,以防止//泄漏中断int s=stateif (s=中断)处理可能的取消中断;} } 可以看到这其中跟异常相关的最关键的代码就在第17行,也就是设置异常(ex);这个地方。我们看一下这个地方的实现:

受保护的空集合异常(Throwable t){ if(unsafe 0.5773285520829796。com 0.8743222324818427 pareandswapint(this,stateOffset,NEW,COMPLETING)){ outcome=t;UNSAFE.putOrderedInt(this,stateOffset,exceptual);//最终状态完成完成();} } 这里最关键的地方就是将异常对象赋值给了结果,结果是未来任务中的成员变量,我们通过调用使服从方法,拿到一个将来的对象之后,再调用它的得到方法,其中最核心的方法就是报告方法,下面给出每个方法的源码:

首先是得到方法:

public V get()抛出InterruptedException,execution exception { int s=state if(s=正在完成)s=awaitDone(false,0L);退货报告;} 可以看到最终调用了报告方法,其源码如下:

private V report(int s)引发execution exception { Object x=outcome;if (s==NORMAL)返回(V)x;if (s=CANCELLED)抛出新的cancellation exception();抛出新的execution exception((Throwable)x);}以上是一些状态判断。如果当前任务没有正常执行或取消,那么这里的X实际上就是原来的异常对象,可以看到它被ExecutionException包装了。因此,当调用get方法时,可能会引发ExecutionException异常,因此可以通过调用其getCause方法来获取原始异常对象。

综上所述,对于提交给线程池的任务可能抛出异常的问题,主要有以下两种处理方式:

尝试.在提交的任务中抓住自己,但是这里有一个不好的点。如果您将多种类型的任务提交到线程池,每种类型的任务都需要尝试.单独捕捉异常,这相当麻烦。而如果只是catch (exception)的话,可能还是会遗漏一些包括错误类型在内的异常,所以保险起见,可以考虑catch(Throwable t)。自己实现线程池的afterExecute方法,或者实现线程的UncaughtExceptionHandler接口。下面是我个人创建线程池的一个例子,供大家参考:

BlockingQueueRunnable QUEUE=new ArrayBlockingQueue(DEFAULT _ QUEUE _ SIZE);statistics thread POOL=new thread POOL executor(DEFAULT _ CORE _ POOL _ SIZE,DEFAULT_MAX_POOL_SIZE,60,TimeUnit。秒,队列,新ThreadFactoryBuilder()。setThreadFactory(new ThreadFactory(){ private int count=0;私有字符串前缀=' StatisticsTask@Override公共线程new Thread(Runnable r){ return new Thread(r,前缀'-' count);} }).setUncaughtExceptionHandler((t,e)-{ String threadName=t . getname();logger . error(' statistics thread pool出现错误!threadName: {},错误消息:{} ',threadName,e.getMessage(),e);}).build(),(r,executor) - { if(!executor . isshutdown()){ logger . warn(' statistics thread pool太忙!正在等待将任务插入队列!');un interruptibles . put un interruptible(executor . get queue(),r);} }){ @ Override protected void after execute(Runnable r,Throwable t){ super . after execute(r,t);if (t==null r未来的实例?){试试{未来?未来=(未来?)r;future . get();} catch(cancellation exception ce){ t=ce;} catch(execution exception ee){ t=ee . get cause();} catch(interrupted exception ie){ thread . current thread()。中断();//ignore/reset } } if (t!=null){ logger . error(' statistics thread pool error msg:{ } ',t.getMessage(),t);} } };statistics thread pool . prestartallcorethreads();设置线程数量我们知道一般有两种任务:CPU密集型和IO密集型。然后,面对CPU密集型的任务,线程的数量不能太多。一般来说,选择1个CPU核或者2倍的核数是比较合理的值。因此,我们可以考虑将corePoolSize设置为1个CPU核心数,将maxPoolSize设置为2倍核心数。

同样,在面对IO密集型任务时,可以考虑将内核数乘以4倍作为内核线程数来设置线程数,然后将内核数乘以5倍作为最大线程数。这样的设置会比直接通过敲击头部来设置一个值更合理。

当然,线程总数不能太多,控制在100个线程以内比较合理。否则,过多的线程可能会导致频繁的上下文切换,导致系统性能比以前更差。

如何正确关闭线程池说到如何正确关闭线程池,也是有一点讲究的。为了达到优雅关机的目的,首先要调用shutdown方法,这意味着这个线程池不会接收任何新的任务,但是提交的任务会继续执行,包括队列中的任务。因此,在这之后,您还应该调用awaitTermination方法,该方法可以设置线程池在关闭之前的最大超时。如果线程池在超时前可以正常关闭,这个方法将返回true,否则一旦超时将返回false。一般来说,我们不能无限期等待,所以我们需要提前估计一个合理的超时时间,然后使用这种方法。

如果awaitTermination方法返回false,并且您希望在线程池关闭后尽可能多地执行其他资源恢复工作,请考虑再次调用shutdownNow方法。此时,队列中所有未处理的任务将被丢弃,线程池中每个线程的中断标志位将被置位。ShutdownNow并不能保证正在运行的线程能够停止工作,除非提交给线程的任务能够正确响应中断。此时,可以考虑继续调用awaitTermination方法,或者干脆放弃,做下一件事。

线程池中其他有用的方法。你可能注意到了,当我创建线程池的时候,我也调用了这个方法:prestartAllCoreThreads。这种方法有什么效果?我们知道,在创建线程池之后,在向线程池提交任何任务之前,线程池中的线程数量为0。有时候我们事先知道会有很多任务提交到这个线程池,但是等待它一个一个的创建新线程代价太大,会影响系统的性能。所以我们可以考虑在创建线程池的时候一次性创建所有的核心线程,这样系统启动后就可以直接使用了。

事实上,线程池中还有其他有趣的方法。举个例子,我们现在想象一个场景。当线程池负载较高,即将爆发时,会触发拒绝策略。有什么办法可以缓解这个问题吗?其实是有的,因为线程池提供了设置核心线程数和最大线程数的方法,分别是setCorePoolSize方法和setMaximumPoolSize方法。是的,线程池创建后可以改变线程的数量!因此,当线程池高负载运行时,我们可以这样处理:

启动一个常规的轮询线程(守护进程类型)并定期检查线程池中的线程数量。具体来说,调用getActiveCount方法。当发现线程数超过核心线程数时,可以考虑将CorePoolSize和MaximumPoolSize的值同时乘以2。当然,这里不建议设置大量的线程,因为线程越多越好。可以考虑设置一个上限,比如50或者100。同时获取队列中任务的数量,具体来说就是调用getQueue方法,然后调用size方法。当队列中的任务数小于队列大小的一半时,我们可以认为线程池的负载现在没有那么高了,可以考虑恢复CorePoolSize和MaximumPoolSize,即除以2,如果线程池之前已经扩展过的话。具体来说,如下图:

以上是我个人建议使用线程池的一种方式。

线程池一定是最好的解决方案吗?线程池并不是所有情况下的最佳解决方案。如果是追求极致性能的场景,可以考虑使用Disruptor,这是一种高性能队列。排除颠覆者,单纯基于JDK会有更好的解决方案吗?答案是肯定的。

我们知道,在一个线程池中,多个线程共享一个队列,所以当有很多任务时,需要频繁读写队列,需要锁来防止冲突。事实上,在阅读线程池的源代码时,你可以发现它充满了各种各样的锁定代码。有没有更好的实现方式?

事实上,我们可以考虑创建一个单线程池列表,每个线程池都使用有界队列来实现多线程。这样做的好处是,每个线程池中的队列只会被一个线程操作,所以不存在竞争问题。

这种用空间换时间的思路其实借鉴了Netty中EventLoop的实现机制。试想,如果线程池的性能真的那么好,Netty为什么不用呢?

其他需要注意的地方,任何情况下都不要使用可伸缩线程池(创建和销毁线程的开销是巨大的)。在任何情况下都不应该使用未绑定的队列,除非是单个测试。ArrayBlockingQueue和LinkedBlockingQueue中通常使用有界队列。前者基于数组,后者基于链表。从性能上看,LinkedBlockingQueue的吞吐量更高,但性能并不稳定。实际上,由您来测试您应该使用哪个建议。顺便说一下,Executors的newFixedThreadPool使用LinkedBlockingQueue。建议自己实现RejectedExecutionHandler。JDK自己的都不太好用,你可以在里面实现自己的逻辑。如果需要一些特定的上下文信息,可以在Runnable实现类中添加自己的东西,这样就可以在RejectedExecutionHandler中直接使用。如何做到不丢失任务这里其实指的是一个特例,就是比如我们突然遇到一个流量尖峰,导致线程池的负载已经很高了,也就是即将触发拒绝策略的时候,我们能做些什么来尽力避免提交的任务丢失。一般来说,当这种情况发生时,应尽快触发警报,通知R&D人员进行处理。之后不管是限流还是加机器,甚至是用Kafka,Redis甚至数据库来临时存储任务数据,都是可以的。但毕竟远水救不了近火。如果想在正式解决之前尽可能的缓解这个问题,可以考虑什么?

首先要考虑的是动态增加线程池中的线程数量,我前面提到过。但如果已经扩展了,此时就不要再扩展了,否则可能会导致系统吞吐量降低。在这种情况下,你要自己实现RejectedExecutionHandler,具体来说,在实现类中,你要单独打开一个单线程线程池,然后调用原线程池的getQueue方法的put方法,试图把塞不进去的任务再塞进去。当然队列满了也插不进去,但至少只是阻塞了这个单线程,不影响主进程。

当然,这个办法是治标不治本。事实上,面对流量激增,业内有很多成熟的做法。仅仅从线程池的角度来看,这种方法可以算是一种暂时有效的解决方案。

作者陆亚东,某互联网公司风控领域的技术专家,主要专注于高性能、高并发、中间件底层原理及调优等领域。邮箱:523144643 @ QQ 0 . 5773285520829796 . com 5486

-

看完文章不要走。欢迎大家转发讨论,转发文章相关内容。我们将从评论区抽取9位高质量评论的读者赠送一本书《实战Java虚拟机———JVM故障诊断与性能优化(第2版)》。感谢@博客视点有奖。

简介:《实战Java虚拟机———JVM故障诊断与性能优化(第2版)》第1 ~ 3章介绍了Java虚拟机的定义、总体架构和常用配置参数。第4 ~ 5章介绍了垃圾收集的算法和各种垃圾收集器。第六章介绍了Java虚拟机的性能监控和故障诊断工具。第7章详细介绍了Java堆的分析方法和案例。第八章介绍了Java虚拟机对多线程的支持,尤其是锁。第9 ~ 10章介绍了Java虚拟机的核心——类文件结构和Java虚拟机中类的加载系统。第11章介绍了Java虚拟机的执行系统和字节码,并给出了一个通过ASM框架进行字节码注入的案例。

购买地址:JD。计算机输出缩微胶片

同时欢迎大家踊跃投稿,每期作者还将获得礼品册一份!

有关提交要求,请参见:

0.5773285520829796 http 0.8743222324818427s 0.577328520829796://0.87422324818427 my。奥斯中国。net/社论-故事/博客/1814725

更多java线程池案例(Java中线程池)相关信息请关注本站,本文仅仅做为展示!