套接字编程 -- recv()无法正确接收数据

4
我看了类似的线程,但似乎找不到能解决我的问题的内容。
我正在编写一个服务器,可以从客户端发送路径中的图像(jpg)文件。我在C语言中使用send/recv函数来实现。
我每次读取一块数据并将其发送给客户端,客户端接收数据并将其写入某个位置来创建文件。
问题是"recv"没有接收到由"send"发送的字节数。
作为调试的一部分,我尝试了不同的缓冲区大小,“128”的缓冲区大小没有任何问题,并且文件成功传输和创建。
但是,对于“32”和“64”位缓冲区,即使服务器发送的数据少于“32”位或“64”位,'recv'仍会在最后一个块中接收“32”位或“64”位数据。而在“256”、“512”、“1024”等情况下,“recv”仅在精确的一个响应中返回“128”位数据,即使服务器发送的完整块是“256”或“512”,这取决于缓冲区大小。
我感谢任何调试建议。以下是相关部分的代码,但如果有人需要,我可以提供更多的内容。
//客户端代码
#define BUFFER_SIZE 4096

#define   HEADER_LEN 512

const char * const scheme = "GETFILE";
const char* const method = "GET";
const char * const end_marker = "\\r\\n\\r\\n";

struct gfcrequest_t
{
    int filelen;
    char cport[12];
    char servIP[50];
    gfstatus_t ret_status;
    char spath[1024];
    int tot_bytes;
    char filecontent[BUFFER_SIZE];

    void (*fl_handler)(void *, size_t, void *);
    void * fDesc;

    void (*head_handler)(void *, size_t, void *);
    void * headarg;

};

static pthread_mutex_t counter_mutex;

gfcrequest_t *gfc;

// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa)
{
    if (sa->sa_family == AF_INET)
    {
        return &(((struct sockaddr_in*)sa)->sin_addr);
    }
    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}

static char *stringFromError(gfstatus_t stat)
{
    static const char *strings[] = {"GF_OK", "GF_FILE_NOT_FOUND", "GF_ERROR", "GF_INVALID"};

    return strings[stat];
}


int getFileRequestHeader(char * req_header)
{
    return snprintf(req_header,HEADER_LEN, "%s%s%s%s", scheme, method,gfc->spath, end_marker);
   // return snprintf(req_header,HEADER_LEN, "%s%s%s%s", scheme, method,"/courses/ud923/filecorpus/yellowstone.jpg", end_marker);
}


gfcrequest_t *gfc_create()
{

    gfc = (gfcrequest_t*)malloc(1* sizeof(gfcrequest_t));

    return gfc;
}

void gfc_set_server(gfcrequest_t *gfr, char* server)
{
    strcpy(gfr->servIP, server);
}

void gfc_set_path(gfcrequest_t *gfr, char* path)
{
    strcpy(gfr->spath, path);
}

void gfc_set_port(gfcrequest_t *gfr, unsigned short port)
{
    snprintf(gfr->cport,12, "%u",port);
}

void gfc_set_headerfunc(gfcrequest_t *gfr, void (*headerfunc)(void*, size_t, void *))
{

    gfr->head_handler = headerfunc;

}

void gfc_set_headerarg(gfcrequest_t *gfr, void *headerarg)
{
/*have to change this...*/
    gfr->headarg = headerarg;

}

int isEndMarker(char *iheader, int start)
{
    char *marker = "\\r\\n\\r\\n";
    int i = 0; int ind=0;


    while (ind <= 7)
    {
        if (iheader[start++] == marker[ind++])
        {
            continue;
        }

        return 0;
    }

    return 1;

}

