Golang TCP connection is slow when the server has no available backlog

5

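Update

After adding a pthreaded C client, the problem reproduces, which indicates that the long connection times are part of the TCP protocol rather than of a specific implementation. Changing that protocol behavior does not appear to be easy.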

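Original question

I think my question is mostly: what happens when Golang's net package tries to connect to a server over TCP and:

  1. The server has no connections available, not even in the backlog.
  2. The connection is not refused/failed.

Connections like this appear to carry a large amount of overhead, with server response times climbing from 5 ms to several seconds. This was seen both in a production environment and in the minimal example below. The proper solution is to use a connection pool to the server, which will be implemented. This is largely my own curiosity.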

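To reproduce:

  1. Run the server with backlog = 1, then run client.go.
    • All 50 goroutines fire at once, with a total completion time of almost 2 minutes.
  2. Run the server with backlog = 100, then run client.go.
    • All 50 goroutines fire at once, queue up to connect to the server, and complete in ~260 ms.
  3. Three C clients running with a 50 µs retry time complete connections within 12 ms on average, so they do not see this issue.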

Example output for backlog = 1 (the first time is the dial time, the second is the time to completion):

user@computer ~/tcp-tests $ go run client.go 127.0.0.1:46999
Long Elapsed Time: 216.579µs, 315.196µs
Long Elapsed Time: 274.169µs, 5.970873ms
Long Elapsed Time: 74.4µs, 10.753871ms
Long Elapsed Time: 590.965µs, 205.851066ms
Long Elapsed Time: 1.029287689s, 1.029574065s
Long Elapsed Time: 1.02945649s, 1.035098229s
...
Long Elapsed Time: 3.045881865s, 6.378597166s
Long Elapsed Time: 3.045314838s, 6.383783688s
Time taken stats: 2.85 +/- 1.59 s // average +/- STDEV
Main Taken: 6.384677948s

Example output for backlog = 100:

...
Long Elapsed Time: 330.098µs, 251.004077ms
Long Elapsed Time: 298.146µs, 256.187795ms
Long Elapsed Time: 315.832µs, 261.523685ms
Time taken stats: 0.13 +/- 0.08 s
Main Taken: 263.186955ms

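So what is happening under the hood of net.DialTCP (we tried other flavors of dial as well, with no apparent difference) that causes the dial time to grow?
  • Polling time between attempts to make a connection?
  • An RFC 5681 global congestion control variable (possibly including a mutex lock?) that gets incremented on all of the initially failed connection attempts?
  • Something else?

I'm leaning towards the first two, since the values of 1s, 3s, 5s look like magic numbers. They show up both on my modest local machine and in a large-scale production environment.

Here is the minimal server written in C. The configuration value of interest is the backlog argument passed to listen.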

/*
    Adapted from
    https://www.geeksforgeeks.org/tcp-server-client-implementation-in-c/
    
    Compile and run with:
        gcc server.c -o server; ./server
*/
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/time.h>

int main(void)
{
    int socket_desc, client_sock;
    socklen_t client_size; // accept() expects a socklen_t*, not an int*
    struct sockaddr_in server_addr, client_addr;
    char server_message[2000], client_message[2000];

    // Clean buffers:
    memset(server_message, '\0', sizeof(server_message));
    memset(client_message, '\0', sizeof(client_message));

    // Create socket:
    socket_desc = socket(AF_INET, SOCK_STREAM, 0);

    if(socket_desc < 0){
        printf("Error while creating socket\n");
        return -1;
    }
    printf("Socket created successfully\n");

    // Set port and IP:
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(46999);
    server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    // Bind to the set port and IP:
    if(bind(socket_desc, (struct sockaddr*)&server_addr, sizeof(server_addr))<0){
        printf("Couldn't bind to the port\n");
        return -1;
    }
    printf("Done with binding\n");

    // Listen for clients:
    // Increasing the backlog allows the Go client to connect and wait
    // rather than poll/retry.
    if(listen(socket_desc, 100) < 0){
        printf("Error while listening\n");
        return -1;
    }
    printf("\nListening for incoming connections.....\n");

    // Accept an incoming connection:
    client_size = sizeof(client_addr);
    int server_run = 1;
    do
    {
        struct timeval start, end;
        double cpu_time_used;
        gettimeofday(&start, NULL);
        client_sock = accept(socket_desc, (struct sockaddr*)&client_addr, &client_size);

        if (client_sock < 0){
            printf("Can't accept\n");
            return -1;
        }

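        // Receive client's message. Clear the buffer first so that a short
        // message such as "stop" is not followed by stale bytes:
        memset(client_message, '\0', sizeof(client_message));
        if (recv(client_sock, client_message, sizeof(client_message) - 1, 0) < 0){
            printf("Couldn't receive\n");
            return -1;
        }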
        if (strcmp(client_message, "stop") == 0)
        {
            server_run = 0;
            printf("Received stop message.\n");
        }

        // Respond to client:
        strcpy(server_message, "This is the server's message.");

        if (send(client_sock, server_message, strlen(server_message), 0) < 0){
            printf("Can't send\n");
            return -1;
        }

        // sleep for 5 ms
        usleep(5000);

        // Closing the socket:
        close(client_sock);
        gettimeofday(&end, NULL);
        // Elapsed wall-clock time in ms. Including tv_sec keeps the value
        // from going negative when tv_usec wraps around.
        cpu_time_used = (end.tv_sec - start.tv_sec) * 1000.0
                      + (end.tv_usec - start.tv_usec) / 1000.0;
        printf("Server Time: %.4f ms\n", cpu_time_used);
    } while(server_run);

    close(socket_desc);

    return 0;
}

Here is the Go client used for testing:

/*
    Adapted from
    https://www.linode.com/docs/guides/developing-udp-and-tcp-clients-and-servers-in-go/

    Run once the server.c is compiled and running with:
        go run client.go 127.0.0.1:46999
*/
package main

import (
    "fmt"
    "net"
    "os"
    "time"
    "github.com/montanaflynn/stats"
    "sync"
)
func do_message(wg *sync.WaitGroup, connect string, time_taken *float64) {
    defer wg.Done()
    message := make([]byte, 128)
    start_time := time.Now()
    pAddr, err := net.ResolveTCPAddr("tcp", connect)
    if err != nil {
        return
    }

    c, err := net.DialTCP("tcp", nil, pAddr)
    if err != nil {
        fmt.Println(err)
        return
    }
    c.SetLinger(0)
    dialed_time := time.Since(start_time)

    defer func() {
        c.Close()
        elapsed_time := time.Since(start_time)
        if elapsed_time.Microseconds() > 60 { // microseconds
            fmt.Println("Long Elapsed Time: " + dialed_time.String() + ", " + elapsed_time.String())
        }
        *time_taken = float64(elapsed_time.Microseconds())
    }()

    text := "{\"service\": \"magic_service_str\"}"
    c.Write([]byte(text))
    // The response is read only so that the round trip is timed; its
    // contents and any error are ignored.
    _, _ = c.Read(message)
}
func main() {
    main_start := time.Now()
    arguments := os.Args
    if len(arguments) == 1 {
            fmt.Println("Please provide host:port.")
            return
    }
    n_messages := 50
    wg := new(sync.WaitGroup)
    wg.Add(n_messages)
    times := make([]float64, n_messages)
    for i := 0; i < n_messages; i++ {
        // Used to turn the goroutines into serial implementation
        // time.Sleep(5500 * time.Microsecond)
        go do_message(wg, arguments[1], &times[i])
    }
    wg.Wait()
    avg, _ := stats.Mean(times)
    std, _ := stats.StandardDeviation(times)
    fmt.Println("Time taken stats: " + fmt.Sprintf("%.2f", avg / 1000000.0) + " +/- " + fmt.Sprintf("%.2f", std / 1000000.0) + " s")
    main_taken := time.Since(main_start)
    fmt.Println("Main Taken: " + main_taken.String())
}

Updated with a pthreaded client in C, which confirms that the problem is not caused by the Golang implementation:

// gcc client_p.c -o pclient -lpthread
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/time.h>


#include <pthread.h>
#include <errno.h>

#ifndef THREAD_LOOP_COUNT
#define THREAD_LOOP_COUNT 1
#endif

/* Subtract the ‘struct timeval’ values X and Y,
   storing the result in RESULT.
   Return 1 if the difference is negative, otherwise 0.
   https://www.gnu.org/software/libc/manual/html_node/Calculating-Elapsed-Time.html
*/

int
timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y)
{
  /* Perform the carry for the later subtraction by updating y. */
  if (x->tv_usec < y->tv_usec) {
    int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
    y->tv_usec -= 1000000 * nsec;
    y->tv_sec += nsec;
  }
  if (x->tv_usec - y->tv_usec > 1000000) {
    int nsec = (x->tv_usec - y->tv_usec) / 1000000;
    y->tv_usec += 1000000 * nsec;
    y->tv_sec -= nsec;
  }

  /* Compute the time remaining to wait.
     tv_usec is certainly positive. */
  result->tv_sec = x->tv_sec - y->tv_sec;
  result->tv_usec = x->tv_usec - y->tv_usec;

  /* Return 1 if result is negative. */
  return x->tv_sec < y->tv_sec;
}


static void* workerThreadFunc(void* arg)
{
    int socket_desc;
    struct sockaddr_in server_addr;
    char server_message[2000], client_message[2000];
    // Clean buffers:
    memset(server_message,'\0',sizeof(server_message));
    memset(client_message,'\0',sizeof(client_message));
    // Set port and IP the same as server-side:
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(46999);
    server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    int retries = 0;
    struct timeval start, end, difference;
    double cpu_time_used;
    for(int i = 0; i < THREAD_LOOP_COUNT; i++)
    {
        gettimeofday(&start, NULL);
        // Create socket:
        socket_desc = socket(AF_INET, SOCK_STREAM, 0);
        if(socket_desc < 0){
            printf("Unable to create socket\n");
            return NULL;
        }
        // Send connection request to server:
        while(connect(socket_desc, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0){
            retries++;
            if (retries > 10)
            {
                printf("Unable to connect\n");
                retries = 0;
            }
            usleep(5);
        }
        retries = 0; // reset the counter declared above instead of shadowing it

        // Send the message to server:
        strcpy(client_message, "client message.");
        if(send(socket_desc, client_message, strlen(client_message), 0) < 0){
            printf("Unable to send message\n");
            close(socket_desc);
            return NULL;
        }

        // Receive the server's response:
        if(recv(socket_desc, server_message, sizeof(server_message), 0) < 0){
            printf("Error while receiving server's msg\n");
            close(socket_desc);
            return NULL;
        }

        // Close the socket:
        close(socket_desc);
        gettimeofday(&end, NULL);
        timeval_subtract (&difference, &end, &start);
        cpu_time_used = (double)difference.tv_sec + (double)difference.tv_usec / 1000000.0;
        printf("Client Time: %.4e s\n", cpu_time_used);
    }
    return NULL;
}

int main(int argc, char **argv)
{
    int n_threads = 50;  // default value
    if (argc > 1)
        n_threads = atoi(argv[1]);

    pthread_t *threads = (pthread_t*)malloc(n_threads * sizeof(pthread_t));

    struct timeval start, end, difference;
    gettimeofday(&start, NULL);
    for(int i = 0; i < n_threads; i++)
    {
        int createRet = pthread_create(&threads[i], NULL, workerThreadFunc, NULL);
        if (createRet != 0)
        {
            printf("failed to create thread\n");
        }
    }
    for(int i = 0; i < n_threads; i++)
        pthread_join(threads[i], NULL);
    gettimeofday(&end, NULL);
    timeval_subtract (&difference, &end, &start);
    double cpu_time_used = (double)difference.tv_sec + (double)difference.tv_usec / 1000000.0;
    printf("Total Client Time: %.4e s\n", cpu_time_used);
    free(threads);
    return 0;
}

1
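This has nothing to do with Golang. The behaviour is implemented in the server-side kernel, which either resets or ignores the connection attempt depending on whether or not the platform is Windows. That causes the client's connect() call to time out and retry with doubled timeouts, retrying up to 3 times. The whole thing can take a minute or so. The real solution is to speed up the server's accept() loop and make it independent of all I/O to accepted clients. - user207421
@user207421 So is the server-side kernel retrying the connection to the server process, with a timeout that doubles along the way? - bbg
It is the client that retries, as I already said. - user207421
@user207421 Right, but you also said it has nothing to do with Golang, and Golang is the client. Is there a way to modify the exponential backoff on the client through Golang/C? - bbg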
1
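The client also includes the C library, the operating system, the computer, and the network interface. The retries I mentioned happen inside the C library's connect() call. I am not aware of any way to control them. - user207421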
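
The suggestion in the comments to keep the accept() loop independent of per-client I/O could be sketched in Go roughly as follows. This is a minimal illustration and not a port of the C server: the names serve, handle, and demo are hypothetical, the listener binds to an ephemeral loopback port, and the 5 ms sleep only mirrors the simulated work in the C server. Go's net.Listen does not take a backlog argument, so this sketch does not reproduce the backlog = 1 setup.

```go
package main

import (
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// handle does all per-client I/O in its own goroutine, so a slow client
// never delays the accept loop.
func handle(conn net.Conn) {
	defer conn.Close()
	buf := make([]byte, 2000)
	if _, err := conn.Read(buf); err != nil {
		return
	}
	if _, err := conn.Write([]byte("This is the server's message.")); err != nil {
		return
	}
	time.Sleep(5 * time.Millisecond) // simulated work, as in the C server
}

// serve only accepts connections; it returns once the listener is closed.
func serve(ln net.Listener) {
	for {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		go handle(conn)
	}
}

// demo starts the server on an ephemeral loopback port, fires n concurrent
// clients at it, and reports how many of them received a reply.
func demo(n int) (int, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer ln.Close()
	go serve(ln)

	var wg sync.WaitGroup
	var replies int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c, err := net.Dial("tcp", ln.Addr().String())
			if err != nil {
				return
			}
			defer c.Close()
			if _, err := c.Write([]byte("hello")); err != nil {
				return
			}
			buf := make([]byte, 64)
			if got, _ := c.Read(buf); got > 0 {
				atomic.AddInt64(&replies, 1)
			}
		}()
	}
	wg.Wait()
	return int(atomic.LoadInt64(&replies)), nil
}

func main() {
	start := time.Now()
	ok, err := demo(50)
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	fmt.Printf("%d/50 clients answered in %v\n", ok, time.Since(start))
}
```

Because each accepted connection is handed to its own goroutine, the 5 ms of simulated work overlaps across clients instead of holding up the next accept.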
1 Answer

0

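As @user207421 pointed out, the issue is that the TCP implementation includes exponential backoff on retries. Neither Golang nor C appears to offer an easy way to change this behavior.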
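
To make the backoff arithmetic concrete, here is a small sketch of when retries would be sent. It assumes the first timeout is 1 s (the initial retransmission timeout recommended by RFC 6298; the real value is chosen by the client's kernel) and that the timeout doubles after every unanswered attempt. The function name retransmitSchedule is hypothetical. The first two values line up with the dial times of roughly 1.03 s and 3.05 s in the sample output from the question.

```go
package main

import (
	"fmt"
	"time"
)

// retransmitSchedule returns the elapsed time at which each retry would be
// sent, assuming the timeout starts at initial and doubles after every
// unanswered attempt. Both the starting value and the doubling rule are
// assumptions for illustration; the actual policy lives in the kernel.
func retransmitSchedule(initial time.Duration, retries int) []time.Duration {
	schedule := make([]time.Duration, 0, retries)
	elapsed := time.Duration(0)
	timeout := initial
	for i := 0; i < retries; i++ {
		elapsed += timeout
		schedule = append(schedule, elapsed)
		timeout *= 2
	}
	return schedule
}

func main() {
	for i, t := range retransmitSchedule(time.Second, 4) {
		fmt.Printf("retry %d sent after about %v\n", i+1, t)
	}
	// Prints:
	// retry 1 sent after about 1s
	// retry 2 sent after about 3s
	// retry 3 sent after about 7s
	// retry 4 sent after about 15s
}
```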

The answer is: if you need high throughput, don't open and close TCP connections; use a connection pool.
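
A connection pool along these lines could look like the sketch below. It is a minimal illustration under several assumptions: connPool, newConnPool, roundTrip, and inMemoryDial are hypothetical names, and the peer is an in-memory echo built on net.Pipe so that the example runs without a server. Against a real server one would pass a dial function that wraps net.Dial("tcp", addr), and the server would have to keep connections open between requests, which the C server in the question does not do.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
)

// connPool is a hypothetical fixed-size pool of long-lived connections.
type connPool struct {
	conns chan net.Conn
}

// newConnPool dials size connections up front using the supplied dial function.
func newConnPool(size int, dial func() (net.Conn, error)) (*connPool, error) {
	p := &connPool{conns: make(chan net.Conn, size)}
	for i := 0; i < size; i++ {
		c, err := dial()
		if err != nil {
			p.Close()
			return nil, err
		}
		p.conns <- c
	}
	return p, nil
}

// Get blocks until a connection is free; Put hands it back for reuse.
func (p *connPool) Get() net.Conn  { return <-p.conns }
func (p *connPool) Put(c net.Conn) { p.conns <- c }

// Close closes every idle connection. Call it only after all borrowed
// connections have been returned.
func (p *connPool) Close() {
	close(p.conns)
	for c := range p.conns {
		c.Close()
	}
}

// roundTrip borrows a connection, sends msg, and reads back len(msg) bytes.
// A production pool would discard a connection after an error rather than
// returning it to the pool.
func roundTrip(p *connPool, msg string) (string, error) {
	c := p.Get()
	defer p.Put(c)
	if _, err := c.Write([]byte(msg)); err != nil {
		return "", err
	}
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(c, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

// inMemoryDial is a stand-in dialer: it returns one end of an in-memory
// pipe whose other end echoes back whatever it receives.
func inMemoryDial() (net.Conn, error) {
	client, server := net.Pipe()
	go io.Copy(server, server) // echo until the client end is closed
	return client, nil
}

func main() {
	pool, err := newConnPool(4, inMemoryDial)
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer pool.Close()

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if _, err := roundTrip(pool, fmt.Sprintf("request %d", id)); err != nil {
				fmt.Println("request failed:", err)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("all requests finished")
}
```

The 50 goroutines share 4 connections that are dialed once, so no request pays for a new TCP handshake or risks landing in a full backlog.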

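There is some work looking at removing the exponential backoff, referenced below, but for specific cases there are likely better solutions. There was for me.

ACM SIGCOMM Computer Communication Review, "Removing Exponential Backoff from TCP", Volume 38, Number 5, October 2008.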

