Linux中同时进行套接字读写(全双工)(特别是aio)

4
我正在移植一个基于ACE Proactor框架构建的应用程序。该应用程序在VxWorks和Windows上运行完美,但在Linux(CentOS 5.5、WindRiver Linux 1.4和3.0)内核2.6.X.X上使用librt时无法正常工作。
我已经将问题缩小到一个非常基本的问题: 应用程序在套接字上开始异步(通过aio_read)读取操作,随后在同一套接字上开始异步(通过aio_write)写入操作。由于协议是从应用程序端初始化的,因此尚未完成读取操作。 - 当套接字处于阻塞模式时,写入永远不会被执行,协议“挂起”。 - 当使用O_NONBLOCK套接字时,写入成功,但读取返回“EWOULDBLOCK / EAGAIN”错误,并且永远无法恢复(即使重新启动AIO操作)。
我查看了多个论坛,没有找到明确的答案,无法确定这是否可行(以及我是否做错了什么),或者Linux AIO是否不支持。如果我放弃AIO并寻求不同的实现(通过epoll / poll / select等),是否可能?
附上一个示例代码,可以快速重现非阻塞套接字上的问题:
#include <aio.h>
#include <stdio.h>
#include <stdlib.h>
#include <netdb.h>
#include <string.h> 
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <assert.h>
#include <errno.h>

#define BUFSIZE (100)

// Global variables
struct aiocb *cblist[2];
int theSocket;

void InitializeAiocbData(struct aiocb* pAiocb, char* pBuffer)
{
    bzero( (char *)pAiocb, sizeof(struct aiocb) );

    pAiocb->aio_fildes = theSocket;
    pAiocb->aio_nbytes = BUFSIZE;
    pAiocb->aio_offset = 0;
    pAiocb->aio_buf = pBuffer;
}

void IssueReadOperation(struct aiocb* pAiocb, char* pBuffer)
{
    InitializeAiocbData(pAiocb, pBuffer);

    int ret = aio_read( pAiocb );
    assert (ret >= 0);
}

void IssueWriteOperation(struct aiocb* pAiocb, char* pBuffer)
{
    InitializeAiocbData(pAiocb, pBuffer);

    int ret = aio_write( pAiocb );
    assert (ret >= 0);
}

