.Net中的并行编程-2.ConcurrentStack的实现与分析

在上篇文章《.net中的并行编程-1.基础知识》中列出了在.net进行多核或并行编程中需要的基础知识,今天就来分析在基础知识树中一个比较简单常用的并发数据结构--.net类库中无锁栈的实现。

       首先解释一下什么这里“无锁”的相关概念。

       所谓无锁其实就是在普通栈的实现方式上使用了原子操作,原子操作的原理就是CPU在系统总线上设置一个信号,当其他线程对同一块内存进行访问时CPU监测到该信号存在会,然后当前线程会等待信号释放后才能对内存进行访问。原子操作都是由操作系统API实现底层由硬件支持,常用的操作有:原子递增,原子递减,比较交换,ConcurrentStack中的实现就是使用了原子操作中的比较交换操作。

       使用原子操作的好处:

         第一、由于没有使用锁,可以避免死锁。

         第二、原子操作不会阻塞线程,例如执行某个指令时当前线程挂起了(或执行了一次上下文切换),其他线程还能继续操作,如果使用lock锁,当前线程挂起后由于没有释放锁,其他线程进行操作时会被阻塞。

         第三、由于原子操作直接由硬件指令的支持,所以原子操作性能比普通锁的高。

   使用原子操作的坏处:

         第一,使用原子操作一般失败时会使用回退技术对当前操作进行重试,所以容易产生活锁和线程饥饿问题,但可以通过随机退让等技术进行缓解,但不能消除。

         第二,程序员开发使用难度较大,测试难度较大。

    下面开始进入正题:

            由于.net 中的 ConcurrentStack的代码较多所以本文就不贴出所有代码,本人也只分析笔者认为重要的几个部分,全部源码可以再去以下微软官方网址查看

            http://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentStack.cs 

            传统的栈结构都一般都使用单链表实现(.net中的Stack使用的是数组), 入栈操作就是把头节点替换为新节点,出栈操作就是把头结点指向下一个节点。所以当大量线程并发访问时线程的竞争条件都在头结点也就是说如果我们能报保证对于头结点操作时是安全的那么整个栈就是安全的。

入栈操作 public void Push(T item)

        public void Push(T item)
        {
            Node newNode = new Node(item);
            newNode.m_next = m_head;
            if (Interlocked.CompareExchange(ref m_head, newNode, newNode.m_next) == newNode.m_next)
            {
                return;
            }

            // If we failed, go to the slow path and loop around until we succeed.
            PushCore(newNode, newNode);
        }

        private void PushCore(Node head, Node tail)
        {
            SpinWait spin = new SpinWait();

            // Keep trying to CAS the exising head with the new node until we succeed.
            do
            {
                spin.SpinOnce();
                // Reread the head and link our new node.
                tail.m_next = m_head;
            }
            while (Interlocked.CompareExchange(ref m_head, head, tail.m_next) != tail.m_next);

        }

         (在原版的注释中我们就能看到入栈使用了原子操作中的比较交换(CAS)操作)

         入栈分为了三步:

           a.当数据入栈时会分配一个新结点,然后将此刻当前内存中的头结点作为新结点的下一个结点, newNode.m_next = m_head中保持的是当前头结点的快照也就是说另一个线程此时有可能更改了m_head指向的结点,注意头结点(m_head)的字段声明中前面使用了volatile关键字,我们知道volatile关键字有两个作用:第一个是禁止编译器和CPU更改字段的位置,第二个是强制刷新CPU的高速缓存,当读取该声明该关键字的字段时每次都去内存里重新加载数据然后读到CPU的高速缓存而不使用CPU缓存中较老的数据,这个地方m_head使用volatile是因为当运行在其他核心的CPU线程更改了m_head值,而我们当前核心的CPU高速缓存中没有及时更新的问题,还有就是出栈时防止对m_head的操作语句移动到其他语句之后造成逻辑代码没有按照预先的逻辑走,例如newNode.m_next = m_head操作放到了if语句之后造成逻辑错误或者CPU的指令乱序执行时产生的逻辑错误。

           b.比较当前的头结点是否与我们保存的newNode.m_next 的快照头结点相同,如果相同则将新结点替换为头结点否则比较失败进行c步骤。

              Interlocked.CompareExchange(ref m_head, newNode, newNode.m_next)其实等价于以下代码,只不过该代码的执行是以原子的方式执行。

