编辑推荐: |
本文介绍了 什么是虚拟线程、如何使用虚拟线程及最佳实践案例。希望能对遇到类似问题的同学有所帮助。
本文来自于每日运维,由火龙果软件Linda编辑、推荐。 |
|
1.1 什么是虚拟线程
先来看一个例子。
var a = new AtomicInteger(0);
// 创建一个固定200个线程的线程池
try (var vs = Executors.newFixedThreadPool(200)) {
List futures = new ArrayList();
var begin = System.currentTimeMillis();
// 向线程池提交1000个sleep 1s的任务
for (int i=0; i {
Thread.sleep(Duration.ofSeconds(1));
return a.addAndGet(1);
});
futures.add(future);
}
// 获取这1000个任务的结果
for (var future : futures) {
var i = future.get();
if (i % 100 == 0) {
System.out.print(i + " ");
}
}
// 打印总耗时
System.out.println("Exec finished.");
System.out.printf("Exec time: %dms.%n", System.currentTimeMillis() - begin);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
|
你也许注意到了我们用了var来代替ExecutorService、Future等类型来做对象声明,这个特性很容易理解,我们会在第3篇中介绍。另外ExecutorService继承了AutoCloseable接口,已经可以在try-with-resource语法中创建了。
这段程序的逻辑很简单,向一个固定200个线程的线程池提交1000个sleep 1s的任务并遍历获取结果,平均每个线程会执行5个任务,总耗时约为5s,程序的输出如下:
100 200 300 400 500 600 700 800 900 1000 Exec finished.
Exec time: 5046ms.
|
让我们尝试下使用虚拟线程,将第3行的线程池创建替换为
try (var vs = Executors.newVirtualThreadPerTaskExecutor()) {
|
程序的输出如下:
100 200 300 400 500 600 700 800 900 1000 Exec finished.
Exec time: 1033ms.
|
100 200 300 400 500 600 700 800 900 1000 Exec finished.
Exec time: 1033ms.
可以明显看出,使用虚拟线程的版本执行速度要比线程池快很多,如果我们加大任务量到一百万个任务,打印线程改为每一万个打印一次,程序输出如下:
10000 20000 30000 40000 50000
60000 70000 80000 90000 100000 110000 120000
130000 140000 150000 160000 170000 180000
190000 200000 210000 220000 230000 240000
250000 260000 270000 280000 290000 300000
310000 320000 330000 340000 350000 360000
370000 380000 390000 400000 410000 420000
430000 440000 450000 460000 470000 480000
490000 500000 510000 520000 530000 540000
550000 560000 570000 580000 590000 600000
610000 620000 630000 640000 650000 660000
670000 680000 690000 700000 710000 720000
730000 740000 750000 760000 770000 780000
790000 800000 810000 820000 830000 840000
850000 860000 870000 880000 890000 900000
910000 920000 930000 940000 950000 960000
970000 980000 990000 1000000
Exec finished.Exec time:
16897ms.
|
可以看到,虚拟线程处理100万个并发任务的耗时只有10几秒的时间,这个处理能力如果用线程池来做无论线程数如何配置都几乎不可能实现。
虚拟线程的作用
区别于虚拟线程,传统的线程对象叫做平台线程(platform thread)。平台线程在底层 OS
线程上运行 Java 代码,并在代码的整个生命周期中占用该 OS 线程,因此平台线程的数量受限于 OS
线程的数量。虚拟线程是 java.lang.Thread 的一个实例,它在底层 OS 线程上运行 Java
代码,但不会在代码的整个生命周期中占用该 OS 线程。也就是说,多个虚拟线程可以在同一个 OS 线程上运行其
Java 代码,可以有效地共享该线程。平台线程独占宝贵的 OS 线程,而虚拟线程则不会,因此虚拟线程的数量可以比
OS 线程的数量多得多,执行阻塞任务的整体吞吐量也就大了很多。
但如果上述任务不是简单的sleep 1s,而是计算了1s(例如做矩阵计算或数组排序等),用线程池和虚拟线程的执行时间区别就没有那么大。原因是虚拟线程虽然可以带来更大的吞吐量,但并不能让单个任务计算得更快,当使用平台线程执行任务已经让cpu没有任何空闲时,切换虚拟线程来执行也不会带来任何收益。
虚拟线程可以发挥的最大作用是,可以让采用单请求单线程(thread-per-request)的方式编写的服务器程序最大化地利用CPU计算资源
。 其原因在于服务器程序有两大特点,一是需要处理较大吞吐量的请求,二是请求处理的过程大多是由IO密集型逻辑组成,这就导致采用平台线程实现的单请求单线程编写方式,可能会有大量的IO阻塞占据了平台线程资源,从而不能充分利用CPU资源。我们在使用真实应用压测时观察到,当服务请求IO耗时增大时,使用虚拟线程的吞吐量会明显高于线程池,尤其是当服务下游依赖出现故障导致耗时增大时,虚拟线程带来的服务可用性提升会非常明显。
有些情况下,服务端开发者为了充分利用cpu硬件资源,会考虑放弃单请求单线程的编程风格,而采用基于netty、actor等异步框架来构建服务。这样虽然它消除了由于OS线程稀缺性带来的吞吐量限制,但代价很高:它需要异步编程风格,没有专用线程,开发人员必须将请求处理逻辑分解成小阶段,通常是lambda表达式或独立的回调handler对象,然后使用API将它们组合成一个顺序管道(例如CompletableFuture或响应式框架),在一定程度上放弃了代码的顺序执行逻辑和代码的可读性。
在异步风格中,请求的每个阶段可能在不同的线程上执行,每个线程以交替方式运行属于不同请求的阶段。这对于理解程序行为有很大的影响:堆栈跟踪不能提供可用的上下文,debug无法跟踪请求处理逻辑,而分析工具无法将请求处理与其调用者相关联。总的来说,除了底层服务框架和一些特定的功能性服务,大部分以业务开发为主导的服务器程序都不会采用这种编程风格进行逻辑开发。
虚拟线程的工作原理
了解了虚拟线程的作用,我们再来看看它的工作原理。线程需要被调度才能执行任务,本质上是分配到CPU上执行。对于由操作系统线程实现的平台线程,JDK
依赖于操作系统中的调度程序;而对于虚拟线程,JDK 先将虚拟线程分配给平台线程,然后平台线程按照通常的方式由操作系统进行调度。
JDK 的虚拟线程调度器是一个以 FIFO 模式运行的 ForkJoinPool,调度器的默认并行度是可用于调度虚拟线程的平台线程数量,并行度可以通过设置启动参数调整。调度器函数代码如下:
private static ForkJoinPool createDefaultScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> {
PrivilegedAction pa = () -> new CarrierThread(pool);
return AccessController.doPrivileged(pa);
};
PrivilegedAction pa = () -> {
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
... //略过一些赋值操作
Thread.UncaughtExceptionHandler handler = (t, e) -> { };
boolean asyncMode = true; // FIFO
return new ForkJoinPool(parallelism, factory, handler, asyncMode,
0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
};
return AccessController.doPrivileged(pa);
}
|
调度器分配给虚拟线程的平台线程称为虚拟线程的载体线程(carrier)。虚拟线程可以在其生命周期内会被安排在不同的载体线程上。换句话说,调度器不维护虚拟线程和平台线程之间的亲和关系。从
Java 代码的角度来看,运行中的虚拟线程在逻辑上独立于其当前载体线程:
载体线程的信息对虚拟线程不可见,Thread.currentThread() 返回的值始终是虚拟线程本身。
载体线程和虚拟线程的堆栈跟踪是分开的。在虚拟线程中抛出的异常将不包括载体线程的堆栈帧。线程dump不会在虚拟线程的堆栈中显示载体线程的堆栈帧,反之亦然。
载体线程的thread-local变量对虚拟线程不可用,反之亦然。
从 Java 代码的角度来看,开发者不能感知到虚拟线程和其载体线程临时共享了一个操作系统线程。但从本地代码(native
code)的角度来看,虚拟线程和其载体在同一个本地线程上运行。因此,在同一虚拟线程上多次调用的本地代码可能会观察到不同的操作系统线程标识符。
下图可以用来表示使用虚拟线程和平台线程(线程池)执行并发任务的区别:
图1-1 线程池处理并发任务
图1-2 虚拟线程处理并发任务
1.2 使用虚拟线程
虚拟线程API
在虚拟线程中可以执行任何java代码,并且支持thread-local和线程中断,因此从使用平台线程迁移到使用虚拟线程的成本非常低。如果要替换一个现有的线程池,则可以使用Executors.newVirtualThreadPerTaskExecutor()
来返回一个 ExecutorService,使用方式如下:
private static ForkJoinPool createDefaultScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> {
PrivilegedAction pa = () -> new CarrierThread(pool);
return AccessController.doPrivileged(pa);
};
PrivilegedAction pa = () -> {
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
... //略过一些赋值操作
Thread.UncaughtExceptionHandler handler = (t, e) -> { };
boolean asyncMode = true; // FIFO
return new ForkJoinPool(parallelism, factory, handler, asyncMode,
0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
};
return AccessController.doPrivileged(pa);
}
|
除了使用Executors,创建虚拟线程还可以使用java.lang.Thread新的API,Thread.Builder、Thread.ofVirtual()
和 Thread.ofPlatform() 是用于创建虚拟线程和平台线程的新 API。例如这个例子,创建了一个名为“duke”的未启动的新虚拟线程:
Thread thread = Thread.ofVirtual().name("duke").unstarted(runnable);
|
Thread::startVirtualThread可以快捷地创建并启动虚拟线程:
Thread.startVirtualThread(Runnable)
|
其他一些被影响到的方法有:
Thread.Builder 可以创建线程或 ThreadFactory,后者可以创建具有相同属性的多个线程。
Thread.isVirtual() 返回当前线程是否为虚拟线程。
Thread.getAllStackTraces() 会返回所有平台线程的映射,不包括虚拟线程。
避免用ThreadLocal池化资源
虚拟线程支持ThreadLocal和InheritableThreadLocal,因此它们可以运行使用ThreadLocal的现有代码。然而,由于虚拟线程可能非常众多,使用ThreadLocal时必须仔细考虑,特别是**在线程池中共享同一线程的多个任务之间,不要使用ThreadLocal来池化过大的资源。**使用虚拟线程的预期是,只在其生命周期内运行单个任务,因此不应该对虚拟线程进行池化。在过去的一些版本中,java的基础库已经删除了许多ThreadLocal的使用,以减少在大规模使用虚拟线程时的内存占用。
虚拟线程固定(pinned)问题
JDK中绝大多数的阻塞操作(如LockSupport、网络库API、大部分IO操作)都会卸载虚拟线程,释放其载体线程和底层操作系统线程以执行其他虚拟线程。然而,JDK中有一些阻塞操作不会卸载虚拟线程,从而阻塞其载体线程和底层操作系统线程,这是因为操作系统层面(例如许多文件系统操作)或JDK层面(例如Object.wait())的限制。为了解决这些阻塞操作的问题,虚拟线程调度器会通过暂时扩展并行度来弥补平台线程被占用,因此在调度程序的ForkJoinPool中,平台线程的数量可能暂时超过可用处理器的数量,可以通过系统属性jdk.virtualThreadScheduler.maxPoolSize来调整调度器可用的平台线程的最大数量。
但有两种情况下虚拟线程在阻塞操作期间不能卸载,而是被固定(pinned)在其载体线程上:
1. 当虚拟线程在 synchronized 代码块或方法中执行代码时
2. 当它执行本地方法(native method )或外部函数(foreign function)时
用以下代码改造一下本章开头用到的例子中,向虚拟线程提交任务的代码,执行后即可看到这个变化:
var future = vs.submit(() -> {
synchronized (lock) {
Thread.sleep(Duration.ofSeconds(1));
int x = a.addAndGet(1);
return x;
}
});
|
虚拟线程固定不会使应用程序执行得不正确,但可能会影响程序的扩展性,正如上面的例子。如果虚拟线程在被固定的情况下执行阻塞操作(例如I/O或BlockingQueue.take()),则其载体线程将在操作期间被阻塞,而且调度器也不会通过扩展ForkJoinPool的平台线程数来补偿。因此,频繁长时间的虚拟线程固定会严重损害应用程序的可扩展性。我们应该修改包含频繁的有长时间阻塞操作的
synchronized 代码块和方法,代码可以用ReentrantLock来重构。而对于仅在程序启动阶段执行,或完全在内存执行的
synchronized 代码块和方法,没有必要全部修改。
1.3 最佳实践
虚拟线程的thread dump
传统的打印线程堆栈方法(jstack、jcmd)dump文件中不包含虚拟线程,想要查看虚拟线程堆栈,需要指定json格式:
$ jcmd Thread.dump_to_file -format=json
|
这份dump文件包含所有的平台线程和虚拟线程,由于线程数量可以很高,这个文件可能也会很大。在线程数较少时,内容大致如下:
监控虚拟线程固定事件
设置系统属性 jdk.tracePinnedThreads 可以在发生虚拟线程固定时,打印线程堆栈跟踪,具体用法是在程序启动时设置
-Djdk.tracePinnedThreads=full 会打印虚拟线程执行的完整堆栈,如下:
Thread[#23,ForkJoinPool-1-worker-1,5,CarrierThreads]
java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:183)
java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:398)
java.base/jdk.internal.vm.Continuation.yield0(Continuation.java:390)
java.base/jdk.internal.vm.Continuation.yield(Continuation.java:357)
java.base/java.lang.VirtualThread.yieldContinuation(VirtualThread.java:428)
java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:599)
java.base/java.lang.VirtualThread.doSleepNanos(VirtualThread.java:777)
java.base/java.lang.VirtualThread.sleepNanos(VirtualThread.java:750)
java.base/java.lang.Thread.sleep(Thread.java:525)
java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
LoomTest.lambda$main$2(LoomTest.java:159) protocolHandlerVirtualThreadExecutorCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
|
OverloadRejectedVirtualExecutor是我通过封装虚拟线程实现的Executor,通过感知当前服务器cpu和内存使用是否超载来动态调整Executor是否执行拒绝策略:
public class OverloadRejectedVirtualExecutor implements ExecutorService {
private static final Logger log = LoggerFactory.getLogger(OverloadRejectedVirtualExecutor.class);
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private volatile boolean overload = false;
private volatile boolean terminated = false;
public OverloadRejectedVirtualExecutor() {
Thread.ofVirtual().name("OverloadMonitor").start(() -> {
int hb = 0, cpuOverloadCount = 0, memoryOverloadCount = 0;
while (!terminated) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("overload rejected virtual executor sleep error", e);
overload = false;
}
if (++hb >= 60) {
hb = 0;
logEvent("DUBBO_VIRTUAL_THREAD_POOL", "heartbeat");
}
// 连续5s CPU Load > 0.99,设置为过载
var operatingSystem = ManagementFactory.getOperatingSystemMXBean();
if (operatingSystem instanceof com.sun.management.OperatingSystemMXBean osBean) {
double cpuLoad = osBean.getCpuLoad();
double processCpuLoad = osBean.getProcessCpuLoad();
if (processCpuLoad > 0.99) {
overload = ++cpuOverloadCount > 4;
logEvent("DUBBO_VIRTUAL_THREAD_POOL", "CPU_Load_over_99%_" + cpuOverloadCount);
log.info("CPU_Load: {}% , Process_CPU_Load: {}%", cpuLoad * 100, processCpuLoad * 100);
}
if (overload) {
logEvent("DUBBO_VIRTUAL_THREAD_POOL", "overload");
continue;
}
}
cpuOverloadCount = 0;
// 连续5s Memory Used > 99%,设置为过载
var usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
var maxMemory = Runtime.getRuntime().maxMemory();
if (100d * usedMemory / maxMemory > 99) {
overload = ++memoryOverloadCount > 4;
logEvent("DUBBO_VIRTUAL_THREAD_POOL", "Memory_Used_over_99%_" + memoryOverloadCount);
}
if (overload) {
logEvent("DUBBO_VIRTUAL_THREAD_POOL", "overload");
continue;
}
memoryOverloadCount = 0;
overload = false;
}
});
log.info("dubbo virtual thread executor init.");
}
private void reject() {
RejectedExecutionException e = new RejectedExecutionException("Dubbo server is overload now!");
logError(e);
throw e;
}
@Override
public void execute(Runnable command) {
if (overload) {
reject();
}
executor.execute(command);
}
@Override
public Future submit(Callable task) {
if (overload) {
reject();
}
return executor.submit(task);
}
@Override
public void close() {
terminated = true;
executor.close();
}
... //省略其他方法
}
|
最后将DubboVirtualThreadPool设置为启动线程池,即可使用虚拟线程处理dubbo请求。这段代码中
if (operatingSystem instanceof com.sun.management.OperatingSystemMXBean
osBean) 用到了instanceof 模式匹配的语法,我们会在第4篇里介绍。
实现虚拟线程版的CompletableFuture
ComplableFuture是java并发编程中最重要的异步编程工具类之一,但由于其defaultExecutor使用的是ForkJoinPool.commonPool(),本身并不适合处理高并发且IO密集的请求,因此一般使用ComplableFuture时都需要根据不同场景传入自定义的线程池,这就会导致项目中线程池定义繁多甚至重复,且在程序中也存在一定的风险。
我们可以封装一个虚拟线程版本的CompletableFuture,来解决以上所有的问题,新的类命名为VCompletableFuture:
public final class VCompletableFuture implements Future, CompletionStage {
private static final ExecutorService vcfExecutor = Executors.newVirtualThreadPerTaskExecutor();
private CompletableFuture completableFuture;
private VCompletableFuture() {}
public static VCompletableFuture supplyAsync(Supplier supplier) { VCompletableFuture vcf = new VCompletableFuture(); vcf.completableFuture = CompletableFuture.supplyAsync(supplier, vcfExecutor); return vcf; }
public static VCompletableFuture runAsync(Runnable runnable) { VCompletableFuture vcf = new VCompletableFuture(); vcf.completableFuture = CompletableFuture.runAsync(runnable, vcfExecutor); return vcf; }
public static VCompletableFuture allOf(VCompletableFuture... vcfs) { VCompletableFuture vcf = new VCompletableFuture(); CompletableFuture[] cfs = Arrays.stream(vcfs).map(vf -> vf.completableFuture).toList().toArray(new CompletableFuture[0]); vcf.completableFuture = CompletableFuture.allOf(cfs); return vcf; }
public static VCompletableFuture anyOf(VCompletableFuture... vcfs) { VCompletableFuture vcf = new VCompletableFuture(); CompletableFuture[] cfs = Arrays.stream(vcfs).map(item -> item.completableFuture).toList().toArray(new CompletableFuture[0]); vcf.completableFuture = CompletableFuture.anyOf(cfs); return vcf; }
public VCompletableFuture thenApply(Function
|
|