int getFileLen(char *resp_header, gfcrequest_t *gfr)
{
    char scheme[8];
    char status[4];
    int istatus;
    char filelen[12];

    int contentlen = 0;

    int fileindex = 0;
    char end_marker[12];
    int fexit=0;
    int end=0;

    sscanf(resp_header, "%7s%3s", scheme, status);


    istatus = atoi(status);

    if (istatus == 200)
    {
        gfr->ret_status = GF_OK;
    }
    else if (istatus == 400)
    {
        gfr->ret_status = GF_FILE_NOT_FOUND;
    }
    else if (istatus == 500)
    {
        gfr->ret_status = GF_ERROR;

    }

    if (!strcmp(scheme, "GETFILE") && (istatus == 200 || istatus == 400 || istatus == 500))
    {
        int index = 10;
        while(1)
        {

            if (resp_header[index] == '\\')
            {
                end = isEndMarker(resp_header, index);
            }

            if (end)
                break;

            filelen[fileindex++] = resp_header[index++];

        }

        filelen[fileindex] = '\0';
    }

    int head_len = strlen(scheme) + strlen(status) + strlen(filelen) + 8;

    return atoi(filelen);

}

void gfc_set_writefunc(gfcrequest_t *gfr, void (*writefunc)(void*, size_t, void *))
{

    gfr->fl_handler = writefunc;

}

void gfc_set_writearg(gfcrequest_t *gfr, void *writearg)
{
    gfr->fDesc = writearg;
}

int gfc_perform(gfcrequest_t *gfr){

    struct addrinfo hints, *servinfo, *p;

    int sockfd, rv, totalBytesRcvd;
    char req_header[HEADER_LEN];

    int bytesRcvd;

    int header_len;

    char s[INET6_ADDRSTRLEN];

    memset(&hints, 0, sizeof(hints));
    memset(req_header,0,sizeof(req_header));

    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE; //use my IP

    if ((rv = getaddrinfo(NULL, gfr->cport, &hints, &servinfo)) != 0)
    {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
        return 1;
    }

    // loop through all the results and connect to the first we can

    for(p = servinfo; p != NULL; p = p->ai_next)
    {
        if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
        {
            perror("client: socket");
            continue;
        }

        if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1)
        {
            close(sockfd);
            perror("client: connect");
            continue;
        }

        break;
    }

    if (p == NULL)
    {
        fprintf(stderr, "client: failed to connect\n");
        return 2;
    }

    //printf("connected...\n");

    inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), s, sizeof(s));

//Ahsan
   // printf("Before getFileRequestHeader...\n");
    header_len = getFileRequestHeader(req_header);

    //printf("Header Description:%s, Header Len: %u\n", req_header, header_len);

    if (send(sockfd, req_header, header_len, 0) != header_len)
        perror("send() sent a different number of bytes than expected");

    if ((bytesRcvd = recv(sockfd, gfr->filecontent, BUFFER_SIZE, 0)) <= 0)
        perror("recv() failed or connection closed prematurely");

    //printf("Header Received: %s\n", gfr->filecontent);

    gfr->filelen = getFileLen(gfr->filecontent, gfr);

    //printf("File Length: %d\n", gfr->filelen);

    /* Receive the same string back from the server */
    int req_no=1;
    gfr->tot_bytes = 0;

    while ( 1 )
    {
        printf("Request: %d ", req_no++);
        ssize_t nb = recv( sockfd, gfr->filecontent, BUFFER_SIZE, 0 );
        if ( nb == -1 ) err( "recv failed" );
        if ( nb == 0 ) {printf("zero bytes received...breaking");break;} /* got end-of-stream */

        gfr->fl_handler(gfr->filecontent, nb, gfr->fDesc);
        gfr->tot_bytes += nb;

        printf("Received Bytes: %zd Total Received Bytes: %d\n", nb, gfr->tot_bytes);
    }

    return 0;
}


/*
 * Returns the string associated with the input status
 */

char* gfc_strstatus(gfstatus_t status)
{
    return stringFromError(status);
}


gfstatus_t gfc_get_status(gfcrequest_t *gfr){
    return gfr->ret_status;
}

