Redis发布/订阅添加额外的频道在订阅过程中。

4

是否有可能在Redis连接中添加附加订阅? 我有一个监听线程,但似乎不受新的SUBSCRIBE命令的影响。

如果这是预期的行为,那么如果用户添加股票行情源或加入聊天室,应该使用什么模式?

我想要实现一个类似于Python类的东西:

import threading
import redis

class RedisPubSub(object):
    def __init__(self):
        self._redis_pub = redis.Redis(host='localhost', port=6379, db=0)        
        self._redis_sub = redis.Redis(host='localhost', port=6379, db=0)        
        self._sub_thread = threading.Thread(target=self._listen)
        self._sub_thread.setDaemon(True)
        self._sub_thread.start()

    def publish(self, channel, message):
        self._redis_pub.publish(channel, message)

    def subscribe(self, channel):
        self._redis_sub.subscribe(channel)

    def _listen(self):
        for message in self._redis_sub.listen():
            print message
2个回答

6
python-redisConnectionPool 类继承自 threading.local,这就产生了你看到的“神奇”效果。 总结:主线程和工作线程的 self._redis_sub 客户端最终使用了两个不同的连接到服务器,但只有主线程的连接发出了 SUBSCRIBE 命令。 详细信息:由于主线程创建了 self._redis_sub,因此该客户端被放置在主线程的本地存储中。接下来,我假设主线程执行了 client.subscribe(channel) 调用。现在,主线程的客户端已经订阅了连接 1。接下来,您启动了 self._sub_thread 工作线程,它最终拥有自己的 self._redis_sub 属性,该属性设置为 redis.Client 的新实例,该实例构建了一个新的连接池并建立了一个新的与 redis 服务器的连接。
这个新连接还没有订阅您的频道,因此 listen() 立即返回。因此,对于 python-redis,您不能在线程之间传递已建立订阅(或任何其他状态命令)的连接。
根据您计划实现应用程序的方式,您可能需要切换到使用不同的客户端,或想出其他方法向工作线程通信订阅状态,例如通过队列发送订阅命令。
另一个问题是 python-redis 使用阻塞套接字,这会阻止您的监听线程在等待消息时执行其他工作,并且除非立即在收到消息后进行退订,否则它无法发出退订信号。

我认为最后一个问题是致命的。我需要让监听线程能够随意添加和删除订阅。 - Tristan

1

异步方式:

Twisted框架 和插件 txredisapi

示例代码(订阅:

import txredisapi as redis

from twisted.application import internet
from twisted.application import service


class myProtocol(redis.SubscriberProtocol):
    def connectionMade(self):
        print "waiting for messages..."
        print "use the redis client to send messages:"
        print "$ redis-cli publish chat test"
        print "$ redis-cli publish foo.bar hello world"
        self.subscribe("chat")
        self.psubscribe("foo.*")


        reactor.callLater(10, self.unsubscribe, "chat")
        reactor.callLater(15, self.punsubscribe, "foo.*")

        # self.continueTrying = False
        # self.transport.loseConnection()

    def messageReceived(self, pattern, channel, message):
        print "pattern=%s, channel=%s message=%s" % (pattern, channel, message)

    def connectionLost(self, reason):
        print "lost connection:", reason


class myFactory(redis.SubscriberFactory):
    # SubscriberFactory is a wapper for the ReconnectingClientFactory
    maxDelay = 120
    continueTrying = True
    protocol = myProtocol


application = service.Application("subscriber")
srv = internet.TCPClient("127.0.0.1", 6379, myFactory())
srv.setServiceParent(application)

只有一个线程,没有头疼的问题 :)

当然,这取决于您编写的应用程序类型。在网络编程方面,请使用twisted。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接