前 的个人资料miss your eye照片日志列表 工具 帮助

日志


5月11日

用double CAS实现lock-free队列

参考 Optimised Lock-Free FIFO Queue

实现了一个lock-free的fifo。在ia32上,可以用cmpxchg8b来模拟double cas,只是操作数中,2个需要更新的值必须是连续的内存地址。

定义宏如下:
#define CAS2(ret, mem, old1, old2, new1, new2) \

do {\
asm volatile( \
".align 4 \n" \
"movl $0, %0 \n" \
"LOCK cmpxchg8b (%%esi)\n" \
"setz %0 \n" \
: "=m" (ret) \
: "S" (mem), "a" (old1), "d" (old2), "b" (new1), "c" (new2) \
: "memory", "cc" \
); \
} while(0);



#define DEAD_NODE(q) \
&((q)->dead)

#define CAS(ret, mem, old, new) \
do {\
asm volatile( \
".align 4 \n" \
"movl $0, %0 \n" \
"LOCK cmpxchgl %%edx, (%%esi) \n" \
"setz %0 \n" \
: "=m" (ret) \
: "d" (new), "S" (mem), "a" (old) \
: "memory", "cc" \
); \
} while(0);


struct 
{
     node* head;
     unsigned long tag1;
     node* tail;
     unsigned long tag2;
     node dead;
} queue;


struct
{
     node* next;
     void* data;
} node;
---------------------------------------------------------------------------------------------------------

init


void init_queue(queue* q)
{
     memset(q, 0, sizeof(queue));
     q->head = q->tail = &q->dead;
}


---------------------------------------------------------------------------------------------------------

pop


node* pop(queue* q)
{
     node* n = NULL;


     while(1)
     {
          int ret;
          unsigned long tag1 = q->tag1;
          node* head = q->head;
          unsigned long tag2 = q->tag2;
          node* next = head->next;
          if ( tag1 == q->tag1 )
          {
               if(head == q->tail)
               {
                    if(next == NULL)
                         return NULL;
                    CAS2(ret, &q->tail, head, tag2, next, tag2 + 1);

               }
               else if(next != NULL)
               {
                    CAS2(ret, &q->head, head, tag1, next, tag1 + 1);
                    if(ret)
                    {
                         n = head;
                         break;
                    }
               }
          }
     }
     
     if(n == DEAD_NODE(q))
     {
          push(q, n);
          n = pop(q);
     }


     return n;
}
---------------------------------------------------------------------------------------------------------

push


void push(queue* q, node* n)
{
     n->next = NULL;
     while(1)
     {
          int ret = 0;
          unsigned long tag2 = q->tag2;
          node* tail = q->tail;
          CAS(ret, &tail->next, NULL, n);
          if(ret)
               break;
          CAS2(ret, &q->tail, tail, tag2, tail->next, tag2 + 1);
     }
     CAS2(ret, &q->tail, tail, tag2, n, tag2 + 1);
}