编辑推荐: |
本文将从线程池和线程变量的原理和使用出发,结合实例给出最佳使用实践,帮助各开发人员构建出稳定、高效的java应用服务。希望能对遇到类似问题的同学有所帮助。
本文来自于微信公众号阿里技术,由火龙果软件Linda编辑、推荐。 |
|
01 背景
随着计算技术的不断发展,3纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐渐面临巨大的物理瓶颈,通过多核处理器技术来提升服务器的性能成为提升算力的主要方向。
在服务器领域,基于java构建的后端服务器占据着领先地位,因此,掌握java并发编程技术,充分利用CPU的并发处理能力是一个开发人员必修的基本功,本文结合线程池源码和实践,简要介绍了线程池和线程变量的使用。
02 线程池概述
2.1 什么是线程池
线程池是一种“池化”的线程使用模式,通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。
2.2 为什么要使用线程池
总体来说,线程池有如下的优势:
1)降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
2)提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
3)提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
03 线程池的使用
3.1 线程池创建&核心参数设置
在java中,线程池的实现类是ThreadPoolExecutor,构造函数如下:
public ThreadPoolExecutor(int
corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit,
BlockingQueue<Runnable>
workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler
handler)
|
可以通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue, threadFactory,handler)来创建一个线程池。
corePoolSize参数
在构造函数中,corePoolSize为线程池核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程超时也会回收。
maximumPoolSize参数
在构造函数中,maximumPoolSize为线程池所能容纳的最大线程数。
keepAliveTime参数
在构造函数中,keepAliveTime表示线程闲置超时时长。如果线程闲置时间超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。
timeUnit参数
在构造函数中,timeUnit表示线程闲置超时时长的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
blockingQueue参数
在构造函数中,blockingQueue表示任务队列,线程池任务队列的常用实现类有:
ArrayBlockingQueue :一个数组实现的有界阻塞队列,此队列按照FIFO的原则对元素进行排序,支持公平访问队列。
LinkedBlockingQueue :一个由链表结构组成的可选有界阻塞队列,如果不指定大小,则使用Integer.MAX_VALUE作为队列大小,按照FIFO的原则对元素进行排序。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列,默认情况下采用自然顺序排列,也可以指定Comparator。
DelayQueue:一个支持延时获取元素的无界阻塞队列,创建元素时可以指定多久以后才能从队列中获取当前元素,常用于缓存系统设计与定时任务调度等。
SynchronousQueue:一个不存储元素的阻塞队列。存入操作必须等待获取操作,反之亦然。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,与LinkedBlockingQueue相比多了transfer和tryTranfer方法,该方法在有消费者等待接收元素时会立即将元素传递给消费者。
LinkedBlockingDeque:一个由链表结构组成的双端阻塞队列,可以从队列的两端插入和删除元素。
threadFactory参数
在构造函数中,threadFactory表示线程工厂。用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。如通过Google工具包可以设置线程池里的线程名:
new
ThreadFactoryBuilder().setNameFormat ("general-detail-batch-%d").build()
|
RejectedExecutionHandler参数
在构造函数中,rejectedExecutionHandler表示拒绝策略。当达到最大线程数且队列任务已满时需要执行的拒绝策略,常见的拒绝策略如下:
ThreadPoolExecutor.AbortPolicy:默认策略,当任务队列满时抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃掉不能执行的新任务,不抛任何异常。
ThreadPoolExecutor.CallerRunsPolicy:当任务队列满时使用调用者的线程直接执行该任务。
ThreadPoolExecutor.DiscardOldestPolicy:当任务队列满时丢弃阻塞队列头部的任务(即最老的任务),然后添加当前任务。
3.2 线程池状态转移图
ThreadPoolExecutor线程池有如下几种状态:
RUNNING:运行状态,接受新任务,持续处理任务队列里的任务;
SHUTDOWN:不再接受新任务,但要处理任务队列里的任务;
STOP:不再接受新任务,不再处理任务队列里的任务,中断正在进行中的任务;
TIDYING:表示线程池正在停止运作,中止所有任务,销毁所有工作线程,当线程池执行terminated()方法时进入TIDYING状态;
TERMINATED:表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕,terminated()方法执行完成。
3.3 线程池任务调度机制
线程池提交一个任务时任务调度的主要步骤如下:
1)当线程池里存活的核心线程数小于corePoolSize核心线程数参数的值时,线程池会创建一个核心线程去处理提交的任务;
2)如果线程池核心线程数已满,即线程数已经等于corePoolSize,新提交的任务会被尝试放进任务队列workQueue中等待执行;
3)当线程池里面存活的线程数已经等于corePoolSize了,且任务队列workQueue已满,再判断当前线程数是否已达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务;
4)如果当前的线程数已达到了maximumPoolSize,还有新的任务提交过来时,执行拒绝策略进行处理。
核心代码如下:
public void execute(Runnable
command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize
threads are running, try to
* start a new thread with
the given command as its first
* task. The call to addWorker
atomically checks runState and
* workerCount, and so prevents
false alarms that would add
* threads when it shouldn't,
by returning false.
*
* 2. If a task can be successfully
queued, then we still need
* to double-check whether
we should have added a thread
* (because existing ones
died since last checking) or that
* the pool shut down since
entry into this method. So we
* recheck state and if necessary
roll back the enqueuing if
* stopped, or start a new
thread if there are none.
*
* 3. If we cannot queue task,
then we try to add a new
* thread. If it fails, we
know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) <
corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) &&
workQueue.offer (command)) {
int recheck = ctl.get();
if (! isRunning(recheck)
&& remove (command))
reject(command);
else if (workerCountOf(recheck)
== 0)
addWorker(null, false);
}
else if (!addWorker(command,
false))
reject(command);
}
|
3.4 Tomcat线程池分析
✪ Tomcat请求处理过程
Tomcat的整体架构包含连接器和容器两大部分,其中连接器负责与外部通信,容器负责内部逻辑处理。在连接器中:
1)使用ProtocolHandler接口来封装I/O模型和应用层协议的差异,其中I/O模型可以选择非阻塞I/O、异步I/O或APR,应用层协议可以选择HTTP、HTTPS或AJP。ProtocolHandler将I/O模型和应用层协议进行组合,让EndPoint只负责字节流的收发,Processor负责将字节流解析为Tomcat
Request/Response对象,实现功能模块的高内聚和低耦合,ProtocolHandler接口继承关系如下图示。
2)通过适配器Adapter将Tomcat Request对象转换为标准的ServletRequest对象。
Tomcat为了实现请求的快速响应,使用线程池来提高请求的处理能力。下面我们以HTTP非阻塞I/O为例对Tomcat线程池进行简要的分析。
✪ Tomcat线程池创建
在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的处理,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor方法来创建Tomcat默认线程池。
核心部分代码如下:
public void createExecutor()
{
internalExecutor = true;
TaskQueue taskqueue = new
TaskQueue();
TaskThreadFactory tf = new
TaskThreadFactory (getName() + "-exec-",
daemon, getThreadPriority());
executor = new ThreadPoolExecutor (getMinSpareThreads(),
getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue,
tf);
taskqueue.setParent( (ThreadPoolExecutor)
executor);
}
|
其中,TaskQueue、ThreadPoolExecutor分别为Tomcat自定义任务队列、线程池实现。
✪ Tomcat自定义ThreadPoolExecutor
Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。
/**
* Same as a java.util.concurrent. ThreadPoolExecutor
but implements a much more efficient
* {@link #getSubmittedCount()}
method, to be used to properly handle the
work queue.
* If a RejectedExecutionHandler
is not specified a default one will be configured
* and that one will always
throw a RejectedExecutionException
*
*/
public class ThreadPoolExecutor
extends java.util.concurrent.ThreadPoolExecutor
{
/**
* The number of tasks submitted
but not yet finished. This includes tasks
* in the queue and tasks
that have been handed to a worker thread but
the
* latter did not start executing
the task yet.
* This number is always greater
or equal to {@link #getActiveCount()}.
*/
// 新增的submittedCount成员变量,用于 统计已提交但还未完成的任务数
private final AtomicInteger
submittedCount = new AtomicInteger(0);
private final AtomicLong
lastContextStopped Time = new AtomicLong(0L);
// 构造函数
public ThreadPoolExecutor(int
corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable>
workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler
handler) {
super(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue, threadFactory,
handler);
// 预启动所有核心线程
prestartAllCoreThreads();
}
}
|
Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。
@Override
public void execute(Runnable
command) {
execute(command,0,TimeUnit.MILLISECONDS);
}
public void execute(Runnable
command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException
rx) {
if (super.getQueue() instanceof
TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command,
timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecution Exception("Queue
capacity is full.");
}
} catch (InterruptedException
x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
|
✪ Tomcat自定义任务队列
在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200,为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。
具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:
/**
* As task queue specifically
designed to run with a thread pool executor.
The
* task queue is optimised
to properly utilize threads within a thread
pool
* executor. If you use a
normal queue, the executor will spawn threads
when
* there are idle threads
and you wont be able to force items onto the
queue
* itself.
*/
public class TaskQueue extends
Linked BlockingQueue<Runnable> {
public boolean force(Runnable
o, long timeout, TimeUnit unit) throws InterruptedException
{
if ( parent==null || parent.isShutdown()
) throw new RejectedExecutionException("Executor
not running, can't force a command into the
queue");
return super.offer(o,timeout,unit);
//forces the item onto the queue, to be used
if the task is rejected
}
@Override
public boolean offer(Runnable
o) {
// 1. parent为线程池,Tomcat中为自定义线程池实例
//we can't do any checks
if (parent==null) return
super.offer(o);
// 2. 当线程数达到最大线程数时,新提交任务入队
//we are maxed out on threads,
simply queue the object
if (parent.getPoolSize()
== parent.getMaximumPoolSize ()) return super.offer(o);
// 3. 当提交的任务数小于线程池中已有的 线程数时,即有空闲线程,任务入队即可
//we have idle threads, just
add it to the queue
if (parent.getSubmittedCount()<=(parent. getPoolSize()))
return super.offer(o);
// 4. 【关键点】如果当前线程数量未达到最 大线程数,直接返回false,让线程池创建新线程
//if we have less threads
than maximum force creation of a new thread
if (parent.getPoolSize()<parent. getMaximumPoolSize())
return false;
// 5. 最后的兜底,放入队列
//if we reached here, we
need to add it to the queue
return super.offer(o);
}
}
|
✪ Tomcat自定义任务线程
Tomcat中通过自定义任务线程TaskThread实现对每个线程创建时间的记录;使用静态内部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异常类型的处理。
/**
* A Thread implementation
that records the time at which it was created.
*
*/
public class TaskThread extends
Thread {
private final long creationTime;
public TaskThread(ThreadGroup
group, Runnable target, String name) {
super(group, new WrappingRunnable(target),
name);
this.creationTime = System.currentTimeMillis();
}
/**
* Wraps a {@link Runnable}
to swallow any {@link StopPooledThreadException}
* instead of letting it go
and potentially trigger a break in a debugger.
*/
private static class WrappingRunnable
implements Runnable {
private Runnable wrappedRunnable;
WrappingRunnable(Runnable
wrappedRunnable) {
this.wrappedRunnable = wrappedRunnable;
}
@Override
public void run() {
try {
wrappedRunnable.run();
} catch(StopPooledThreadException
exc) {
//expected : we just swallow
the exception to avoid disturbing
//debuggers like eclipse's
log.debug("Thread exiting
on purpose", exc);
}
}
}
}
|
3.5 思考&小结
1)Tomcat为什么要自定义线程池和任务队列实现?
JUC原生线程池在提交任务时,当工作线程数达到核心线程数后,继续提交任务会尝试将任务放入阻塞队列中,只有当前运行线程数未达到最大设定值且在任务队列任务满后,才会继续创建新的工作线程来处理任务,因此JUC原生线程池无法满足Tomcat快速响应的诉求。
2)Tomcat为什么使用无界队列?
Tomcat在EndPoint中通过acceptCount和maxConnections两个参数来避免过多请求积压。其中maxConnections为Tomcat在任意时刻接收和处理的最大连接数,当Tomcat接收的连接数达到maxConnections时,Acceptor不会读取accept队列中的连接;这时accept队列中的线程会一直阻塞着,直到Tomcat接收的连接数小于maxConnections(maxConnections默认为10000,如果设置为-1,则连接数不受限制)。acceptCount为accept队列的长度,当accept队列中连接的个数达到acceptCount时,即队列满,此时进来的请求一律被拒绝,默认值是100(基于Tomcat
8.5.43版本)。因此,通过acceptCount和maxConnections两个参数作用后,Tomcat默认的无界任务队列通常不会造成OOM。
/**
* Allows the server developer
to specify the acceptCount (backlog) that
* should be used for server
sockets. By default, this value
* is 100.
*/
private int acceptCount =
100;
private int maxConnections
= 10000;
|
3.6 最佳实践
✪ 避免用Executors 的创建线程池
Executors常用方法有以下几个:
1)newCachedThreadPool():创建一个可缓存的线程池,调用 execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到线程池中。终止并从缓存中移除那些已有
60 秒钟未被使用的线程。CachedThreadPool适用于并发执行大量短期耗时短的任务,或者负载较轻的服务器;
2)newFiexedThreadPool(int nThreads):创建固定数目线程的线程池,线程数小于nThreads时,提交新的任务会创建新的线程,当线程数等于nThreads时,提交新的任务后任务会被加入到阻塞队列,正在执行的线程执行完毕后从队列中取任务执行,FiexedThreadPool适用于负载略重但任务不是特别多的场景,为了合理利用资源,需要限制线程数量;
3)newSingleThreadExecutor() 创建一个单线程化的 Executor,SingleThreadExecutor适用于串行执行任务的场景,每个任务按顺序执行,不需要并发执行;
4)newScheduledThreadPool(int corePoolSize) 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代
Timer 类。ScheduledThreadPool中,返回了一个ScheduledThreadPoolExecutor实例,而ScheduledThreadPoolExecutor实际上继承了ThreadPoolExecutor。从代码中可以看出,ScheduledThreadPool基于ThreadPoolExecutor,corePoolSize大小为传入的corePoolSize,maximumPoolSize大小为Integer.MAX_VALUE,超时时间为0,workQueue为DelayedWorkQueue。实际上ScheduledThreadPool是一个调度池,其实现了schedule、scheduleAtFixedRate、scheduleWithFixedDelay三个方法,可以实现延迟执行、周期执行等操作;
5)newSingleThreadScheduledExecutor() 创建一个corePoolSize为1的ScheduledThreadPoolExecutor;
6)newWorkStealingPool(int parallelism)返回一个ForkJoinPool实例,ForkJoinPool
主要用于实现“分而治之”的算法,适合于计算密集型的任务。
Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端:
1)FiexedThreadPool和SingleThreadPool任务队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;
2)CachedThreadPool和ScheduledThreadPool允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM;
使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。
✪ 避免使用局部线程池
使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。
✪ 合理设置线程池参数
在工程实践中,通常使用下述公式来计算核心线程数:
nThreads=(w+c)/c*n*u=(w/c+1)*n*u
其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0,
1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。
上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。
✪ 增加异常处理
为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:
在任务代码处增加try...catch异常处理;
如果使用的Future方式,则可通过Future对象的get方法接收抛出的异常;
为工作线程设置setUncaughtExceptionHandler,在uncaughtException方法中处理异常。
✪ 优雅关闭线程池
public void destroy() {
try {
poolExecutor.shutdown();
if (!poolExecutor.awaitTermination(AWAIT_TIMEOUT,
TimeUnit.SECONDS)) {
poolExecutor.shutdownNow();
}
} catch (InterruptedException
e) {
// 如果当前线程被中断,重新取消所有任务
pool.shutdownNow();
// 保持中断状态
Thread.currentThread().interrupt();
}
}
|
为了实现优雅停机的目标,我们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后我们还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常我们需要根据业务场景预估一个合理的超时时间,然后调用该方法。
如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。
✪ 鹰眼上下文参数传递
/**
* 在主线程中,开启鹰眼异步模式,并将ctx传递给多线程任务
**/
// 防止鹰眼链路丢失,需要传递
RpcContext_inner ctx = EagleEye.getRpcContext();
// 开启异步模式
ctx.setAsyncMode(true);
/**
* 在线程池任务线程中,设置鹰眼rpc环境
**/
private void runTask() {
try {
EagleEye.setRpcContext(ctx);
// do something...
} catch (Exception e) {
log.error("requestError,
params: {}", this.params, e);
} finally {
// 判断当前任务是否是主线程在运行,当Rejected策略为CallerRunsPolicy的时候,核对当前线程
if (mainThread != Thread.currentThread())
{
EagleEye.clearRpcContext();
}
}
}
|
04 ThreadLocal线程变量概述
4.1 什么是ThreadLocal
ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于普通的变量,访问线程本地变量的每个线程(通过其get或set方法)都有其自己的独立初始化的变量副本,因此ThreadLocal没有多线程竞争的问题,不需要单独进行加锁。
4.2 ThreadLocal使用场景
每个线程都需要有属于自己的实例数据(线程隔离);
框架跨层数据的传递;
需要参数全局传递的复杂调用链路的场景;
数据库连接的管理,在AOP的各种嵌套调用中保证事务的一致性。
05 ThreadLocal的原理与实践
对于ThreadLocal而言,常用的方法有get/set/initialValue三个方法。
众所周知,在java中SimpleDateFormat有线程安全问题,为了安全地使用SimpleDateFormat,除了1)创建SimpleDateFormat局部变量;和2)加同步锁
两种方案外,我们还可以使用3)ThreadLocal的方案。
/**
* 使用 ThreadLocal 定义一个全局的
SimpleDateFormat
*/
private static ThreadLocal<SimpleDateFormat>
simpleDateFormatThreadLocal = new
ThreadLocal<SimpleDateFormat>()
{
@Override
protected SimpleDateFormat
initialValue() {
return new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
}
};
// 用法
String dateString = simpleDateFormatThreadLocal.get().format(calendar.getTime());
|
5.1 ThreadLocal原理
Thread内部维护了一个ThreadLocal.ThreadLocalMap实例(threadLocals),ThreadLocal的操作都是围绕着threadLocals来操作的。
✪ threadLocal.get()方法
/**
* Returns the value in the
current thread's copy of this
* thread-local variable.
If the variable has no value for the
* current thread, it is first
initialized to the value returned
* by an invocation of the
{@link #initialValue} method.
*
* @return the current thread's
value of this thread-local
*/
public T get() {
// 1. 获取当前线程
Thread t = Thread.currentThread();
// 2. 获取当前线程内部的ThreadLocalMap变量t.threadLocals;
ThreadLocalMap map = getMap(t);
// 3. 判断map是否为null
if (map != null) {
// 4. 使用当前threadLocal变量获取entry
ThreadLocalMap.Entry e =
map.getEntry(this);
// 5. 判断entry是否为null
if (e != null) {
// 6.返回Entry.value
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 7. 如果map/entry为null设置初始值
return setInitialValue();
}
/**
* Variant of set() to establish
initialValue. Used instead
* of set() in case user has
overridden the set() method.
*
* @return the initial value
*/
private T setInitialValue()
{
// 1. 初始化value,如果重写就用重写后的value,默认null
T value = initialValue();
// 2. 获取当前线程
Thread t = Thread.currentThread();
// 3. 获取当前线程内部的ThreadLocalMap变量
ThreadLocalMap map = getMap(t);
if (map != null)
// 4. 不为null就set, key: threadLocal,
value: value
map.set(this, value);
else
// 5. map若为null则创建ThreadLocalMap对象
createMap(t, value);
return value;
}
/**
* Create the map associated
with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @param firstValue value
for the initial entry of the map
*/
void createMap(Thread t,
T firstValue) {
t.threadLocals = new ThreadLocalMap(this,
firstValue);
}
/**
* Construct a new map initially
containing (firstKey, firstValue).
* ThreadLocalMaps are constructed
lazily, so we only create
* one when we have at least
one entry to put in it.
*/
ThreadLocalMap(ThreadLocal<?>
firstKey, Object firstValue) {
// 1. 初始化entry数组,size: 16
table = new Entry[INITIAL_CAPACITY];
// 2. 计算value的index
int i = firstKey.threadLocalHashCode
& (INITIAL_CAPACITY - 1);
// 3. 在对应index位置赋值
table[i] = new Entry(firstKey,
firstValue);
// 4. entry size
size = 1;
// 5. 设置threshold: threshold
= len * 2 / 3;
setThreshold(INITIAL_CAPACITY);
}
/**
* Set the resize threshold
to maintain at worst a 2/3 load factor.
*/
private void setThreshold(int
len) {
threshold = len * 2 / 3;
}
|
✪ threadLocal.set()方法
/**
* Sets the current thread's
copy of this thread-local variable
* to the specified value.
Most subclasses will have no need to
* override this method, relying
solely on the {@link #initialValue}
* method to set the values
of thread-locals.
*
* @param value the value
to be stored in the current thread's copy
of
* this thread-local.
*/
public void set(T value)
{
// 1. 获取当前线程
Thread t = Thread.currentThread();
// 2. 获取当前线程内部的ThreadLocalMap变量
ThreadLocalMap map = getMap(t);
if (map != null)
// 3. 设置value
map.set(this, value);
else
// 4. 若map为null则创建ThreadLocalMap
createMap(t, value);
}
|
✪ ThreadLocalMap
从JDK源码可见,ThreadLocalMap中的Entry是弱引用类型的,这就意味着如果这个ThreadLocal只被这个Entry引用,而没有被其他对象强引用时,就会在下一次GC的时候回收掉。
static class ThreadLocalMap
{
/**
* The entries in this hash
map extend WeakReference, using
* its main ref field as the
key (which is always a
* ThreadLocal object). Note
that null keys (i.e. entry.get()
* == null) mean that the
key is no longer referenced, so the
* entry can be expunged from
table. Such entries are referred to
* as "stale entries"
in the code that follows.
*/
static class Entry extends
WeakReference<ThreadLocal<?>>
{
/** The value associated
with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?>
k, Object v) {
super(k);
value = v;
}
}
// ...
}
|
5.2 ThreadLocal示例
✪ 鹰眼链路ThreadLocal的使用
EagleEye(鹰眼)作为全链路监控系统在集团内部被广泛使用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,部分相关代码如下:
EagleEyeHttpRequest eagleEyeHttpRequest
= this.convertHttpRequest(httpRequest);
// 1. 初始化,将traceId、rpcId等数据存储到鹰眼的ThreadLocal变量中
EagleEyeRequestTracer.startTrace(eagleEyeHttpRequest,
false);
try {
chain.doFilter(httpRequest,
httpResponse);
} finally {
// 2. 清理ThreadLocal变量值
EagleEyeRequestTracer.endTrace(this.convertHttpResponse(httpResponse));
}
|
在EagleEyeFilter中,通过EagleEyeRequestTracer.startTrace方法进行初始化,在前置入参转换后,通过startTrace重载方法将鹰眼上下文参数存入ThreadLocal中,相关代码如下:
EagleEyeFilter在finally代码块中,通过EagleEyeRequestTracer.endTrace方法结束调用链,通过clear方法将ThreadLocal中的数据进行清理,相关代码实现如下:
✪ Bad case:XX项目权益领取失败问题
在某权益领取原有链路中,通过app打开一级页面后才能发起权益领取请求,请求经过淘系无线网关(Mtop)后到达服务端,服务端通过mtop
sdk获取当前会话信息。
在XX项目中,对权益领取链路进行了升级改造,在一级页面请求时,通过服务端同时发起权益领取请求。具体地,服务端在处理一级页面请求时,同时通过调用hsf/dubbo接口来进行权益领取,因此在发起rpc调用时需要携带用户当前会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而才能通过mtop
sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:
问题1:因ThreadLocal初始化时机不当,造成获取不到会话信息,进而导致权益领取失败。
问题2:请求完成时,因未清理ThreadLocal中的变量值,导致脏数据。
⍟ 问题1:权益领取失败分析
在权益领取服务中,该应用构建了一套高效和线程安全的依赖注入框架,基于该框架的业务逻辑模块通常抽象为xxxModule形式,Module间为网状依赖关系,框架会按依赖关系自动调用init方法(其中,被依赖的module
的init方法先执行)。
在应用中,权益领取接口的主入口为CommonXXApplyModule类,CommonXXApplyModule依赖XXSessionModule。当请求来临时,会按依赖关系依次调用init方法,因此XXSessionModule的init方法会优先执行;而开发同学在CommonXXApplyModule类中的init方法中通过调用recoverMtopContext()方法来期望恢复mtop上下文,因recoverMtopContext()方法的调用时机过晚,从而导致XXSessionModule模块获取不到正确的会话id等信息而导致权益领取失败。
⍟ 问题2:脏数据分析
权益领取服务在处理请求时,若当前线程曾经处理过权益领取请求,因ThreadLocal变量值未被清理,此时XXSessionModule通过mtop
SDK获取会话信息时得到的是前一次请求的会话信息,从而造成脏数据。
⍟ 解决方案
在依赖注入框架入口处AbstractGate#visit(或在XXSessionModule中)通过recoverMtopContext方法注入mtop上下文信息,并在入口方法的finally代码块清理当前请求的threadlocal变量值。
5.3 思考&小结
1)ThreadLocalMap中的Entry为什么要设计为弱引用类型?
若使用强引用类型,则threadlocal的引用链为:Thread -> ThreadLocal.ThreadLocalMap
-> Entry[] -> Entry -> key(threadLocal对象)和value;在这种场景下,只要这个线程还在运行(如线程池场景),若不调用remove方法,则该对象及关联的所有强引用对象都不会被垃圾回收器回收。
2)使用static和不使用static修饰threadlocal变量有和区别?
若使用static关键字进行修饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存泄漏,如下述示例:
public class ThreadLocalTest
{
public static class ThreadLocalDemo
{
private ThreadLocal<String>
threadLocalHolder = new ThreadLocal();
public void setValue(String
value) {
threadLocalHolder.set(value);
}
public String getValue()
{
return threadLocalHolder.get();
}
}
public static void main(String[]
args) {
int count = 3;
List<ThreadLocalDemo>
list = new LinkedList<>();
for (int i = 0; i < count;
i++) {
ThreadLocalDemo demo = new
ThreadLocalDemo();
demo.setValue("demo-"
+ i);
list.add(demo);
}
System.out.println();
}
}
|
在上述main方法第22行debug,可见线程的threadLocals变量中有3个threadlocal实例。在工程实践中,使用threadlocal时通常期望一个线程只有一个threadlocal实例,因此,若不使用static修饰,期望的语义发生了变化,同时易引起内存泄漏。
5.4 最佳实践
✪ ThreadLocal变量值初始化和清理建议成对出现
如果不执行清理操作,则可能会出现:
1)内存泄漏:由于ThreadLocalMap的中key是弱引用,而Value是强引用。这就导致了一个问题,ThreadLocal在没有外部对象强引用时,发生GC时弱引用Key会被回收,而Value不会回收,从而Entry里面的元素出现<null,value>的情况。如果创建ThreadLocal的线程一直持续运行,那么这个Entry对象中的value就有可能一直得不到回收,这样可能会导致内存泄露。
2)脏数据:由于线程复用,在用户1请求时,可能保存了业务数据在ThreadLocal中,若不清理,则用户2的请求进来时,可能会读到用户1的数据。
建议使用try...finally 进行清理。
✪ ThreadLocal变量建议使用static进行修饰
我们在使用ThreadLocal时,通常期望的语义是perThread,若不使用static进行修饰,则语义变为perThread-perInstance;在线程池场景下,若不用static进行修饰,创建的线程相关实例可能会达到
M * N个(其中M为线程数,N为对应类的实例数),易造成内存泄漏(https://errorprone.info/bugpattern/ThreadLocalUsage)。
✪ 谨慎使用ThreadLocal.withInitial
在应用中,谨慎使用ThreadLocal.withInitial(Supplier<? extends
S> supplier)这个工厂方法创建ThreadLocal对象,一旦不同线程的ThreadLocal使用了同一个Supplier对象,那么隔离也就无从谈起了,如:
// 反例,实际上使用了共享对象obj而并未隔离,
private static ThreadLocal<Obj>
threadLocal = ThreadLocal.withIntitial(()
-> obj);
|
06 总结
在java工程实践中,线程池和线程变量被广泛使用,因线程池和线程变量的不当使用经常造成安全生产事故,因此,正确使用线程池和线程变量是每一位开发人员必须修炼的基本功。本文从线程池和线程变量的使用出发,简要介绍了线程池和线程变量的原理和使用实践,各开发人员可结合最佳实践和实际应用场景,正确地使用线程和线程变量,构建出稳定、高效的java应用服务。
|