Semaphore源码分析

1.概述

semaphore是一个计数信号量,用于控制同时访问某个资源的线程数量。Semaphore可以用于限制同时访问某个资源的线程数量,或者用于保护某个重要资源,以避免多个线程同时修改它,从而导致数据不一致的问题。在Java中Semaphore是用于实现线程同步的一个类,它提供了一种基于许可证的机制来控制并发访问的数量。下面对Semaphore.java进行分析:

Semaphore类继承及实现图
Semaphore类的主要功能

Semaphore类提供了获取和释放许可证的方法,它可以用来控制并发访问数量的上限。Semaphore内部使用了AQS(AbstractQueuedSynchronizer)类来实现同步机制,通过对许可证的获取和释放来实现线程的同步和互斥。Semaphore还提供了公平模式和非公平模式两种同步方式,可以根据实际需求来选择不同的同步方式。

Semaphore类的核心方法

Semaphore类中的核心方法包括acquire()、release()、tryAcquire()等方法。

acquire()方法用于获取一个许可证,如果没有可用的许可证,则阻塞当前线程直到有可用的许可证或线程被中断;

release()方法用于释放一个许可证;

tryAcquire()方法用于尝试获取一个许可证,如果获取成功则返回true,否则返回false。

Semaphore类中还提供了一些其他的方法,如availablePermits()、drainPermits()、reducePermits()等方法,用于获取当前可用的许可证数量、获取并减少许可证数量等。

Semaphore类的内部类

Semaphore类中定义了一个抽象类、两个内部类,它们分别是Sync、FairSync与NonfairSync。

  • Sync类是Semaphore类的抽象内部类,继承了AQS类,并实现了Semaphore类的同步机制。
  • 而FairSync和NonfairSync类则是Sync类的具体实现,分别对应Semaphore类中的公平模式和非公平模式。
Semaphore类的使用场景

下面例举3个常用场景

  • 用于控制并发访问数量的上限,它适用于那些需要限制并发访问数量的场景,如数据库连接池、线程池等。
  • 用于实现生产者-消费者模型中的缓冲区,通过控制缓冲区的大小来限制生产者和消费者的访问数量。
  • 用于解决哲学家就餐问题、停车场问题等经典的同步问题。

总之,Semaphore是Java中实现线程同步和互斥的重要类之一,它提供了一种基于许可证的机制来控制并发访问数量的上限,适用于各种需要限制并发访问数量的场景。

2.源码示例

