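I recently read Paul McKenney's 2010 white paper, "Memory Barriers: a Hardware View for Software Hackers".
I would very much appreciate feedback, comments or guidance on the short piece of C code below, which implements the M&S (Michael and Scott) queue enqueue function, particularly with regard to memory and compiler barriers.
The code uses pointer-counter pairs to handle the ABA problem and should be considered as written solely for x86/x64.
The inline comments are now all written and are part of this post, since they express my current thinking.
For brevity, I have stripped out the asserts, the cache-line padding in the structs, and so on.
At the moment I think the code is quite broken, but I want to make sure I think it is broken for the right reasons.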
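/* POINTER and COUNTER are not defined in the posted excerpt; the index values below are assumed */
#define POINTER 0
#define COUNTER 1
#define PAC_SIZE 2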
struct lfds_queue_element
{
  struct lfds_queue_element
    *volatile next[PAC_SIZE];

  void
    *user_data;
};

struct lfds_queue_state
{
  struct lfds_queue_element
    *volatile enqueue[PAC_SIZE];

  struct lfds_queue_element
    *volatile dequeue[PAC_SIZE];

  atom_t volatile
    aba_counter;
};
void lfds_queue_internal_dcas_pac_enqueue( struct lfds_queue_state *lqs, struct lfds_queue_element *lqe )
{
  ALIGN(ALIGN_DOUBLE_POINTER) struct lfds_queue_element
    *local_lqe[PAC_SIZE], *enqueue[PAC_SIZE], *next[PAC_SIZE];

  unsigned char cas_result = 0;
  unsigned int backoff_iteration = 1;

  /* TRD : here we have been passed a new element to place
           into the queue; we initialize it and its next
           pointer/counter pair
  */

  local_lqe[POINTER] = lqe;
  local_lqe[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );

  local_lqe[POINTER]->next[POINTER] = NULL;
  local_lqe[POINTER]->next[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );
  /* TRD : now, I think there is an issue here, in that these values
           are by no means yet necessarily visible to other cores

           however, they only need to be visible once
           the element has entered the queue, and for that
           to happen, the contiguous double-word CAS must
           have occurred - and on x86/x64, this carries
           with it an mfence

           however, that mfence will only act to empty our
           local store buffer - it will not cause other cores
           to flush their invalidation queues, so surely
           it can all still go horribly wrong?

           ah, but if all other cores are only accessing
           these variables using atomic operations, they
           too will be issuing mfences and so at that
           point flushing their invalidate queues
  */
  do
  {
    enqueue[COUNTER] = lqs->enqueue[COUNTER];
    enqueue[POINTER] = lqs->enqueue[POINTER];

    next[COUNTER] = enqueue[POINTER]->next[COUNTER];
    next[POINTER] = enqueue[POINTER]->next[POINTER];
    /* TRD : now, this is interesting

             we load the enqueue pointer and its next pointer
             we then (immediately below) check to see they're unchanged

             but this check is totally bogus! we could be reading
             old values from our cache, where our invalidate queue
             has not been processed, so the initial read contains
             old data *and* we then go ahead and check from our cache
             the same old values a second time

             what's worse is that I think we could also read the correct
             values for enqueue but an incorrect (old) value for its
             next pointer...!

             so, in either case, we can easily and mistakenly pass the if()
             and then enter into code which does things to the queue

             now, in both cases, the CAS will mfence, which will
             cause us to see from the data structure the true
             values, but how much will that help us - we need
             to look to see what is actually being done

             the if() checks next[POINTER] is NULL

             if we have read a NULL for next, then we think
             the enqueue pointer is correctly placed (it's not
             lagging behind) so we don't need to help; we then
             try to add our element to the end of the queue

             now, it may be we have read enqueue properly but
             next improperly and so we now try to add our element
             where it will in fact truncate the queue!

             the CAS however will mfence and so at this point
             we will actually see the correct value for enqueue->next,
             and this will prevent that occurring

             if we look now at the else clause, here we have seen
             that enqueue->next is not NULL, so the enqueue pointer
             is out of place and we need to help, which we do by
             moving it down the queue

             here though we could have read enqueue correctly
             but next incorrectly; the CAS will mfence, which will
             update the cache, but since we're only comparing
             the enqueue pointer with our copy of the enqueue
             pointer, the fact our next pointer is wrong won't
             change! so here, we move the enqueue pointer to
             some old element - which although it might be in the
             queue (so this would be an inefficiency, you'd have
             to do a bunch more queue walking to get the enqueue
             pointer to the final element) it might not be, too!
             it could in the meantime have been dequeued and
             that of course would be death
    */
    /* note: && is used rather than `and`, so the listing does not depend on <iso646.h> */
    if( lqs->enqueue[POINTER] == enqueue[POINTER] && lqs->enqueue[COUNTER] == enqueue[COUNTER] )
    {
      if( next[POINTER] == NULL )
      {
        local_lqe[COUNTER] = next[COUNTER] + 1;
        cas_result = lfds_abstraction_atomic_dcas_with_backoff( (atom_t volatile *) enqueue[POINTER]->next, (atom_t *) local_lqe, (atom_t *) next, &backoff_iteration );
      }
      else
      {
        next[COUNTER] = enqueue[COUNTER] + 1;
        lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) next, (atom_t *) enqueue );
      }
    }
  }
  while( cas_result == 0 );

  local_lqe[COUNTER] = enqueue[COUNTER] + 1;
  lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) local_lqe, (atom_t *) enqueue );

  return;
}
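
To make the pointer-counter idea concrete, here is a minimal, single-threaded sketch of how a counter lets a CAS reject a stale snapshot in an ABA interleaving. It is not the queue code above and makes several assumptions: it uses C11 <stdatomic.h> rather than the lfds abstraction layer, and it packs a 32-bit element index and a 32-bit counter into one 64-bit word instead of using two pointer-sized words with a double-word CAS. All the names (pac_t, pac_make, pac_index, pac_counter, pac_cas, aba_is_detected) are hypothetical and exist only for this illustration.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical packed pair: low 32 bits = element index, high 32 bits = counter.
   This stands in for the two-word pointer/counter pair used by the queue. */
typedef _Atomic uint64_t pac_t;

static uint64_t pac_make(uint32_t index, uint32_t counter)
{
    return ((uint64_t) counter << 32) | index;
}

static uint32_t pac_index(uint64_t pac)   { return (uint32_t) pac; }
static uint32_t pac_counter(uint64_t pac) { return (uint32_t) (pac >> 32); }

/* Swing *target to new_index, but only if it still equals the snapshot.
   Every successful swing stores snapshot counter + 1. The default
   (sequentially consistent) CAS is used; on x86/x64 compilers typically
   emit a LOCK CMPXCHG for it, which acts as a full barrier. */
static bool pac_cas(pac_t *target, uint64_t snapshot, uint32_t new_index)
{
    uint64_t desired = pac_make(new_index, pac_counter(snapshot) + 1);
    return atomic_compare_exchange_strong(target, &snapshot, desired);
}

/* Single-threaded replay of an ABA interleaving.
   Returns true if the stale CAS is rejected. */
static bool aba_is_detected(void)
{
    pac_t head;
    atomic_init(&head, pac_make(1, 0));

    /* "thread A" takes a snapshot: index 1, counter 0 */
    uint64_t stale = atomic_load(&head);

    /* "thread B" moves head 1 -> 2 -> 1 while A is stalled */
    bool b1 = pac_cas(&head, atomic_load(&head), 2);
    bool b2 = pac_cas(&head, atomic_load(&head), 1);
    assert(b1 && b2);

    /* the index matches A's snapshot again, but the counter is now 2,
       so A's CAS against its stale snapshot must fail */
    bool same_index = pac_index(atomic_load(&head)) == pac_index(stale);
    bool stale_cas_succeeded = pac_cas(&head, stale, 3);

    return same_index && !stale_cas_succeeded;
}
```

Calling aba_is_detected() from a test harness should return true: a comparison on the index alone would have succeeded, while the comparison on the whole pair fails. The sketch only shows what the CAS itself catches. It says nothing about the plain loads of enqueue and next before the if() in the listing above, which are the part my comments are worried about.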