CyclicBarrier源码分析

Dcr 1年前 ⋅ 860 阅读

 

类的结构
并没有显式继承哪个父类或者实现哪个接口,所有的AQS和重入锁都是同事组合实现的.

CyclicBarrier类存在一个内部类Generation,每一次使用CyclicBarrier都可以当成Generation的实例

private static class Generation{
	boolean broken = false; //broken表示当前屏障是否被破坏
}

类属性

public class CyclicBarrier {
    
    /** The lock for guarding barrier entry */
    // 可重入锁
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    // 条件队列
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    // 参与的线程数量
    private final int parties;
    /* The command to run when tripped */
    // 由最后一个进入 barrier 的线程执行的操作
    private final Runnable barrierCommand;
    /** The current generation */
    // 当前代
    private Generation generation = new Generation();
    // 正在等待进入屏障的线程数量
    private int count;
}

类的构造函数

public CyclicBarrier(int parties, Runnable barrierAction) {
    // 参与的线程数量小于等于0,抛出异常
    if (parties <= 0) throw new IllegalArgumentException();
    // 设置parties
    this.parties = parties;
    // 设置count
    this.count = parties;
    // 设置barrierCommand
    this.barrierCommand = barrierAction;
}

构造函数指定关联该CyclicBarrier的线程数量,并且可以指定在所有线程都进入屏障后的执行动作,该动作由最后一个进行屏障的线程执行,还有一个重载的只传数量的构造函数,不设置执行动作.

核心函数dowait
该方法为对外提供await函数的底层调用函数

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // 保存当前锁
    final ReentrantLock lock = this.lock;
    // 锁定
    lock.lock();
    try {
        // 保存当前代
        final Generation g = generation;
        
        if (g.broken) // 屏障被破坏,抛出异常
            throw new BrokenBarrierException();

        if (Thread.interrupted()) { // 线程被中断
            // 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
            breakBarrier();
            // 抛出异常
            throw new InterruptedException();
        }
        
        // 减少正在等待进入屏障的线程数量
        int index = --count;
        if (index == 0) {  // 正在等待进入屏障的线程数量为0,所有线程都已经进入
            // 运行的动作标识
            boolean ranAction = false;
            try {
                // 保存运行动作
                final Runnable command = barrierCommand;
                if (command != null) // 动作不为空
                    // 运行
                    command.run();
                // 设置ranAction状态
                ranAction = true;
                // 进入下一代
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction) // 没有运行的动作
                    // 损坏当前屏障
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 无限循环
        for (;;) {
            try {
                if (!timed) // 没有设置等待时间
                    // 等待
                    trip.await(); 
                else if (nanos > 0L) // 设置了等待时间,并且等待时间大于0
                    // 等待指定时长
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) { 
                if (g == generation && ! g.broken) { // 等于当前代并且屏障没有被损坏
                    // 损坏当前屏障
                    breakBarrier();
                    // 抛出异常
                    throw ie;
                } else { // 不等于当前带后者是屏障被损坏
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    // 中断当前线程
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken) // 屏障被损坏,抛出异常
                throw new BrokenBarrierException();

            if (g != generation) // 不等于当前代
                // 返回索引
                return index;

            if (timed && nanos <= 0L) { // 设置了等待时间,并且等待时间小于0
                // 损坏屏障
                breakBarrier();
                // 抛出异常
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}

doawait方法的大致流程
doawait-->判断屏障是否已经被损坏-->判断当前线程是否被中断-->判断等待进入屏障的线程数量是否为0-->进入屏障等待

核心函数nextGeneration
此函数在所有线程进入屏障后会被调用,既生成下一个版本,所有线程又可以进入到屏障中,源码如下:

private void nextGeneration() {
    // signal completion of last generation
    // 唤醒所有线程
    trip.signalAll();
    // set up next generation
    // 恢复正在等待进入屏障的线程数量
    count = parties;
    // 新生一代
    generation = new Generation();
}

此函数会调用AQS的signalAll函数,唤醒所有等待线程.

public final void signalAll() {
    if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
        throw new IllegalMonitorStateException();
    // 保存condition队列头结点
    Node first = firstWaiter;
    if (first != null) // 头结点不为空
        // 唤醒所有等待线程
        doSignalAll(first);
}

判断头节点是否为空,然后调用doSignalAll函数

private void doSignalAll(Node first) {
    // condition队列的头结点尾结点都设置为空
    lastWaiter = firstWaiter = null;
    // 循环
    do {
        // 获取first结点的nextWaiter域结点
        Node next = first.nextWaiter;
        // 设置first结点的nextWaiter域为空
        first.nextWaiter = null;
        // 将first结点从condition队列转移到sync队列
        transferForSignal(first);
        // 重新设置first
        first = next;
    } while (first != null);
}

此函数会依次将条件队列中的节点通过调用transferForSignal函数转移到同步队列中

final boolean transferForSignal(Node node) {
    /*
        * If cannot change waitStatus, the node has been cancelled.
        */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
        * Splice onto queue and try to set waitStatus of predecessor to
        * indicate that thread is (probably) waiting. If cancelled or
        * attempt to set waitStatus fails, wake up to resync (in which
        * case the waitStatus can be transiently and harmlessly wrong).
        */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

此函数的作用是将条件队列中的节点转移到同步队列中,并设置节点状态信息,其中会调用到enq函数

private Node enq(final Node node) {
    for (;;) { // 无限循环,确保结点能够成功入队列
        // 保存尾结点
        Node t = tail;
        if (t == null) { // 尾结点为空,即还没被初始化
            if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点
                tail = head; // 头结点与尾结点都指向同一个新生结点
        } else { // 尾结点不为空,即已经被初始化过
            // 将node结点的prev域连接到尾结点
            node.prev = t; 
            if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
                // 设置尾结点的next域为node
                t.next = node; 
                return t; // 返回尾结点
            }
        }
    }
}

此函数完成了节点插入同步队列的过程.

newGeneration函数的主要调用流程
CyclicBarrier:newGeneration-->AQS:signalAll-->AQS:doSignalAll-->AQS:transferForSignal-->AQS:enq

breakBarrier函数
作用:损坏当前屏障,会唤醒所有在屏障中的线程.

private void breakBarrier() {
    // 设置状态
    generation.broken = true;
    // 恢复正在等待进入屏障的线程数量
    count = parties;
    // 唤醒所有线程
    trip.signalAll();
}

对比CountDownLatch
CountDownLatch减计数,CyclicBarrier加计数
CountDownLatch是一次性的,CyclicBarrier可以重用
CountDownLatch和CyclicBarrier都有让多个线程等待同步然后在开始下一步动作的作用,但是CountDownLatch的下一步动作实施者是主线程,具有不可重复性
而CyclicBarrier的下一步动作实施者还是"其他线程"本身,具有往复多次实施动作的特点.

全部评论: 0

    我有话说: