热门标签 | HotTags
当前位置:  开发笔记 > 人工智能 > 正文

非阻塞同步算法实战(二)-BoundlessCyclicBarrier

感谢网友trytocatch的投稿前言相比上一篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。需

感谢网友trytocatch的投稿 前言 相比上一 篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。 需

感谢网友trytocatch的投稿

前言

相比上一 篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。

需求介绍

我需要编写一个同步工具,它需要提供这样几个方法:await、pass、cancel。某个线程调用await时,会被阻塞;当调用pass方法时,之前因为await而阻塞的线程将全部被解除阻塞,之后调用await的线程继续被阻塞,直到下一次调用pass。

该工具同时还维护一个版本号,await方法可以带一个目标版本号,如果当前的版本号比目标版本号新或相同,则直接通过,否则,阻塞本线程,直到到达或超过目标版本。调用pass的时候,更新版本号。

如果停止了版本更新,可使用cancel方法来解除所有因await而阻塞的线程,包括指定版本号的。此方法用于避免无谓地等待。若await发生在cancel之后,则仍将被阻塞。

因为CountDownLatch不允许重复使用,CyclicBarrier只支持固定个数的线程,并且都没有维护一个版本号,所以没有已有的类能实现上面的需求,需要自己实现。

问题分析

简单分析可知,应该维护一个队列,来保存当前被阻塞的线程,用于在pass时对它们一一解除阻塞,pass时应该使用一个新的队列,否则不方便正确处理pass前和pass后调用await的线程。

至此,问题的关键就明了了:如何将队列的替换和版本号的更新这两个操作做成原子的。

解决方案

以前在《JAVA并发编程实践》曾看到过这样一个小技巧,如果要原子地更新两个变量,那么可以创建一个新的类将它们封装起来,将这两个变量当定义成类成员变量,更新时,用CAS更新这个类的引用即可。

因为较为复杂,下面先给出完整的代码,再讲解其中的关键。

注意:上面所说pass,在代码中的具体实现为nextCycle,有两个版本,一个自动维护版本号,一个由调用者维护版本号。

/**
 * @author trytocatch@163.com
 * @time 2013-1-31
 */
