java.util.concurrent 概述
1. 概述
java.util.concurrent包提供了用于创建并发应用程序的工具。
在本文中,我们将对整个软件包进行概述。
2. 主要部件
java.util.concurrent包含太多的功能,无法在一篇文章中讨论。在本文中,我们将主要关注此软件包中一些最有用的实用程序,例如:
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- ThreadFactory
- BlockingQueue
- DelayQueue
- Locks
- Phaser
2.1.Executor
执行器是一个接口,表示执行所提供任务的对象。
这取决于特定的实现(从何处启动调用)是否应在新线程或当前线程上运行。因此,使用此接口,我们可以将任务执行流程与实际任务执行机制解耦。
这里需要注意的一点是,执行器并不严格要求任务执行是异步的。在最简单的情况下,执行程序可以在调用线程中立即调用提交的任务。
我们需要创建一个调用程序来创建执行器实例:
代码语言:javascript代码运行次数:0运行复制public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}Copy
现在,我们可以使用此调用程序来执行任务。
代码语言:javascript代码运行次数:0运行复制public void execute() {
Executor executor = new Invoker();
executor.execute( () -> {
// task to be performed
});
}Copy
这里需要注意的是,如果执行程序不能接受执行任务,它将抛出RejectedExecutionException。
2.2.ExecutorService
ExecutorService是异步处理的完整解决方案。它管理内存中队列,并根据线程可用性计划提交的任务。
要使用ExecutorService,我们需要创建一个Runnable类。
代码语言:javascript代码运行次数:0运行复制public class Task implements Runnable {
@Override
public void run() {
// task details
}
}Copy
现在我们可以创建ExecutorService实例并分配此任务。在创建时,我们需要指定线程池大小。
代码语言:javascript代码运行次数:0运行复制ExecutorService executor = Executors.newFixedThreadPool(10);Copy
如果我们想创建一个单线程的ExecutorService实例,我们可以使用newSingleThreadExecutor(ThreadFactory threadFactory)来创建实例。
创建执行器后,我们可以使用它来提交任务。
代码语言:javascript代码运行次数:0运行复制public void execute() {
executor.submit(new Task());
}Copy
我们还可以在提交任务时创建Runnable实例。
代码语言:javascript代码运行次数:0运行复制executor.submit(() -> {
new Task();
});Copy
它还带有两种开箱即用的执行终止方法。第一个是shutdown();它会等待所有提交的任务完成执行。另一种方法是shutdownNow(),它尝试终止所有正在执行的任务并停止等待任务的处理。
还有另一种方法awaitTermination(long timeout, TimeUnit unit) ,它强制阻塞,直到触发关闭事件或发生执行超时后所有任务完成执行,或者执行线程本身被中断,
代码语言:javascript代码运行次数:0运行复制try {
executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
e.printStackTrace();
}Copy
2.3.ScheduledExecutorService
ScheduledExecutorService是与ExecutorService类似的接口,但它可以定期执行任务。
Executor和ExecutorService的方法是即时调度的,不会引入任何人为延迟。零或任何负值表示请求需要立即执行。
我们可以同时使用Runnable和Callable接口来定义任务。
代码语言:javascript代码运行次数:0运行复制public void execute() {
ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();
Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);
executorService.shutdown();
}Copy
ScheduledExecutorService还可以在给定的固定延迟后调度任务:
代码语言:javascript代码运行次数:0运行复制executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);Copy
在这里,scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit ) 方法创建并执行一个定期操作,该操作在提供的初始延迟之后首先调用,随后在给定的时间段内调用,直到服务实例关闭。
scheduleWithFixedDelay( runnable command, long initialDelay, long delay, TimeUnit unit ) 方法创建并执行一个周期性操作,该操作在提供的初始延迟之后首先调用,并在执行一个终止和下一个调用之间以给定的延迟重复调用。
2.4.Future
Future用于表示异步操作的结果。它带有用于检查异步操作是否完成、获取计算结果等的方法。
更重要的是,cancel(boolean mayInterruptIfRunning)API 会取消操作并释放正在执行的线程。如果mayInterruptIfRunning的值为 true,则执行任务的线程将立即终止。否则,将允许完成正在进行的任务。我们可以使用以下代码片段来创建将来的实例:
代码语言:javascript代码运行次数:0运行复制public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}Copy
我们可以使用以下代码片段来检查未来结果是否准备就绪,并在计算完成后获取数据:
代码语言:javascript代码运行次数:0运行复制if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}Copy
我们还可以为给定操作指定超时。如果任务花费的时间超过此时间,则会抛出超时异常:
代码语言:javascript代码运行次数:0运行复制try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}Copy
2.5.CountDownLatch
CountDownLatch(在JDK 5中引入)是一个实用程序类,它阻止一组线程,直到某些操作完成。
CountDownLatch使用计数器(整数类型)初始化;此计数器随着依赖线程完成执行而递减。但是一旦计数器达到零,其他线程就会被释放。
2.6.CyclicBarrier
CyclicBarrier的工作方式与CountDownLatch几乎相同,只是我们可以重用它。与CountDownLatch 不同,它允许多个线程在调用最终任务之前使用await() 方法(称为屏障条件)相互等待。
我们需要创建一个可运行的任务实例来启动屏障条件:
代码语言:javascript代码运行次数:0运行复制public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}Copy
现在我们可以调用一些线程来竞争屏障条件:
代码语言:javascript代码运行次数:0运行复制public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}Copy
在这里,isBroken()方法检查是否有任何线程在执行期间被中断。在执行实际过程之前,我们应该始终执行此检查。
2.7.Semaphore
信号量用于阻止对物理或逻辑资源的某些部分的线程级访问。信号量包含一组许可证;每当线程尝试进入关键部分时,它都需要检查信号量是否允许。
如果许可证不可用(通过tryAcquire()),则不允许线程跳转到关键部分;但是,如果许可证可用,则授予访问权限,并且许可证计数器减少。
一旦执行线程释放关键部分,允许计数器再次增加(通过release()方法完成)。
我们可以通过使用 tryAcquire(long timeout, TimeUnit unit)方法指定获取访问权限的超时。
我们还可以检查可用许可证的数量或等待获取信号量的线程数量。
以下代码片段可用于实现信号量:
代码语言:javascript代码运行次数:0运行复制static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());
if (semaphore.tryAcquire()) {
try {
// ...
}
finally {
semaphore.release();
}
}
}Copy
我们可以使用信号量实现类似互斥体的数据结构。
2.8.ThreadFactory
顾名思义,ThreadFactory充当线程(不存在)池,可按需创建新线程。它消除了实现高效线程创建机制所需的大量样板编码。
我们可以定义一个线程工厂:
代码语言:javascript代码运行次数:0运行复制public class BaeldungThreadFactory implements ThreadFactory {
private int threadId;
private String name;
public BaeldungThreadFactory(String name) {
threadId = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
LOG.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}Copy
我们可以使用这个newThread(Runnable r)方法在运行时创建一个新线程:
代码语言:javascript代码运行次数:0运行复制BaeldungThreadFactory factory = new BaeldungThreadFactory(
"BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}Copy
2.9.BlockingQueue
在异步编程中,最常见的集成模式之一是生产者-消费者模式。java.util.concurrent包附带了一个称为BlockingQueue 的数据结构,这在这些异步场景中非常有用。
2.10.DelayQueue
DelayQueue是一个无限大小的元素阻塞队列,其中元素只有在过期时间(称为用户定义的延迟)完成时才能被拉取。因此,最顶层的元素(头部)将具有最大的延迟量,并且将最后轮询。
2.11.Locks
毫不奇怪,Lock是一个实用程序,用于阻止其他线程访问某个代码段,除了当前正在执行它的线程外。
锁块和同步块之间的主要区别在于同步块完全包含在方法中;但是,我们可以在单独的方法中使用 Lock API 的 lock() 和 unlock() 操作。
2.12.Phaser
Phaser是比CyclicBarrier和CountDownLatch 更灵活的解决方案,用于充当可重用的屏障,动态数量的线程需要等待才能继续执行。我们可以协调执行的多个阶段,为每个程序阶段重用一个Phaser实例。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-14,如有侵权请联系 cloudcommunity@tencent 删除线程异步java程序接口
发布评论