size_t gfc_get_filelen(gfcrequest_t *gfr)
{
    return gfr->filelen;
}

size_t gfc_get_bytesreceived(gfcrequest_t *gfr)
{

    return gfr->tot_bytes;

}

void gfc_cleanup(gfcrequest_t *gfr)
{
    free(gfr);
    gfr=NULL;
}

void gfc_global_init()
{
;
//  pthread_mutex_lock(&counter_mutex);
//
//  gfc = (gfcrequest_t*)malloc(1* sizeof(gfcrequest_t));
//
//  pthread_mutex_unlock(&counter_mutex);


}

void gfc_global_cleanup()
{
;
//    pthread_mutex_lock(&counter_mutex);
//
//    free(gfc);
//
//    pthread_mutex_unlock(&counter_mutex);

}

//服务器代码

struct gfcontext_t
{
    int sockfd;
    int clntSock;
};

struct gfserver_t
{
    char port[12];
    unsigned short max_npending;
    char fpath[256];
    ssize_t (*fp_handler)(gfcontext_t *ctx, char *, void*);
    int *handler_arg;
};


/*Variable decalation*/


static gfserver_t *gfserv;

static gfcontext_t *gfcontext;

int isEndMarker(char *iheader, int start)
{
    char *marker = "\\r\\n\\r\\n";
    int i = 0; int ind=0;


    while (ind <= 7)
    {
        if (iheader[start++] == marker[ind++])
        {
            //printf("Header Char:%c Marker:%c\n", iheader[start], marker[ind]);
            //start++;
            continue;
        }

        return 0;
    }
    //printf("Its a marker!!!\n");
    return 1;

}

int parseHeader(char *iheader, gfserver_t *gfs, int hlen)
{
//"GETFILEGET/courses/ud923/filecorpus/road.jpg\r\n\r\n"

    char scheme[8];
    char method[4];
//    char path[256];
    int pathindex = 0;
    char end_marker[12];

    int end = 0;
    int fexit=0;

    sscanf(iheader, "%7s%3s", scheme, method);

    int i=10;
        if (iheader[i] == '/')
        {
//            printf("Path has started...\n");
            if (!strcmp(scheme, "GETFILE") && !strcmp(method, "GET"))
            {

                while(1)
                {

                    if (iheader[i] == '\\')
                    {
                        end = isEndMarker(iheader, i);
                    }

                    if (end)
                        break;

                    gfs->fpath[pathindex++] = iheader[i++];

                }

                gfs->fpath[pathindex] = '\0';
            }
        }


    printf("Scheme: %s Method:%s Path:%s\n", scheme, method, gfs->fpath);

    return 0;
}


void *get_in_addr(struct sockaddr *sa)
{

    if (sa->sa_family == AF_INET)
    {
        return &(((struct sockaddr_in*)sa)->sin_addr);
    }

    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}



ssize_t gfs_sendheader(gfcontext_t *ctx, gfstatus_t stat, size_t file_len)
{

    char resp_header[MAX_REQUEST_LEN];
    char end_marker[12] = "\\r\\n\\r\\n";

    sprintf(resp_header, "GETFILE%d%zd%s",stat, file_len, end_marker);

    printf("Response: %s\n", resp_header);

    if (send(ctx->clntSock, resp_header, MAX_REQUEST_LEN, 0) != MAX_REQUEST_LEN)
        perror("send() failed");

    return 0;
}

ssize_t gfs_send(gfcontext_t *ctx, void *data, size_t len)
{
    size_t total = 0;
    size_t bytesLeft = len;
    size_t n;

    int debug_req=1;

    while (total < len)
    {
         n = send(ctx->clntSock, data+total, bytesLeft, 0);
         if (n == -1) { printf("Nothing to send...\n"); break; }

         fprintf(stderr, "Tries: %d Bytes Sent: %zu\n", debug_req++, n);

         total += n;
         bytesLeft -= n;
    }

   // if ( shutdown( ctx->clntSock, SHUT_WR ) == -1 ) err( "socket shutdown failed" );

    return total;
}