public class BoundlessCyclicBarrier {
    protected final AtomicReference waitQueueRef;
    public BoundlessCyclicBarrier() {
        this(0);
    }
    public BoundlessCyclicBarrier(int startVersion) {
        waitQueueRef = new AtomicReference(new VersionQueue(startVersion));
    }
    public final void awaitWithAssignedVersion(int myVersion)
            throws InterruptedException {
        awaitImpl(true, myVersion, 0);
    }
    /**
     *
     * @param myVersion
     * @param nanosTimeout
     * @return if timeout, or be canceled and doesn't reach myVersion, returns false
     * @throws InterruptedException
     */
    public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException {
        return awaitImpl(true, myVersion, nanosTimeout);
    }
    public final void await() throws InterruptedException {
        awaitImpl(false, 0, 0);
    }
    /**
     *
     * @param nanosTimeout
     * @return if and only if timeout, returns false
     * @throws InterruptedException
     */
    public final boolean await(long nanosTimeout)
            throws InterruptedException {
        return awaitImpl(false, 0, nanosTimeout);
    }
    /**
     * pass and version++(some threads may not be unparked when awaitImpl is in process, but it's OK in this Barrier)
     * @return old queue version
     */
    public int nextCycle() {
        VersionQueue oldQueue = waitQueueRef.get();
        VersionQueue newQueue = new VersionQueue(oldQueue.version + 1);
        for(;;){
            if (waitQueueRef.compareAndSet(oldQueue, newQueue)) {
                for (Thread t : oldQueue.queue)
                    LockSupport.unpark(t);
                break;
            }
            oldQueue = waitQueueRef.get();
            newQueue.version = oldQueue.version + 1;
        }
        return oldQueue.version;
    }
    /**
     * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right)
     * @param newAssignVersion
     */
    public void nextCycle(int newAssignVersion) {
        VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion));
        for (Thread t : oldQueue.queue)
            LockSupport.unpark(t);
    }
    /**
     * if version update has stopped, invoke this to awake all threads
     */
    public void cancel(){
        VersionQueue oldQueue = waitQueueRef.get();
        if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) {
            for (Thread t : oldQueue.queue)
                LockSupport.unpark(t);
    }
    public final int getVersion() {
        return waitQueueRef.get().version;
    }
    private static final class VersionQueue {
        final private ConcurrentLinkedQueue queue;
        int version;
        final boolean isCancelQueue;
        VersionQueue(int curVersion){
            this(curVersion, false);
        }
        VersionQueue(int curVersion, boolean isCancelQueue) {
            this.version = curVersion;
            this.isCancelQueue = isCancelQueue;
            queue = new ConcurrentLinkedQueue();
        }
    }
    /**
     *
     * @param assignVersion is myVersion available
     * @param myVersion wait for this version
     * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid)      * @return if timeout, or be canceled and doesn't reach myVersion, returns false      * @throws InterruptedException      */     protected boolean awaitImpl(boolean assignVersion, int myVersion,             long nanosTimeout) throws InterruptedException {         boolean timeOutEnable = nanosTimeout > 0;
        long lastTime = System.nanoTime();
        VersionQueue newQueue = waitQueueRef.get();//A
        if (assignVersion && newQueue.version - myVersion >= 0)
            return true;
        while (true) {
            VersionQueue submitQueue = newQueue;//B
            submitQueue.queue.add(Thread.currentThread());//C
            while (true) {
                newQueue = waitQueueRef.get();//D
                if (newQueue != submitQueue){//E: it's a new cycle
                    if(assignVersion == false)
                        return true;
                    else if(newQueue.version - myVersion >= 0)
                        return true;
                    else if (newQueue.isCancelQueue)//F: be canceled
                        return false;
                    else//just like invoking awaitImpl again
                        break;
                }
                if (timeOutEnable) {
                    if (nanosTimeout <= 0)
                        return false;
                    LockSupport.parkNanos(this, nanosTimeout);
                    long now = System.nanoTime();
                    nanosTimeout -= now - lastTime;
                    lastTime = now;
                } else
                    LockSupport.park(this);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        }
    }
}

代码分析

先分析一下awaitImpl方法,A和D是该方法的关键点,决定着它属于哪一个批次,对应哪一个版本。这里有个小细节,在nexeCycle,cancel解除阻塞时,该线程可能并不在队列中,因为插入队列发生在C处,这在A和D之后(虽然看起来C在D之前,但D取到的queue要在下一次循环时才被当作submitQueue),所以,在E处再进行了一次判断,开始解除阻塞时,旧队列肯定被新队列所替换,newQueue != submitQueue一定为真,就会不调用park进行阻塞了,也就不需要解除阻塞,所以即使解除阻塞时,该线程不在队列中也是没问题的。

再看E处,当进入一个新的cycle时(当前队列与提交的队列不同),a)如果没指定版本,或者到达或超过了指定版本,则返回true;b)如果当前调用了cancel,则当前队列的isCancelQueue将为true,则不继续傻等,返回false;c)或者还未到达指定版本,break,插入到当前队列中,继续等待指定版本的到达。

如果没有进入E处的IF内,则当前线程会被阻塞,直到超时,然后返回false;或被中断,然后抛出InterruptedException;或被解除阻塞,重新进行E处的判定。

这里还有个小细节,既然cancel时,把当前的队列设置了isCancelQueue,那么之后指定版本的await会不会也直接返回了呢?其实不会的,因为它若要执行F处的判断,则先必需通过E处的判定,这意味着,当前队列已经不是提交时的那个设置了isCancelQueue的队列了。