2.1Semaphore核心部分分析
代码语言:javascript代码运行次数:0运行复制
public class Semaphore implements java.io.Serializable {
  private static final long serialVersionUID = -3222578661600680210L;
    // 内部同步器
    private final Sync sync;
    /**
     * 创建一个许可证数量为permits的Semaphore对象,使用非公平模式
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    /**
     * 创建一个许可证数量为permits的Semaphore对象,并指定是否使用公平模式
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    /**
     * 获取一个许可证,如果没有可用的许可证则阻塞当前线程,直到有可用的许可证或线程被中断
     */
    public void acquire() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        sync.acquireSharedInterruptibly(1);
    }
    /**
     * 获取指定数量的许可证,如果没有足够的许可证则阻塞当前线程,直到有足够的许可证或线程被中断
     */
    public void acquire(int permits) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (permits <= 0)
            return;
        sync.acquireSharedInterruptibly(permits);
    }
    /**
     * 尝试获取一个许可证,如果获取成功则返回true,否则返回false
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
    /**
     * 尝试获取指定数量的许可证,如果获取成功则返回true,否则返回false
     */
    public boolean tryAcquire(int permits) {
        if (permits <= 0)
            return true;
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
    /**
     * 释放一个许可证
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * 获取当前可用的许可证数量
     */
    public int availablePermits() {
        return sync.getPermits();
    }
    /**
     * 用于获取并返回Semaphore对象当前可用的所有许可证,同时将Semaphore对象的许可证数量设置为0。
     */
    public int drainPermits() {
        return sync.drainPermits();
    }
    /**
     * 减少指定数量的许可证数量
     */
    public void reducePermits(int reduction) {
        if (reduction < 0)
            throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
    /**
     * 获取正在等待许可证的线程数量
     */
    public int getQueueLength() {
        return sync.getQueueLength();
    }
    /**
     * 获取正在等待许可证的线程集合
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }
    /**
     * 查询是否有线程正在等待许可证
     */
    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    /**
     * 查询当前锁是否是公平锁
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * 计算当前许可证数量和指定许可证数量之和是否超过初始化时的许可证数量
     * 如果超过则抛出IllegalArgumentException异常
     */
    public void release(int permits) {
        if (permits <= 0)
            return;
        sync.releaseShared(permits);
    }
    /**
     * 查询是否有线程正在等待许可证,并返回第一个正在等待许可证的线程
     * 如果没有线程正在等待许可证,则返回null
     */
    public Thread getFirstQueuedThread() {
        return sync.getFirstQueuedThread();
    }

/**
     * 设置是否允许中断正在等待许可证的线程
     */
    public void setDaemon(boolean on) {
        sync.setDaemon(on);
    }
    /**
     * 获取是否允许中断正在等待许可证的线程
     */
    public boolean isDaemon() {
        return sync.isDaemon();
    }
    /**
     * 获取正在等待许可证的线程集合,包括正在等待获取许可证的线程和已经获取许可证但还没有释放的线程
     */
    public Collection<Thread> getThreadsWaitingForPermits() {
        return sync.getThreadsWaitingForPermits();
    }

    // ...
    /**
     * 获取正在等待许可证的线程集合,不包括已经获取许可证但还没有释放的线程
     */
    public Collection<Thread> getWaitingThreads() {
        return sync.getWaitingThreads();
    }
    /**
     * 获取当前许可证数量和指定许可证数量之和,但不会超过初始化时的许可证数量
     */
    public int tryAcquireShared(int permits) {
        if (permits <= 0)
            return 0;
        return sync.tryAcquireShared(permits);
    }


    /**
     * 获取并减少指定数量的许可证数量,如果没有足够的许可证则等待,直到获取到许可证或者等待超时
     * 返回值表示是否获取到了许可证
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (permits <= 0)
            return true;
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }
    /**
     * 获取正在等待许可证的线程数量,不包括已经获取许可证但还没有释放的线程
     */
    public int getWaitingThreadCount() {
        return sync.getWaitingThreadCount();
    }
    /**
     * 获取当前许可证数量,但不会超过初始化时的许可证数量
     */
    public int availablePermitsShared() {
        return sync.getPermits();
    }
    /**
     * 查询是否有线程正在等待许可证,并返回第一个正在等待许可证的线程节点
     * 如果没有线程正在等待许可证,则返回null
     */
    public AbstractQueuedSynchronizer.Node getFirstQueuedNode() {
        return sync.getFirstQueuedNode();
    }
    /**
     * 查询是否有线程正在等待许可证,并返回最后一个正在等待许可证的线程节点
     * 如果没有线程正在等待许可证,则返回null
     */
    public AbstractQueuedSynchronizer.Node getLastQueuedNode() {
        return sync.getLastQueuedNode();
    }
    // ...
}
2.2Sync部分源码实现分析

Sync是Semaphore类的同步器抽象类,它定义了Semaphore的公共接口和Semaphore对象的状态信息。Semaphore类的Sync抽象类有两个子类,即NonfairSync和FairSync,它们分别代表Semaphore的非公平模式和公平模式的同步器实现。以下列举是Sync抽象类的核心部分源码实现

代码语言:javascript代码运行次数:0运行复制
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    /**
     * 构造方法,接受一个int类型的参数permits,并调用父类构造方法将其设置为同步器的状态信息
     */
    Sync(int permits) {
        setState(permits);
    }
    /**
     * 获取当前可用的许可证数量
     */
    final int getPermits() {
        return getState();
    }
    /**
     * 将许可证数量设置为给定的值
     */
    final void setPermits(int permits) {
        setState(permits);
    }
    /**
     * 尝试获取许可证,如果获取成功,则返回true,否则返回false
     */
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases; // 计算释放许可证后的许可证数量
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next)) // 释放许可证
                return true;
        }
    }
    /**
     * 获取许可证,如果没有足够的许可证则返回负数值
     */
    abstract int nonfairTryAcquireShared(int acquires);
    /**
     * 尝试获取许可证,如果获取成功,则返回true,否则返回false
     */
    protected final boolean tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires) >= 0;
    }
    /**
     * 尝试释放许可证,如果释放成功,则返回true,否则返回false
     */
    protected final boolean tryReleaseShared() {
        return tryReleaseShared(1);
    }
}