void gfs_abort(gfcontext_t *ctx){

    close(ctx->clntSock);
    close(ctx->sockfd);
    free(ctx);
    free(gfserv);

    perror("aborting...");

    exit(1);
}

gfserver_t* gfserver_create()
{

    gfserv = (gfserver_t*) malloc(1 * sizeof(gfserver_t));
    gfcontext = (gfcontext_t*) malloc(1*sizeof(gfcontext_t));

    return gfserv;
}

void gfserver_set_port(gfserver_t *gfs, unsigned short port)
{
    //set port number in gfs structure
    snprintf(gfs->port,12, "%u",port);
}

void gfserver_set_maxpending(gfserver_t *gfs, int max_npending)
{
    //set maxpending connections
    gfs->max_npending = max_npending;
}

void gfserver_set_handler(gfserver_t *gfs, ssize_t (*handler)(gfcontext_t *, char *, void*))
{
    gfs->fp_handler = handler;
}

void gfserver_set_handlerarg(gfserver_t *gfs, void* arg)
{
    gfs->handler_arg = (int *)arg;
}

void gfserver_serve(gfserver_t *gfs)
{

    struct addrinfo hints, *servinfo, *p;
    struct sockaddr_storage clntAddr; //connectors address information
    socklen_t clntSize;
    int yes = 1;
    char s[INET6_ADDRSTRLEN];
    int rv;

    int rcvMsg = 0;
    char recvBuff[MAX_REQUEST_LEN];
    char *req_path;

    memset(&hints, 0, sizeof(hints));

    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM; //using stream socket instead of datagrams
    hints.ai_flags = AI_PASSIVE; //use my IP

    if ((rv = getaddrinfo(NULL, gfs->port, &hints, &servinfo)) != 0)
    {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
        return 1;
    }

    // loop through all the results and bind to the first we can
    for(p = servinfo; p != NULL; p = p->ai_next)
    {

        if ((gfcontext->sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
        {
            perror("server: socket");
            continue;
        }

        //get rid of 'address already in use' error.
        if (setsockopt(gfcontext->sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1)
        {
            perror("setsockopt");
            exit(1);
        }

        if (bind(gfcontext->sockfd, p->ai_addr, p->ai_addrlen) == -1)
        {
            close(gfcontext->sockfd);
            perror("server: bind");
            continue;
        }

        break;
    }

    if (p == NULL)
    {
        fprintf(stderr, "server: failed to bind.\n");
        return 2;
    }

    freeaddrinfo(servinfo); // no need of servinfo structure anymore

    if (listen(gfcontext->sockfd, gfs->max_npending) == -1)
    {
        perror("listen");
        exit(1);
    }


    //printf("server: waiting for connetions...\n");

    while(1)
    {
        clntSize = sizeof(clntAddr);
        gfcontext->clntSock = accept(gfcontext->sockfd, (struct sockaddr *)&clntAddr, &clntSize);

        if (gfcontext->clntSock == -1)
        {
            perror("accept");
            continue;
        }

        inet_ntop(clntAddr.ss_family, get_in_addr((struct sockaddr *)&clntAddr), s, sizeof(s));

        //printf("server: got connection from %s\n", s);

        if (!fork())
        { // this is the child process
            if ((rcvMsg = recv(gfcontext->clntSock, recvBuff, MAX_REQUEST_LEN, 0)) < 0)
            {
                perror("recv() failed");
                exit(1);
            }

            /*Still to parse received request...*/
            //printf("Recd Header: %s, Recd %d bytes\n",recvBuff, rcvMsg);

            /*Parse the received header...*/

            int len = parseHeader(recvBuff, gfs, rcvMsg);

            //printf("Requested Path: %s\n", gfs->fpath);

            if (gfs->fp_handler(gfcontext, gfs->fpath, NULL) < 0)
            {
                printf("some problem...\n");

            }

            if ( shutdown( gfcontext->clntSock, SHUT_WR ) == -1 ) err( "socket shutdown failed" );
            //close(gfcontext->clntSock);
        }

    }

}

//Server gf_send is being called from following function handler function:

Handler:
ssize_t handler_get(gfcontext_t *ctx, char *path, void* arg){
    int fildes;
    size_t file_len, bytes_transferred;
    ssize_t read_len, write_len;
    char buffer[BUFFER_SIZE];

    printf("Path: %s\n", path);

    if( 0 > (fildes = content_get(path)))
        return gfs_sendheader(ctx, GF_FILE_NOT_FOUND, 0);

    /* Calculating the file size */
    file_len = lseek(fildes, 0, SEEK_END);

    gfs_sendheader(ctx, GF_OK, file_len);

    /* Sending the file contents chunk by chunk. */
    int req=1;
    bytes_transferred = 0;
    while(bytes_transferred < file_len){
        read_len = pread(fildes, buffer, BUFFER_SIZE, bytes_transferred);
        if (read_len <= 0){
            fprintf(stderr, "handle_with_file read error, %zd, %zu, %zu", read_len, bytes_transferred, file_len );
            gfs_abort(ctx);
            return -1;
        }

        printf("Request No: %d ", req++);
        write_len = gfs_send(ctx, buffer, read_len);
        if (write_len != read_len){
            fprintf(stderr, "handle_with_file write error");
            gfs_abort(ctx);
            return -1;
        }

        bytes_transferred += write_len;
    }

    printf("Total Bytes sent to client: %zu\n", bytes_transferred);

    return bytes_transferred;
}

(1/2)具体是哪种类型的套接字?(也就是说,展示一下你是如何调用“socket”、“bind”和“connect”的。) - zwol
2
你的“last-chunk-too-large”问题可能只是因为你在调用“fl_handler”时错误地使用了“BUFFER_SIZE”而不是“bytesRecvd”。 - zwol
(3/2)挑剔与您实际问题无关:memset的第二个参数应该是0,而不是NULLNULL是一个指针。 memset的第二个参数是字符。(您可能会因为ASCII字符0的名称是带有一个L的“NUL”而感到困惑。) - zwol
fl_handler 只是将数据写入文件,但我正在打印返回的字节,这会打印缓冲区的大小而不是从服务器发送的实际数据。是的,我理解了 NULL -- 我需要修改它。我正在使用 TCP 套接字。 - Ahsan
这里有一篇文章,他的指南非常实用。http://beej.us/guide/bgnet/ - hookenz
3个回答

9
你没有说明,所以我假设你在这里使用TCP(与UDP不同的发送/接收语义),
你正在遭受一个非常普遍的误解,即TCP套接字上的一次发送对应于另一端接收到的已发送字节数。这是错误的。
事实上,TCP套接字是一个无消息概念的双向字节流。一个写操作可以对应于另一端的多个读取操作,反之亦然。将其视为流。
您需要保持从发送和接收系统调用返回的发送和接收字节数。
还重要的是让另一方知道您发送了多少数据,这样它就会知道何时完全传输图像。这是一个应用层协议的工作,您必须提出或使用现有的协议。

编辑0:

这是在客户端和服务器之间设置任何有意义的协议之前看起来需要的内容。 首先是发送代码:
size_t total = 0;

while ( total != len ) {
    ssize_t nb = send( s, data + total, len - total, 0 );
    if ( nb == -1 ) err( "send failed" );
    total += nb;
}
if ( shutdown( s, SHUT_WR ) == -1 ) err( "socket shutdown failed" );
/* also need to close client socket, see below */

然后是接收代码:
char buffer[BUFFER_SIZE]; /* somewhere, might be static */
size_t total = 0; /* everything received */
while ( 1 ) {
    ssize_t nb = recv( s, buffer, BUFFER_SIZE, 0 );
    if ( nb == -1 ) err( "recv failed" );
    if ( nb == 0 ) break; /* got end-of-stream */
    if ( write( file_fd, buffer, nb ) == -1 ) err( "file write failed" );
    total += nb;
}
/* send an ack here */
if ( close( s ) == -1 ) err( "socket close failed" );
if ( close( file_fd )) err( "file close failed" );
printf( "received and saved total of %zu bytes\n", total );

然后,你的应用层协议可能就像服务器在接受新客户端连接后立即发送 64 位文件长度一样简单(你需要决定使用什么 字节序),然后向客户端发送相同数量的字节并关闭套接字的写入,在等待客户端确认成功接收数据。这可能是同样的数字返回,或者只是一个字节 - 由您决定,最后关闭套接字。这样,客户端事先知道要期望多少字节,而服务器则知道传输成功。
在使这个简单版本工作之后,您可以将其扩展为允许每个连接进行多个文件传输,并且/或者深入研究 select(2)/poll(2) 中的 IO 多路复用。
希望这有所帮助。

2
1up - 很棒的回答。 - NTDLS
是的,我正在使用TCP。我已经阅读了您提到的问题,并尝试在“发送方”解决它。您能否提供一些关于如何在接收端解决该问题的见解?我原以为while循环会一直工作直到所有数据到达,所以如果我接收到较少的字节,recv将在下一次迭代中再次调用。 - Ahsan
1
看起来你需要增加缓冲区偏移量。还可以查看 @EJP 的答案。 - Nikolai Fetissov
尝试在发送完成后关闭服务器端的套接字。 - Nikolai Fetissov
当我关闭套接字时,它会丢失一个完整的数据块,但如果我关闭套接字,我会得到相同的行为。我曾经使用过fork,但我已经禁用了它。这并没有改变任何事情。 - Ahsan
显示剩余11条评论

2
首先:recv() 并不总是按照 send() 发送的块接收数据,事实上 - 它很少这样做 - 因为缓冲(例如,您发送 256 字节,接收两个 128 字节的缓冲区)。
现在来看你的错误:我认为问题在于你没有使用 FD_SET 调用 select() 将你的套接字重置为“准备好接收”的状态,然后才调用 recv() 进行第二次接收。
如果你想挖掘一下,我的网站 my site 上有大量的 winsock/c-sockets 代码。
如果需要,我可以进一步提供帮助!

谢谢!我很想知道在再次调用recv之前如何重置套接字。我认为这会解决我的错误。 - Ahsan
1
这个答案的第一段是正确的,但第二段完全错误。在第二次调用recv之前不需要在套接字上调用select。如果您只有一个要接收的套接字,则在recv中阻塞而不是select中没有害处。如果您有多个套接字,则当然必须将它们全部设置为非阻塞,并且您可以 - 实际上应该 - 在循环中调用recv,直到它以EAGAIN失败。 - zwol
我同意,但在回答时 - 问题并没有表述得很清楚,需要消化的源代码也少得多。 - NTDLS

0
gfr->fl_handler(gfr->filecontent, BUFFER_SIZE, gfr->fDesc);

通常的问题。您假设读取已填充缓冲区。应该是:

gfr->fl_handler(gfr->filecontent, bytesRcvd, gfr->fDesc);

是的,应该是这样的。我会纠正它。这样就少了一个 bug。 :) - Ahsan
同时,当 bytesRcvd <= 0 时,那个循环迭代需要不更新 tot_bytes(如果 bytesRcvd 是 -1 的情况下)或调用 fl_handler(),因为从该迭代中不会有任何可用的数据。 当 bytesRcvd 为0时,套接字已断开连接,并且不会再有更多数据,所以只需 break 循环即可。 但是,如果 bytesRcvd 为-1,则必须查看 errno 以知道您是否可以安全地 continue 循环重试 recv() 或需要 break 它。 - Remy Lebeau

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接