与多个Python脚本共享字典

13

我想要一个唯一的dict (键/值) 数据库,可以从多个同时运行的Python脚本中访问。

如果script1.py更新了d[2839],那么在查询d[2839]几秒钟后,script2.py应该看到修改后的值

  • 我考虑使用SQLite,但似乎并发从多个进程进行写入/读取不是SQLite的优势(假设script1.py刚刚修改了d[2839],那么script2.py的SQLite连接怎么知道它必须重新加载数据库的这个特定部分?

  • 我也想过在刷新修改时对文件进行锁定(但这样做很棘手),然后使用json.dump进行序列化,然后尝试检测修改,如果有任何修改则使用json.load重新加载等等......哦不,我正在重新发明轮子,而且是一个特别低效的键/值数据库!

  • Redis看起来像一个解决方案,但它不正式支持Windows,同样适用于leveldb

  • 多个脚本可能同时想要写入(即使这是一个非常罕见的事件),是否有一种方法让数据库系统处理这个问题(通过锁定参数?似乎默认情况下SQLite无法做到这一点,因为"SQLite支持无限数量的同时读取器,但它只允许在任何时刻只有一个写入者。"

  • 那么这个问题的Pythonic解决方案是什么呢?

    注意:我使用的是Windows,并且字典应该最多有1M个项目(键和值都是整数)。


    3
    不错的问题。我也期待一个合适的解决方案。同时,你可以看一下multiprocessing Manager(仅适用于能够从单个入口点启动两个脚本的情况)。另外,如果万不得已,你还可以在Docker中运行redis。 - 9dogs
    @9dogs 是的,这个问题的解决方案在许多情况下都非常有用 :) 注意:我不是从同一个入口点开始所有脚本/不使用multiprocessing。我正在寻找比在Docker中运行redis更轻量级的解决方案,但我会记住这个选项作为最后的手段! - Basj
    @Basj。请参考:sqlite3并发访问。这应该能够满足您的需求。 - ekhumoro
    @ekhumoro 有趣的话题!我在那里开始了一个赏金(https://dev59.com/j2855IYBdhLWcg3w-JMr),因为在我看来,答案并不完全准确(它们多多少少地说“没问题”,而没有谈到真正的问题情况:同时进行两个写操作等)。 - Basj
    Redis“不受官方支持”,但MS OpenTech端口运行良好。这可能是您所需的最简单和最佳工具。 - joshua
    显示剩余9条评论
    8个回答

    11
    大多数嵌入式数据库除了SQLite之外,都没有针对并发访问进行优化。我也对SQLite的并发性能很感兴趣,所以我进行了基准测试:
    import time
    import sqlite3
    import os
    import random
    import sys
    import multiprocessing
    
    
    class Store():
    
        def __init__(self, filename='kv.db'):
            self.conn = sqlite3.connect(filename, timeout=60)
            self.conn.execute('pragma journal_mode=wal')
            self.conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
            self.conn.commit()
    
        def get(self, key):
            item = self.conn.execute('select value from "kv" where key=?', (key,))
            if item:
                return next(item)[0]
    
        def set(self, key, value):
            self.conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
            self.conn.commit()
    
    
    def worker(n):
        d = [random.randint(0, 1<<31) for _ in range(n)]
        s = Store()
        for i in d:
            s.set(i, i)
        random.shuffle(d)
        for i in d:
            s.get(i)
    
    
    def test(c):
        n = 5000
        start = time.time()
        ps = []
        for _ in range(c):
            p = multiprocessing.Process(target=worker, args=(n,))
            p.start()
            ps.append(p)
        while any(p.is_alive() for p in ps):
            time.sleep(0.01)
        cost = time.time() - start
        print(f'{c:<10d}\t{cost:<7.2f}\t{n/cost:<20.2f}\t{n*c/cost:<14.2f}')
    
    
    def main():
        print(f'concurrency\ttime(s)\tpre process TPS(r/s)\ttotal TPS(r/s)')
        for c in range(1, 9):
            test(c)
    
    
    if __name__ == '__main__':
        main()
    

    以下是在我的四核MacOS电脑上,SSD硬盘分区上的结果:

    concurrency time(s) pre process TPS(r/s)    total TPS(r/s)
    1           0.65    7638.43                 7638.43
    2           1.30    3854.69                 7709.38
    3           1.83    2729.32                 8187.97
    4           2.43    2055.25                 8221.01
    5           3.07    1629.35                 8146.74
    6           3.87    1290.63                 7743.78
    7           4.80    1041.73                 7292.13
    8           5.37    931.27                  7450.15
    

    在一个8核的Windows Server 2012云服务器上,SSD存储卷的性能测试结果如下:

    concurrency     time(s) pre process TPS(r/s)    total TPS(r/s)
    1               4.12    1212.14                 1212.14
    2               7.87    634.93                  1269.87
    3               14.06   355.56                  1066.69
    4               15.84   315.59                  1262.35
    5               20.19   247.68                  1238.41
    6               24.52   203.96                  1223.73
    7               29.94   167.02                  1169.12
    8               34.98   142.92                  1143.39
    

    事实证明,无论并发性如何,总吞吐量都是一致的,而且在Windows上,SQLite比macOS慢,希望这对你有所帮助。


    由于SQLite写锁是针对整个数据库的,为了获得更多TPS,您可以将数据分区到多个数据库文件中:

    class MultiDBStore():
    
        def __init__(self, buckets=5):
            self.buckets = buckets
            self.conns = []
            for n in range(buckets):
                conn = sqlite3.connect(f'kv_{n}.db', timeout=60)
                conn.execute('pragma journal_mode=wal')
                conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
                conn.commit()
                self.conns.append(conn)
    
        def _get_conn(self, key):
            assert isinstance(key, int)
            return self.conns[key % self.buckets]
    
        def get(self, key):
            item = self._get_conn(key).execute('select value from "kv" where key=?', (key,))
            if item:
                return next(item)[0]
    
        def set(self, key, value):
            conn = self._get_conn(key)
            conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
            conn.commit()
    

    我的Mac电脑上有20个分区,测试结果如下:

    concurrency time(s) pre process TPS(r/s)    total TPS(r/s)
    1           2.07    4837.17                 4837.17
    2           2.51    3980.58                 7961.17
    3           3.28    3047.68                 9143.03
    4           4.02    2486.76                 9947.04
    5           4.44    2249.94                 11249.71
    6           4.76    2101.26                 12607.58
    7           5.25    1903.69                 13325.82
    8           5.71    1752.46                 14019.70
    

    总TPS比单个数据库文件高。


    我们可以想象一下,如果我们在极少数情况下出现了恰好有两个写操作同时发生的情况,那么第二个操作必须等待第一个操作完成(使用锁系统)。如何在 sqlite3 中实现这一点? - Basj
    让我们在聊天中继续这个讨论 - georgexsh
    1
    @Basj ACID兼容是SQLite的设计目标,这不应该是你需要担心的事情...无论如何,你可以在 wal.c 中查看 sqlite3WalBeginWriteTransaction(),写入器将在事务开始时等待WAL_WRITE_LOCK,我猜这就是你想要了解的“锁定”部分,但完整的机制比这更复杂。 - georgexsh
    2
    @Basj - “一次只允许一个写入者”的原因是_因为sqlite会为您处理并发问题_。如果一个进程正在写入,另一个进程尝试写入不会失败,而是会被阻塞。您无需为并发写入做任何事情。这份文档描述了内部机制; 这份关于Python库的文档应该可以让您放心。 - Nathan Vērzemnieks
    1
    @Basj 是的,SQLite 会在内部为您完成这项工作。 - georgexsh
    显示剩余11条评论

    4

    标题让我对作者的意图有所提示,“与其他Python脚本共享字典”,这不需要数据存储库即可完成。虽然数据库被多次提到,但存储数据的行为可能只是一个旁枝末节,并非必需。 - Back2Basics

    2
    您可以使用Python字典来实现此目的。
    创建一个名为G的通用类或脚本,在其中初始化一个字典。G将运行script1.py和script2.py,并将字典传递给两个脚本文件,Python字典默认按引用传递。这样,单个字典将用于存储数据,两个脚本都可以修改字典值,更改在两个脚本中都可以看到。我希望script1.py和script2.py是基于类的。这不能保证数据的持久性。为了持久性,您可以在x个时间间隔后将数据存储在数据库中。
    示例:
    script1.py
    class SCRIPT1:
    
        def __init__(self, dictionary):
            self.dictionary = dictionary
            self.dictionary.update({"a":"a"})
            print("SCRIPT1 : ", self.dictionary)
    
        def update(self):
            self.dictionary.update({"c":"c"})          
    

    script2.py

    class SCRIPT2:
        def __init__(self, dictionary):
            self.dictionary = dictionary
            self.dictionary.update({"b":"b"})
            print("SCRIPT 2 : " , self.dictionary)
    

    main_script.py

    import script1
    import script2
    
    x = {}
    
    obj1 = script1.SCRIPT1(x) # output: SCRIPT1 :  {'a': 'a'}
    obj2 = script2.SCRIPT2(x) # output: SCRIPT 2 :  {'a': 'a', 'b': 'b'}
    obj1.update()
    print("SCRIPT 1 dict: ", obj1.dictionary) # output: SCRIPT 1 dict:  {'c': 'c', 'a': 'a', 'b': 'b'}
    
    print("SCRIPT 2 dict: ", obj2.dictionary) # output: SCRIPT 2 dict:  {'c': 'c', 'a': 'a', 'b': 'b'}
    

    在您运行脚本的目录中创建一个空的 _ init _.py 文件。
    另一种选择是:

    Redis


    谢谢您的回答,请提供一个代码示例以使其更易理解。 - Basj
    1
    OP所指的“同时运行多个Python脚本”是指“使用多个解释器实例”。 - Darkonaut
    @Darkonaut,我不理解你的问题,请详细说明一下。 - Irtiza
    这不是一个问题,但你的回答并没有回答 OP 所问的。你的代码中所有内容都在一个解释器实例(一个进程)上按顺序运行。只是从其他两个脚本中导入了代码,而不是运行其他进程,更不会在进程之间共享任何东西。 - Darkonaut

    2
    我会考虑两个选项,都是嵌入式数据库。

    SQlite

    这里这里所述,使用它应该没问题。

    BerkeleyDB

    链接

    Berkeley DB (BDB) 是一个旨在为键/值数据提供高性能嵌入式数据库的软件库。

    它正是为您的目的而设计的。

    BDB 可以支持数千个同时控制线程或并发处理大达 256TB 的数据库,3 在包括大多数类 Unix 和 Windows 系统以及实时操作系统在内的各种操作系统上。

    它很强大,并且已经存在了数十年。
    在同一台机器上的两个脚本之间交换数据的任务中,启动需要 sysops 参与的完整基于套接字的服务器(如 redis/memcached/其他任何东西)在我的看法中是一个额外的负担。

    0
    你可以使用基于文档的数据库管理器。也许对于你的系统来说有些过重,但并发访问通常是数据库管理系统和连接到它们的API存在的原因之一。
    我已经使用Python的MongoDB,效果很好。Python API文档非常好,并且每个文档(数据库元素)都可以作为字典加载到Python中。

    0

    我会使用一个发布/订阅的websocket框架,比如Autobahn/Python,用一个脚本作为“服务器”,处理所有文件通信,但这取决于规模,也许这样做有些过度。


    0

    0

    听起来你真正需要的是某种数据库。

    如果 Redis 在 Windows 上行不通,那么我会看看 MongoDB。

    https://docs.mongodb.com/manual/tutorial/install-mongodb-on-windows/

    MongoDB和Python非常搭配,可以类似于Redis一样运行。以下是PyMongo的安装文档链接: http://api.mongodb.com/python/current/installation.html?_ga=2.78008212.1422709185.1517530606-587126476.1517530605

    此外,许多人提到了SQlite。我认为你担心它只允许一个写入者,但这对你来说并不是真正的问题。我认为它所说的是,如果有两个写入者,第二个将被阻塞,直到第一个完成。这对你的情况可能没问题。


    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接