int main()
{
    int ret;
    int nPort = 11111;
    char* szServer = "10.10.9.123";

    // Connect to the remote server
    theSocket = socket(AF_INET, SOCK_STREAM, 0);
    assert (theSocket >= 0);

    struct hostent *pServer;
    struct sockaddr_in serv_addr;
    pServer = gethostbyname(szServer);

    bzero((char *) &serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(nPort);
    bcopy((char *)pServer->h_addr, (char *)&serv_addr.sin_addr.s_addr, pServer->h_length);

    assert (connect(theSocket, (const sockaddr*)(&serv_addr), sizeof(serv_addr)) >= 0);

    // Set the socket to be non-blocking
    int oldFlags = fcntl(theSocket, F_GETFL) ;
    int newFlags = oldFlags | O_NONBLOCK;

    fcntl(theSocket, F_SETFL, newFlags);
    printf("Socket flags: before=%o, after=%o\n", oldFlags, newFlags);

    // Construct the AIO callbacks array
    struct aiocb my_aiocb1, my_aiocb2;
    char* pBuffer = new char[BUFSIZE+1];

    bzero( (char *)cblist, sizeof(cblist) );
    cblist[0] = &my_aiocb1;
    cblist[1] = &my_aiocb2;

    // Start the read and write operations on the same socket
    IssueReadOperation(&my_aiocb1, pBuffer);
    IssueWriteOperation(&my_aiocb2, pBuffer);

    // Wait for I/O completion on both operations
    int nRound = 1;
    printf("\naio_suspend round #%d:\n", nRound++);
    ret = aio_suspend( cblist, 2, NULL );
    assert (ret == 0);

    // Check the error status for the read and write operations
    ret = aio_error(&my_aiocb1);
    assert (ret == EWOULDBLOCK);

    // Get the return code for the read
    {
        ssize_t retcode = aio_return(&my_aiocb1);
        printf("First read operation results: aio_error=%d, aio_return=%d  -  That's the first EWOULDBLOCK\n", ret, retcode);
    }

    ret = aio_error(&my_aiocb2);
    assert (ret == EINPROGRESS);
    printf("Write operation is still \"in progress\"\n");

    // Re-issue the read operation
    IssueReadOperation(&my_aiocb1, pBuffer);

    // Wait for I/O completion on both operations
    printf("\naio_suspend round #%d:\n", nRound++);
    ret = aio_suspend( cblist, 2, NULL );
    assert (ret == 0);

    // Check the error status for the read and write operations for the second time
    ret = aio_error(&my_aiocb1);
    assert (ret == EINPROGRESS);
    printf("Second read operation request is suddenly marked as \"in progress\"\n");

    ret = aio_error(&my_aiocb2);
    assert (ret == 0);

    // Get the return code for the write
    {
        ssize_t retcode = aio_return(&my_aiocb2);
        printf("Write operation has completed with results: aio_error=%d, aio_return=%d\n", ret, retcode);
    }

    // Now try waiting for the read operation to complete - it'll just busy-wait, receiving "EWOULDBLOCK" indefinitely
    do
    {
        printf("\naio_suspend round #%d:\n", nRound++);
        ret = aio_suspend( cblist, 1, NULL );
        assert (ret == 0);

        // Check the error of the read operation and re-issue if needed
        ret = aio_error(&my_aiocb1);
        if (ret == EWOULDBLOCK)
        {
            IssueReadOperation(&my_aiocb1, pBuffer);
            printf("EWOULDBLOCK again on the read operation!\n");
        }
    }
    while (ret == EWOULDBLOCK);
}

感谢您的提前帮助,Yotam。

请尝试使用ace-users@list.isis.vanderbilt.edu邮件列表:http://www.cs.wustl.edu/~schmidt/ACE-mail.html - Raphael Bossek
1个回答

3
首先,O_NONBLOCK和AIO不兼容。当相应的readwrite不会被阻塞时,AIO将报告异步操作完成 - 并且使用O_NONBLOCK,它们永远不会被阻塞,因此aio请求将始终立即完成(aio_return()返回EWOULDBLOCK)。
其次,请勿为两个同时存在的aio请求使用相同的缓冲区。在发出aio请求和
第三,对于相同的文件描述符,AIO请求是按顺序排队的,以便给出合理的结果。这意味着在读取完成之前,您的写入不会发生 - 如果需要先写入数据,则需要按相反的顺序发出AIO请求。以下内容将正常工作,而无需设置O_NONBLOCK
struct aiocb my_aiocb1, my_aiocb2;
char pBuffer1[BUFSIZE+1], pBuffer2[BUFSIZE+1] = "Some test message";

const struct aiocb *cblist[2] = { &my_aiocb1, &my_aiocb2 };

// Start the read and write operations on the same socket
IssueWriteOperation(&my_aiocb2, pBuffer2);
IssueReadOperation(&my_aiocb1, pBuffer1);

// Wait for I/O completion on both operations
int nRound = 1;
int aio_status1, aio_status2;
do {
    printf("\naio_suspend round #%d:\n", nRound++);
    ret = aio_suspend( cblist, 2, NULL );
    assert (ret == 0);

    // Check the error status for the read and write operations
    aio_status1 = aio_error(&my_aiocb1);
    if (aio_status1 == EINPROGRESS)
        puts("aio1 still in progress.");
    else
        puts("aio1 completed.");

    aio_status2 = aio_error(&my_aiocb2);

    if (aio_status2 == EINPROGRESS)
        puts("aio2 still in progress.");
    else
        puts("aio2 completed.");
} while (aio_status1 == EINPROGRESS || aio_status2 == EINPROGRESS);

 // Get the return code for the read
ssize_t retcode;
retcode = aio_return(&my_aiocb1);
printf("First operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode);

retcode = aio_return(&my_aiocb1);
printf("Second operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode);

或者,如果您不关心读写是否相互有序,可以使用 dup() 创建两个文件描述符用于套接字,并分别用于读取和写入 - 每个文件描述符的 AIO 操作将被单独排队。


1
你有“同一fd的AIO请求按顺序排队”的参考资料吗?如果是这样,那么更好的解决方案肯定是复制该句柄。 - Ben Voigt
1
@Ben Voigt:这是另一种选择-使用dup()为套接字创建两个文件描述符,一个用于读取,另一个用于写入。 - caf
首先,我刚刚看到了评论并开始尝试它们 - 但似乎dup()可能会解决问题。它似乎修复了问题,但我仍然需要运行一些压力测试。很可能我欠你们一杯啤酒!其次,我知道在下面的示例中我使用了非阻塞套接字和相同的缓冲区 - 这只是一个虚拟测试程序。不过,在发出写入之前后台读取是我们特定应用程序所要求的 - 所以无法更改... - Yotam
Ben Voigt - 请将您的评论重新发布为答案,我会标记此问题为已完成或其他什么... - Yotam

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接