Redis作为消息代理

17

问题

我想以发布-订阅方式在应用程序之间传递数据。数据可能会以比消费速率高得多的速度产生,并且消息会丢失,这不是问题。想象一下快速传感器和慢速传感器数据处理器。为此,我使用Redis pub/sub并编写了一个充当订阅者的类,接收每条消息并将其放入缓冲区中。当新消息进来时,缓冲区被覆盖或在“真实”函数请求消息时被清空。因此,在向该类提出问题时,我立即获得响应(表明我的函数比数据速度慢)或必须等待(表明我的函数比数据快)。

这对于数据快速到达的情况效果非常好。但是对于相对很少的数据,比如每五秒钟一次,这就行不通:想象一下我的消费者稍微晚于生产者启动,第一条消息丢失并且我的消费者需要等待近五秒钟才能开始工作。

我认为我必须使用Redis工具来解决这个问题。与其使用pub/sub,我可以直接使用get/set方法,从而将缓存功能直接放入Redis中。但是,我的消费者现在必须轮询数据库而不是我现在拥有的事件魔术。键可以看起来像“key:timestamp”,我的消费者现在必须get key:*并持续比较时间戳,我认为这会导致很大的负载。没有自然的休眠可能性,因为虽然我不关心丢失的消息(我无能为力),但我确实关心延迟。

是否有人使用Redis进行类似的事情,并能够给我有关聪明使用Redis工具和数据结构的提示?

编辑

理想情况下,我的程序流程如下:

  • 启动程序
  • 从Redis检索key
  • 告诉Redis,“嘿,在key的更改上通知我”。
  • 启动带有新消息回调的异步功能。
  • 写这篇文章时,我想到了一个主意:发布者不仅可以在主题key上发布message,还可以set key message。这样,应用程序可以先get,然后再subscribe

    这个主意好不好呢?

    在我得到下面那个答案(被接受的答案)之后我做了什么

    这里确实需要使用键空间通知。Redis作为信息的主要来源,我的客户端订阅键空间通知,通知订阅者有关影响特定键的事件。现在,在客户机的异步部分中,我订阅有关我的感兴趣的键的通知。这些通知设置一个key_has_updates标志。当我需要该值时,我从Redis中获取它并取消设置标志。没有设置标志,我知道服务器上没有该键的新值。如果没有键空间通知,这将是我需要轮询服务器的部分。优点是我可以使用各种数据结构,而不仅仅是pub/sub机制,而且缓慢加入者总是能够get到初始值,这在pub/sub中将会丢失。

    当我需要该值时,我从Redis中获取该值并将标志设置为false。

    3个回答

    9
    一种方法是将数据推送到列表(LPUSH),然后修剪它(LTRIM),因此如果没有消费者,它不会无限增长。在另一端,消费者将从该列表中获取项目并处理它们。您还可以使用键空间通知,每次向该队列添加一个项目时都会收到警报。

    我理解列表的优点,但是假设我拥有一个非常快速的处理器,它仍然必须轮询......对于一个慢处理器,你的建议具有优势,因为他总是获得最新的项目(与发布相反,其中需要自己实现所需的行为),但是除了在列表中保留最近的历史记录之外,我认为这与“获取”相比没有太大的优势... - wal-o-mat
    好的,我已经更新了你的答案,我需要检查一下链接的文档...请稍等 :) - wal-o-mat
    键空间通知似乎是我需要的工具:使用“正常”的get/set操作来帮助“慢加入者”,但也允许通过sub访问操作。 - wal-o-mat
    好的,如果您需要更多信息,我会继续在这里检查,但键空间通知和其他一些东西(列表、字符串)的混合似乎是您所需要的。 - soveran

    3

    从Redis 5开始,新增了一种名为"Streams"的数据类型,它是一种追加式数据结构。Redis Streams可用作可靠的消息队列,并使用消费者组概念进行点对点和多播通信Redis_Streams_MQ


    3

    我使用两个本地redis命令rpushblpop在应用程序之间传递数据。

    blpop会在给定列表中没有元素可以弹出时,阻塞连接。

    • 数据以json格式传递,在应用程序之间使用列表作为队列。
    • 要发送数据的应用程序(充当发布者)在列表上进行rpush操作。
    • 要接收数据的应用程序(充当订阅者)在同一列表上进行blpop操作。

    代码(perl语言)如下:


    发送方(我们假设传递的是哈希值)

    #Encode hash in json format
    my $json_text = encode_json \%$hash_ref;
    
    #Connect to redis and send to list
    my $r = Redis->new(server => "127.0.0.1:6379");
    $r->rpush("shared_queue","$json_text");
    $r->quit;
    

    接收器(进入一个无限循环)

    while (1) {
        my $r = Redis->new(server => "127.0.0.1:6379");
        my @elem =$r->blpop("shared_queue",0);
    
        #Decode hash element
        my $hash_ref=decode_json($elem\[1]);
    
        #make some stuff
    }
    

    我发现这种方法有很多用处:
    • 元素被存储到列表中,因此暂时禁用接收器不会丢失信息。当接收器重新启动时,可以处理列表中的所有项目。
    • 高速率的发送者可以通过多个接收者实例处理。
    • 多个发送者可以在唯一列表上发送数据。在这种情况下,应该很容易实现数据收集器。
    • 作为守护进程的接收器进程可以使用特定工具(例如 pm2)进行监控。

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接