代码中对于cancel的处理,其实并不保证cancel后,之前的await都会被解除阻塞并返回,如果cancel后,紧接着又调用了nextCycle,那么可能某线程感知不到cancel的调用,唤醒后又继续等待指定的版本。cancel的目的是在于不让线程傻等,既然恢复版本更新了,那就继续等待吧。

如果自己维护版本号,则应该保证递增。另外,版本号的设计,考虑到了int溢出的情况,版本的前后判断,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,这样,版本号就相当于循环使用了,只要两个比较的版本号的差不超过int的最大值,那么都是正确的,int的最大值可是20多亿,几乎不可能出现跨度这么大的两个版本号的比较,所以,认为它是正确的。

小结

本文讲到了一个非阻塞同步算法设计时的小技巧,如果多个变量之间要维护某种特定关系,那么可以将它们封装到一个类中,再用CAS更新这个类的引用,这样就达到了:要么都被更新,要么都没被更新,保持了多个变量之间的一致性。同时需要注意的是,每次更新都必需创建新的包装对象,假如有其它更好的办法,应该避免使用该方法。

推荐阅读
  • 使用Numpy实现无外部库依赖的双线性插值图像缩放
    本文介绍如何仅使用Numpy库,通过双线性插值方法实现图像的高效缩放,避免了对OpenCV等图像处理库的依赖。文中详细解释了算法原理,并提供了完整的代码示例。 ... [详细]
  • 非公版RTX 3080显卡的革新与亮点
    本文深入探讨了图形显卡的进化历程,重点介绍了非公版RTX 3080显卡的技术特点和创新设计。 ... [详细]
  • 线性Kalman滤波器在多自由度车辆悬架主动控制中的应用研究
    本文探讨了线性Kalman滤波器(LKF)在不同自由度(2、4、7)的车辆悬架系统中进行主动控制的应用。通过详细的仿真分析,展示了LKF在提升悬架性能方面的潜力,并总结了调参过程中的关键要点。 ... [详细]
  • 本文探讨了Hive中内部表和外部表的区别及其在HDFS上的路径映射,详细解释了两者的创建、加载及删除操作,并提供了查看表详细信息的方法。通过对比这两种表类型,帮助读者理解如何更好地管理和保护数据。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • C++实现经典排序算法
    本文详细介绍了七种经典的排序算法及其性能分析。每种算法的平均、最坏和最好情况的时间复杂度、辅助空间需求以及稳定性都被列出,帮助读者全面了解这些排序方法的特点。 ... [详细]
  • 本文介绍如何利用动态规划算法解决经典的0-1背包问题。通过具体实例和代码实现,详细解释了在给定容量的背包中选择若干物品以最大化总价值的过程。 ... [详细]
  • 本文详细探讨了Java中的24种设计模式及其应用,并介绍了七大面向对象设计原则。通过创建型、结构型和行为型模式的分类,帮助开发者更好地理解和应用这些模式,提升代码质量和可维护性。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 题目描述:给定n个半开区间[a, b),要求使用两个互不重叠的记录器,求最多可以记录多少个区间。解决方案采用贪心算法,通过排序和遍历实现最优解。 ... [详细]
  • 深入理解C++中的KMP算法:高效字符串匹配的利器
    本文详细介绍C++中实现KMP算法的方法,探讨其在字符串匹配问题上的优势。通过对比暴力匹配(BF)算法,展示KMP算法如何利用前缀表优化匹配过程,显著提升效率。 ... [详细]
  • 探讨一个显示数字的故障计算器,它支持两种操作:将当前数字乘以2或减去1。本文将详细介绍如何用最少的操作次数将初始值X转换为目标值Y。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 本文探讨如何设计一个安全的加密和验证算法,确保生成的密码具有高随机性和低重复率,并提供相应的验证机制。 ... [详细]
  • 深入解析:手把手教你构建决策树算法
    本文详细介绍了机器学习中广泛应用的决策树算法,通过天气数据集的实例演示了ID3和CART算法的手动推导过程。文章长度约2000字,建议阅读时间5分钟。 ... [详细]
author-avatar
过期物品请勿购_613
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有