在Sync抽象类中,定义了Semaphore对象的状态信息,包括许可证数量的获取和设置方法。同时,Sync抽象类也定义了获取和释放许可证的方法,其中获取许可证的方法在子类中实现,包括非公平模式和公平模式两种实现。释放许可证的方法在Sync抽象类中实现,它使用CAS操作更新许可证数量,如果更新成功,则返回true,否则继续循环直到更新成功为止。由于Semaphore是一个同步工具,涉及到线程的并发访问,因此Sync抽象类继承了AbstractQueuedSynchronizer类,使用了队列等待机制来保证线程的安全访问。

2.3FairSync的源码实现分析

FairSync是Semaphore类的内部类,是Semaphore的公平模式的同步器实现。它继承自AbstractQueuedSynchronizer类,实现了公平模式下的获取和释放许可证的逻辑。以下是FairSync的源码实现:

代码语言:javascript代码运行次数:0运行复制
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        /**
         *创建一个许可证数量为permits的FairSync对象
        **/

        FairSync(int permits) {
            super(permits);
        }
        /**
        * 计算新的state值,如果计算后state值小于0,则抛出异常
        */
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
 2.4 NonfairSync的源码实现分析

NonfairSync是Semaphore类的Sync内部类的一个实现,它继承自Sync类。NonfairSync是Semaphore的非公平模式的同步器实现。

以下是NonfairSync的源码实现分析

代码语言:javascript代码运行次数:0运行复制
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        /**
         创建一个许可证数量为permits的FairSync对象
       **/

        NonfairSync(int permits) {
            super(permits);
        }
       /**
        * 计算新的state值,如果计算后state值小于0,则抛出异常
        */
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

 在非公平模式下,线程可以直接尝试获取许可证,不需要排队等待。当线程尝试获取许可证时,线程首先尝试直接获取许可证,如果当前的许可证数量大于等于需要的许可证数量,则直接获取许可证,否则使用可中断的方式获取许可证。在释放许可证时,同步器使用CAS操作更新许可证数量,如果更新成功,则返回true,否则继续循环直到更新成功为止。由于非公平模式下,线程可以直接尝试获取许可证,因此多个线程可能同时竞争同一个许可证,存在一定的竞争和争用。

3.使用示例

Semaphore是一个计数信号量,用于控制同时访问某个资源的线程数量。Semaphore可以用于限制同时访问某个资源的线程数量,或者用于保护某个重要资源,以避免多个线程同时修改它,从而导致数据不一致的问题。 Semaphore有两个主要的操作:acquire()和release()。当一个线程需要访问受Semaphore保护的资源时,它必须调用acquire()方法获取Semaphore的许可证。如果此时Semaphore的计数器为0,则acquire()方法将阻塞该线程,直到有一个许可证可用。当线程访问完受Semaphore保护的资源后,它必须调用release()方法释放Semaphore的许可证。这会将Semaphore的计数器加1,以便其他线程可以继续访问受Semaphore保护的资源。 

3.1简单示例

下面是一个使用Semaphore的简单示例,该示例使用Semaphore来限制同时执行的线程数量

