Cannot understand the "message sequence mismatch error".


I used the program from the answer in this link, with some modifications. Here is my modified code:

#include <linux/netlink.h>

#include <netlink/netlink.h>
#include <netlink/route/link.h>
#include <netlink/route/qdisc.h>
#include <netlink/route/qdisc/plug.h>
#include <netlink/socket.h>

#include <atomic>
#include <csignal>
#include <iostream>
#include <stdexcept>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <thread>
#include <queue>
#include <chrono>

/**
 * Netlink route socket.
 */
struct Socket {
  Socket() : handle{nl_socket_alloc()} {

    if (handle == nullptr) {
      throw std::runtime_error{"Failed to allocate socket!"};
    }

    if (int err = nl_connect(handle, NETLINK_ROUTE); err < 0) {
      throw std::runtime_error{"Unable to connect netlink socket: " +
                               std::string{nl_geterror(err)}};
    }
  }

  Socket(const Socket &) = delete;
  Socket &operator=(const Socket &) = delete;
  Socket(Socket &&) = delete;
  Socket &operator=(Socket &&) = delete;

  ~Socket() { nl_socket_free(handle); }

  struct nl_sock *handle;
};

/**
 * Read all links from netlink socket.
 */
struct LinkCache {
  explicit LinkCache(Socket *socket) : handle{nullptr} {
    if (int err = rtnl_link_alloc_cache(socket->handle, AF_UNSPEC, &handle);
        err < 0) {
      throw std::runtime_error{"Unable to allocate link cache: " +
                               std::string{nl_geterror(err)}};
    }
  }

  LinkCache(const LinkCache &) = delete;
  LinkCache &operator=(const LinkCache &) = delete;
  LinkCache(LinkCache &&) = delete;
  LinkCache &operator=(LinkCache &&) = delete;

  ~LinkCache() { nl_cache_free(handle); }

  struct nl_cache *handle;
};

/**
 * Link (such as "eth0" or "wlan0").
 */
struct Link {
  Link(LinkCache *link_cache, const std::string &iface)
      : handle{rtnl_link_get_by_name(link_cache->handle, iface.c_str())} {

    if (handle == nullptr) {
      throw std::runtime_error{"Link does not exist:" + iface};
    }
  }

  Link(const Link &) = delete;
  Link &operator=(const Link &) = delete;
  Link(Link &&) = delete;
  Link &operator=(Link &&) = delete;

  ~Link() { rtnl_link_put(handle); }

  struct rtnl_link *handle;
};

/**
 * Queuing discipline.
 */
struct QDisc {
  QDisc(const std::string &iface, const std::string &kind)
      : handle{rtnl_qdisc_alloc()} {
    if (handle == nullptr) {
      throw std::runtime_error{"Failed to allocate qdisc!"};
    }

    struct rtnl_tc *tc = TC_CAST(handle);

    // Set link
    LinkCache link_cache{&socket};
    Link link{&link_cache, iface};
    rtnl_tc_set_link(tc, link.handle);

    // Set parent qdisc
    uint32_t parent = 0;

    if (int err = rtnl_tc_str2handle("root", &parent); err < 0) {
      throw std::runtime_error{"Unable to parse handle: " +
                               std::string{nl_geterror(err)}};
    }

    rtnl_tc_set_parent(tc, parent);

    // Set kind (e.g. "plug")
    if (int err = rtnl_tc_set_kind(tc, kind.c_str()); err < 0) {
      throw std::runtime_error{"Unable to set kind: " +
                               std::string{nl_geterror(err)}};
    }
  }

  QDisc(const QDisc &) = delete;
  QDisc &operator=(const QDisc &) = delete;
  QDisc(QDisc &&) = delete;
  QDisc &operator=(QDisc &&) = delete;

  ~QDisc() {
    if (int err = rtnl_qdisc_delete(socket.handle, handle); err < 0) {
      std::cerr << "Unable to delete qdisc: " << nl_geterror(err) << std::endl;
    }

    rtnl_qdisc_put(handle);
  }

  void send_msg() {
    int flags = NLM_F_CREATE;

    if (int err = rtnl_qdisc_add(socket.handle, handle, flags); err < 0) {
      throw std::runtime_error{"Unable to add qdisc: " +
                               std::string{nl_geterror(err)}};
    }
  }

  Socket socket;
  struct rtnl_qdisc *handle;
};

/**
 * Queuing discipline for plugging traffic.
 */
class Plug {
public:
  Plug(const std::string &iface, uint32_t limit, std::string msg)
      : qdisc_{iface, "plug"} {

    rtnl_qdisc_plug_set_limit(qdisc_.handle, limit);
    qdisc_.send_msg();

    // set_enabled(enabled_);
    set_msg(msg);
  }

  // void set_enabled(bool enabled) {
  //   if (enabled) {
  //     rtnl_qdisc_plug_buffer(qdisc_.handle);
  //   } else {
  //     rtnl_qdisc_plug_release_one(qdisc_.handle);
  //   }

  //   qdisc_.send_msg();
  //   enabled_ = enabled;
  // }

  void set_msg(std::string msg) {
    if (msg == "buffer") {
      int ret = rtnl_qdisc_plug_buffer(qdisc_.handle);
      //std::cout<<strerror(ret);
    } else if(msg == "commit") {
      int ret = rtnl_qdisc_plug_release_one(qdisc_.handle);
      //std::cout<<strerror(ret);
    } else {
      int ret = rtnl_qdisc_plug_release_indefinite(qdisc_.handle);
      //std::cout<<strerror(ret);   
    }

    qdisc_.send_msg();
  }  

  // bool is_enabled() const { return enabled_; }

private:
  QDisc qdisc_;

  // bool enabled_;
};

std::atomic<bool> quit{false};

void exit_handler(int /*signal*/) { quit = true; }

// This function busy-waits on the job queue until there is something in it,
// then calls the release operation (i.e. unplugs the qdisc) to release the
// output packets generated for a particular epoch.
void transmit_ckpnt(std::queue<int> &job_queue, Plug &plug){

  while(true){

      while(!job_queue.empty()){

        int id = job_queue.front();
        job_queue.pop();
        std::string s = std::to_string(id);

        std::cout<<"called from parallel thread "<<s<<"\n"; 

        //release buffer
        plug.set_msg("commit");  
      }
  }

}

int main() {
  std::string iface{"veth-host"};
  constexpr uint32_t buffer_size = 10485760;
  // bool enabled = true;

  Plug plug{iface, buffer_size, "buffer"};

  /**
   * Set custom exit handler to ensure destructor runs to delete qdisc.
   */
  struct sigaction sa {};
  sa.sa_handler = exit_handler;
  sigfillset(&sa.sa_mask);
  sigaction(SIGINT, &sa, nullptr);

  pid_t wpid;
  int status = 0;
  std::queue<int> job_queue;
  int ckpnt_no  = 1;

  std::thread td(transmit_ckpnt, std::ref(job_queue), std::ref(plug));
  plug.set_msg("indefinite");

  while(true){
    //plug the buffer at start of the epoch
    plug.set_msg("buffer");

    //wait for completion of epoch
    sleep(4);   
    
    job_queue.push(ckpnt_no);
    ckpnt_no += 1;  
  }

  plug.set_msg("indefinite");
  td.join();
  
  // while (!quit) {
  //   std::cout << "Plug set to " << plug.is_enabled() << std::endl;
  //   std::cout << "Press <Enter> to continue.";
  //   std::cin.get();

  //   plug.set_enabled(!plug.is_enabled());
  // }

  return EXIT_SUCCESS;
}
What the code does: the program creates a plug qdisc. During a plug operation, network packets are buffered; during an unplug operation, the packets between the first plug (i.e. the head of the qdisc) and the second plug are released from the qdisc. The program above works fine if plug and unplug operations alternate strictly. However, I want to use it the way it was originally designed to be used, as described in this link:
     TCQ_PLUG_BUFFER (epoch i)
         TCQ_PLUG_BUFFER (epoch i+1) 
             TCQ_PLUG_RELEASE_ONE (for epoch i)
                 TCQ_PLUG_BUFFER (epoch i+2)
                     ..............................so on
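
In terms of my Plug wrapper above, that nesting corresponds to a call sequence like this (each call maps onto the qdisc operation shown in the comment):

  plug.set_msg("buffer");   // TCQ_PLUG_BUFFER      (epoch i)
  plug.set_msg("buffer");   // TCQ_PLUG_BUFFER      (epoch i+1)
  plug.set_msg("commit");   // TCQ_PLUG_RELEASE_ONE (for epoch i)
  plug.set_msg("buffer");   // TCQ_PLUG_BUFFER      (epoch i+2)
  // ... and so on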

In my program, the main thread starts buffering at the beginning of each epoch and continues its work. The worker thread takes job IDs from the job queue and releases the buffered data between the head of the qdisc and the next plug. But this leads to the following error:
./a.out: /lib/x86_64-linux-gnu/libnl-3.so.200: no version information available (required by ./a.out)
./a.out: /usr/lib/x86_64-linux-gnu/libnl-route-3.so.200: no version information available (required by ./a.out)
called from parallel thread 1
called from parallel thread 2
called from parallel thread 3
called from parallel thread 4
called from parallel thread 5
called from parallel thread 6
called from parallel thread 7
terminate called after throwing an instance of 'std::runtime_error'
 what(): Unable to add qdisc: Message sequence number mismatch
Aborted

I don't understand what this error is or why it occurs. Everything works fine when the release operations are executed sequentially in the main thread. But now a separate thread performs the release operation: it simply checks whether job_queue is empty, performs release operations as long as there is something in job_queue, and busy-waits while job_queue is empty.

1 Answer

The expected sequence counter is stored by libnl as part of the nl_sock structure (reference). When multiple threads call libnl functions, this can lead to inconsistencies, e.g. data races (two threads writing the sequence counter at the same time) or race conditions (time-of-check-to-time-of-use problems, where one thread checks whether the counter satisfies some condition and then acts on it, while another thread modifies the counter in the meantime). See here for more details on data races and race conditions.
Note: both g++ and clang++ support the -fsanitize=thread flag, which automatically inserts additional debugging code into the binary to help detect such data races (reference). It may be of limited use in this case, though, since you would also have to compile libnl with this flag, which may not be easy.
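
If you still want to try it on this program, a typical invocation could look like the following (the pkg-config package name libnl-route-3.0 is an assumption about your setup):

  g++ -std=c++17 -g -fsanitize=thread main.cpp $(pkg-config --cflags --libs libnl-route-3.0) -pthread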

From the libnl documentation (reference):

The next step is to check the sequence number of the message against
the currently expected sequence number. The application may provide
its own sequence number checking algorithm by setting the callback
function NL_CB_SEQ_CHECK to its own implementation. In fact, calling
nl_socket_disable_seq_check() to disable sequence number checking will
do nothing more than set the NL_CB_SEQ_CHECK hook to a function which
always returns NL_OK.
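
In other words, disabling the check simply installs a hook that accepts every message. For completeness, here is a minimal sketch of installing such a NL_CB_SEQ_CHECK hook yourself through nl_socket_modify_cb, the standard libnl mechanism for overriding callbacks (a sketch, not something this answer depends on):

// Sequence-check hook that accepts any sequence number; in effect the
// same as calling nl_socket_disable_seq_check().
static int accept_any_seq(struct nl_msg * /*msg*/, void * /*arg*/) {
  return NL_OK;
}

// Install the hook on the socket, e.g. at the end of Socket's constructor:
nl_socket_modify_cb(handle, NL_CB_SEQ_CHECK, NL_CB_CUSTOM,
                    accept_any_seq, nullptr);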

This leaves us with the following options:

  1. Protect every call into libnl functions that may modify the sequence counter with a mutex.

  2. Disable sequence counter checking using nl_socket_disable_seq_check.

From my point of view, 1) is the more robust solution. If performance matters more to you than robustness, you can go with 2).

Option 1: Protect access to the libnl functions with a mutex

Include the mutex header from the standard library:

#include <mutex>

In the Plug class, add a std::mutex as a member:
class Plug {
...
private:
  std::mutex seq_counter_mutex_;
...
};

At the beginning of the set_msg function, acquire the mutex with a std::lock_guard to ensure that only one thread at a time can execute the function:
  void set_msg(std::string msg) {
    std::lock_guard guard{seq_counter_mutex_};
    ...
  }
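
Putting it together, the Plug class from the question would then look roughly like this (a sketch; QDisc and the rest of the program stay unchanged):

#include <mutex>

class Plug {
public:
  Plug(const std::string &iface, uint32_t limit, std::string msg)
      : qdisc_{iface, "plug"} {
    rtnl_qdisc_plug_set_limit(qdisc_.handle, limit);
    qdisc_.send_msg();
    set_msg(std::move(msg));
  }

  void set_msg(std::string msg) {
    // Serialize all libnl calls: every request/reply pair updates the
    // sequence counter stored inside the shared nl_sock.
    std::lock_guard<std::mutex> guard{seq_counter_mutex_};

    if (msg == "buffer") {
      rtnl_qdisc_plug_buffer(qdisc_.handle);
    } else if (msg == "commit") {
      rtnl_qdisc_plug_release_one(qdisc_.handle);
    } else {
      rtnl_qdisc_plug_release_indefinite(qdisc_.handle);
    }

    qdisc_.send_msg();
  }

private:
  QDisc qdisc_;
  std::mutex seq_counter_mutex_;
};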

Option 2: Disable sequence number checking

In the Socket class, you can disable sequence number checking by adding the following at the end of the constructor:

  nl_socket_disable_seq_check(handle);
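
In context, the constructor then ends like this (only the last line is new):

  Socket() : handle{nl_socket_alloc()} {
    if (handle == nullptr) {
      throw std::runtime_error{"Failed to allocate socket!"};
    }

    if (int err = nl_connect(handle, NETLINK_ROUTE); err < 0) {
      throw std::runtime_error{"Unable to connect netlink socket: " +
                               std::string{nl_geterror(err)}};
    }

    // Accept replies regardless of their sequence number.
    nl_socket_disable_seq_check(handle);
  }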

What happens now is that the main thread pushes job IDs frequently, but the transmit_ckpnt function cannot get access to the job queue and stalls for long periods. Do you have any idea how to give both threads fair access to the job queue? - y_159
Hi, I was busy and hadn't looked at this answer for a long time. I tried the second option and it works now. - ram
Glad that worked. Regarding std::mutex being unfair (i.e. one thread acquiring the mutex more often than the other), I haven't run into that problem myself. I found this answer explaining how to implement a fair mutex, but I haven't tried it: https://dev59.com/x3TYa4cB1Zd3GeqPsDQB#17528648 - f9c69e9781fa194211448473495534
Please take a look at this question when you have time: https://unix.stackexchange.com/questions/626528/connection-reset-by-peer-errror-when-delaying-network-response-packets. I've been trying for a long time but can't solve it. Only network output packets can be routed through qdiscs, right? - y_159