if (m_head == newNode.m_next) 
{
    m_head = newNode.m_next;
}
else
{
    return m_head;
}

           c.如果b步骤失败则进入PushCore(Node head, Node tail);PushCore的步骤其实就是重复执行步骤B中的Interlocked.CompareExchange(ref m_head, newNode, newNode.m_next)操作,直到新节点写入为止,在循环期间使用了spin.SpinOnce() ,使用这个API其实就是为了防止活锁,交换失败的线程会进行退让,就好比两个人迎面走来,可能会发生这种情况:你向左走他也向左走,你向右他也向右,所以为了避免俩人碰到一起,那么一个人可以先原地停止,然后继续再走,当继续走时发现方向还是相同时可以改变一下停止的时间,比如说先停止5秒,然后如果还是方向相同则停止10秒,然后还是方向相同可以时可以去喝杯茶慢慢再走,其实这种思想就是随机退让。当然计算机中的SpinOnce实现没有想象的那么简单,在当分析到同步原语SpinWati的实现时我会详细介绍SpinOnce方法的实现方式,这个地方就理解为让线程休息一会(当分析SpinOnce时源码时发现了一个小细节 Thread,Sleep(0)和Thread.Sleep(1)的使用,其实我发现很多人都不清楚这里0和1的区别,这里要说明一下:Thread,Sleep(0)表示交出当前线程的时间片,让具有相同或者更高优先级的线程运行否则继续运行当前线程,也就是说如果没有相同级别或更高级别的线程等待运行那么还是运行当前线程,这个地方会产生线程饥饿问题。Thread,Sleep(1)表示线程睡眠1毫秒和其他线程优先级没有关系,其实这里设置为1时也不是睡眠1毫秒而是13毫秒或者更长,具体的和系统的时钟周期有关)。

出栈操作 public bool TryPop(out T result)

     出栈操作也是使用的CAS操作,不断的将头结点指向下一个结点然后返回头结点,如果交换失败则循环进行,循环期间也是使用随机退让技术来较少活锁的概率只不过退让的时间会随着退让的次数而增大。栈操作的API设计为TryPop()返回值为bool,,是因为在多个线程同时出栈的过程中有可能一个线程出栈以后栈就空了,所以在出栈有可能失败。

批量入栈操作 public void PushRange(T[] items, int startIndex, int count)

    批量入栈是在单个入栈基础上实现的,将多个项目入栈时先将压入的多个项目组成一个栈,然后再将头结点使用CAS操作指向该生成的栈,所以说批量调用一次入栈的效率要比单个调用多次入栈的效率高。 题外话:这里有个编写代码的小细节,在批量入栈的方法中会首先调用ValidatePushPopRangeInput(items, startIndex, count)方法来校验传入的参数正确性,其实该编码是编写代码时比较重要的原则叫“手术室原则”,其解释为医生进入手术室时,对手套,身体等已经进行了消毒,这些准备工作已经完成了,剩下了工作就是医生专心完成手术。这种编码原则不仅使代码的整洁性提高,而且减少CPU分支预判的次数以提升代码运行速度,所以在日常编码中我们可以使用该方法,将大量的参数判断抽象到单独的方法中。

判断是否为空IsEmpty属性

    该操作的实现比较简单,只要判断头结点为空即可。在微软的文档中我们发现当我们判断栈中元素是否为空时应该使用该属性而不是使用Count == 0 这种判断方式,因为Count统计栈内元素的个数时,每使用一次会遍历整个栈,时间复杂度为O(N)而IsEmpty为O(1),所以使用Count==0 效率比较底下尤其是在数据量大的情况下。

IEnumerable<T>接口成员的实现GetEnumerator()

    该方法实现是拿到头结点然后依次遍历整个栈,注意该方法拿到了只是当前时刻整个栈的快照,在遍历过程中栈内元素的增加或减少对于GetEnumerator()返回值的数量不会改变。

其他问题

    1.在.net编写无锁代码时不用考虑ABA问题,因为这是.net的垃圾回收来保证的,除非使用了对象池技术,例如将内部分配结点的操作由对象池来负责。

    2.在.net源码中普通的Stack<T> 内部使用的是数组实现,而ConcurretStack内部使用的是链表,主要原因还是在于Stack 在使用数组扩容时会有拷贝数据的开销,尤其是在数据量大的情况下这种性能损失还是比较大的,还有个原因是内部使用链表可以避免ABA问题(前提是分配内部结点时没有使用对象池),不过链表的实现也不是没有缺点,例如入栈时我们会分配一个新结点,而该结点出栈完以后会由GC回收掉,这种结点这时候就成为了垃圾结点,不过在实现ConcurretQueue的时候因为队列的先进后出的特性使用了另一种解决方案--链表+数组的方式,这种方式既解决了垃圾结点的问题又解决了数组扩容复制数据产生的性能开销问题。

   最后,在我们阅读.net源码的过程中其实可以发现很多非常经典的编码技巧和编码风格,让我们看代码时可以由上到下如行云流水般一气呵成可,这也是我比较推崇的代码风格--要像写诗一样写自己代码,让别人像读诗一样读你的代码。

    时间不早了就到这了,下片文章中我会继续分析.net中另外一个比较经典的并发数据结构ConcurrentQueue的实现。

  

    由于笔者能力有限,有分析错误的地方难免发生,欢迎大家指正。

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。