代码语言:javascript代码运行次数:0运行复制
import java.util.concurrent.Semaphore;
public class Example {
    private static final Semaphore semaphore = new Semaphore(5); // 最多允许5个线程同时执行
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) { // 启动10个线程
            new Thread(new Worker()).start();
        }
    }
    private static class Worker implements Runnable {
        @Override
        public void run() {
            try {
                semaphore.acquire(); // 获取Semaphore的许可证
                // 执行一些需要受Semaphore保护的操作
                Thread.sleep(1000);
                semaphore.release(); // 释放Semaphore的许可证
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在上面的示例中,Semaphore的初始计数器为5,因此最多允许5个线程同时执行。当一个线程启动时,它将创建一个Worker对象,并在其中调用semaphore.acquire()方法以获取Semaphore的许可证。如果此时Semaphore的计数器为0,则该线程将被阻塞,直到有一个许可证可用。在Worker对象中,我们可以执行一些需要受Semaphore保护的操作,例如访问共享资源。当线程完成这些操作后,它将调用semaphore.release()方法以释放Semaphore的许可证,以便其他线程可以继续执行。

3.2复杂示例

示例目的

使用了 Semaphore 来控制三个线程的执行顺序,保证了输出的字符串按照指定的顺序。

示例说明

该示例使用 Semaphore 来控制三个线程的执行顺序,其中 semaphoreA 表示线程 A 的信号量,semaphoreB 表示线程B的信号量,semaphoreC 表示线程C的信号量。线程 A 先执行,所以它的信号量初始值为 1,而线程 B和线程 C的信号量初始值为 0。在每个线程的 run() 方法中,线程会先尝试获得当前信号量,如果当前信号量不可用则线程会被阻塞。当当前信号量可用时,线程输出指定字符串并释放下一个线程的信号量,从而实现按照指定顺序输出字符串的效果。同时,主线程通过检测输入来设置 stopFlag,从而控制三个线程的退出。

示例代码

代码语言:javascript代码运行次数:0运行复制
package company;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Scanner;
import java.util.concurrent.Semaphore;
public class PrintABC {
    private static Semaphore semaphoreA = new Semaphore(1);
    private static Semaphore semaphoreB = new Semaphore(0);
    private static Semaphore semaphoreC = new Semaphore(0);
    private static volatile boolean stopFlag = false;
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new PrinterThread("A", semaphoreA, semaphoreB);
        Thread t2 = new PrinterThread("B", semaphoreB, semaphoreC);
        Thread t3 = new PrinterThread("C", semaphoreC, semaphoreA);
        t1.start();
        t2.start();
        t3.start();
        Scanner scanner = new Scanner(System.in);
        while (!stopFlag) {
            System.out.println("输入任意字符结束");
            scanner.nextLine();
            stopFlag = true;
        }
        t1.join();
        t2.join();
        t3.join();
        LinkedList list=new LinkedList();
        ArrayList list1;
    }
    private static class PrinterThread extends Thread {
        private final String str;
        private final Semaphore currentSemaphore;
        private final Semaphore nextSemaphore;
        public PrinterThread(String str, Semaphore currentSemaphore, Semaphore nextSemaphore) {
            this.str = str;
            this.currentSemaphore = currentSemaphore;
            this.nextSemaphore = nextSemaphore;
        }
        @Override
        public void run() {
            while (!stopFlag) {
                try {
                    currentSemaphore.acquire();
                    System.out.println(str);

                    nextSemaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Thread.join() 方法说明

该方法是一个阻塞方法,它的作用是等待该线程执行完毕。当在一个线程中调用另一个线程的 join() 方法时,当前线程会被阻塞,直到另一个线程执行完毕后才会继续执行。 具体来说,当一个线程调用另一个线程的 join() 方法时,该线程会等待另一个线程执行完毕后再继续执行。如果在调用 join() 方法时传入了参数,那么当前线程会等待一定的时间,如果另一个线程在该时间内还没有执行完毕,当前线程也会继续执行。 Thread.join() 方法通常用于在一个线程中等待另一个线程执行完毕后再进行后续操作,例如等待子线程执行完毕后再进行结果的处理,或者等待多个线程执行完毕后再进行结果的合并等等。 注意,调用 Thread.join() 方法会对当前线程进行阻塞,如果不加以控制,可能会导致死锁的问题。因此,在使用 join() 方法时需要注意线程之间的依赖关系,避免出现死锁的情况。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-03-18,如有侵权请联系 cloudcommunity@tencent 删除对象线程源码分析javasemaphore