Python计数器原子增量

64

我该如何将以下Java代码翻译为Python?

AtomicInteger cont = new AtomicInteger(0);

int value = cont.getAndIncrement();

2
你的Python应用程序中实际上使用了线程吗? - Martijn Pieters
2
是的。我有一个包含100个线程的线程池,我需要在每个线程中递增一个变量。 - user2983041
6个回答

52

在使用该值时,最好使用threading.Lock进行保护。在Python中没有原子修改,除非你使用pypy(如果你使用,可以看一下stm版本中的__pypy__.thread.atomic)。


2
我使用锁的acquire()和release()方法,它可以工作,但我认为这不如Java中的原子类高效。 - user2983041
22
使用 with your_lock: variable += 1 可能更简洁。 - viraptor
4
@viraptor,在锁仍然被持有的时候,不要忘记从variable中取出value - Michael Mol
1
FYI:挂起和恢复线程是昂贵的,因此在Java中实现了原子操作,它们执行重复的比较和交换操作而不是锁定:https://www.baeldung.com/java-atomic-variables - lyjackal

33
itertools.count 返回一个迭代器,每次迭代时相当于执行 getAndIncrement()
示例:
import itertools
cont = itertools.count()
value = next(cont)

11
乍一看这似乎不是线程安全的,对吗? - Collin
33
暂且不谈我对使用依赖GIL进行同步的定时炸弹的犹豫,你为什么说这明确是一个单一操作?虽然它是一个单一的函数调用,但它是否是一个单一的字节码操作呢? - Collin
23
可以,这是一个链接,指向CPython源代码中itertoolsmodule.c文件的第3999行。 - Will Manley
18
@JianggeZhang,我仍然同意Colin的犹豫。在许多情况下,依赖于"contingency"(偶然性)来进行同步是不明智的:也就是说,它目前实现的隐含细节超出了该方法的"合同"和意图。这等价于依赖副作用。对于某些情况,这可能还可以,但在一般情况下,我认为它仍然有风险。 - Turix
26
@WillManley 它依赖于CPython(和PyPy)的一个实现细节,依赖于实现细节并不是一个好的做法。此外,在Jython上使用它也不安全,详见:http://www.jython.org/jythonbook/en/1.0/Concurrency.html#no-global-interpreter-lock 和:https://bitbucket.org/jython/jython/src/91083509a11cdeadc9407b4d9a1ece2b8ffc45ce/src/org/python/modules/itertools/count.java?at=default&fileviewer=file-view-default#count.java-109 请更新帖子。 - Stan Prokop
显示剩余5条评论

21

这将执行相同的功能,尽管其不是像“AtomicInteger”的名称所暗示的那样无锁。

请注意,其他方法也不严格是无锁的--它们依赖于GIL并且在Python解释器之间不可移植。

class AtomicInteger():
    def __init__(self, value=0):
        self._value = int(value)
        self._lock = threading.Lock()
        
    def inc(self, d=1):
        with self._lock:
            self._value += int(d)
            return self._value

    def dec(self, d=1):
        return self.inc(-d)    

    @property
    def value(self):
        with self._lock:
            return self._value

    @value.setter
    def value(self, v):
        with self._lock:
            self._value = int(v)
            return self._value

为什么要锁定value()...难道检索本身不是原子操作吗? - Tomer W
其实,我不确定你能这样假设。self.x是原子的吗?它肯定不总是原子的(因为__getattr____getattribute__可能在子类中被实现)。(另外,我添加了int(x)来保护输入不是不可变的、线程安全对象的情况)。 - user48956
@yahya - 你有对它进行性能分析吗? - user48956
@Yahya 你认为它在功能上是原子的,即使在硬件层面上不是原子的吗? - user48956
1
经过另一次测试,我发现它可以工作了。只是我所使用的平台太慢了。+1! - Yahya
显示剩余2条评论

17

使用atomics库,相同的代码可以用Python编写:

import atomics


a = atomics.atomic(width=4, atype=atomics.INT)

value = a.fetch_inc()

这种方法严格保证无锁。

注意:我是该库的作者。


1
看起来你在推荐自己的库。请注意,在进行自我推广时,"你必须在回答中披露你的关联" - MisterMiyagi
2
没关系,只是想让你知道。顺便说一句,做得很好。 - MisterMiyagi
嗨,我只是一个路过的人,在这里幸运地找到了你的答案。很棒它是几个小时前发布的。所以,让我问你:width=4是什么?通过阅读文档,我并不清楚。此外,为什么我要关心宽度呢?文档还应该包括一个GitHub链接以供贡献,并提供像这个答案中新手可以理解的简单示例,而不是直接跳到像测试多个线程的正确性或内存映射访问之类的复杂内容。 - Victor Stafusa - BozoNaCadeia
1
无论如何,恭喜您提供这样的库。 - Victor Stafusa - BozoNaCadeia
废弃的库。不推荐使用。 - undefined
显示剩余5条评论

2

已经 8 年了,仍然没有不使用任何外部库的 threading.Lock 选项的完整示例代码... 这里来了:

import threading

i = 0
lock = threading.Lock()

# Worker thread for increasing count
class CounterThread(threading.Thread):
    def __init__(self):
        super(CounterThread, self).__init__()
        
    def run(self):
        lock.acquire()
        global i
        i = i + 1
        lock.release()


threads = []
for a in range(0, 10000):
    th = CounterThread()
    th.start()
    threads.append(th)

for thread in threads:
    thread.join()

global i
print(i)

1
你错过了这个答案 https://dev59.com/rGAg5IYBdhLWcg3wfa56#48433648 - Roman-Stop RU aggression in UA
我仍然相信我的选择对于回答有价值,因为它更明确,锁定被获取和释放,并且有一个完整的示例和证明这种方法实际上是有效的。 - undefined
从我个人来看,复制粘贴你的代码对我来说,实际上相当于使用一个“外部库”。我无法判断你的代码是否比原子操作更好、更安全或更健壮。尽管如此,建议使用开源的、已发布并受欢迎的pypi库,而不是来自陌生人的代码。只是说一下而已 ;) - undefined
使用适合你的用例和编程技能的工具吧 :) - undefined

0

Python原子操作用于共享数据类型。

https://sharedatomic.top

该模块可用于多进程和多线程条件下的原子操作。高性能Python!高并发,高性能!

使用多进程和多线程的原子API示例:

您需要以下步骤来利用该模块:

  1. 创建函数,供子进程使用,参考UIntAPIs、IntAPIs、BytearrayAPIs、StringAPIs、SetAPIs、ListAPIs,在每个进程中,您可以创建多个线程。

     def process_run(a):
       def subthread_run(a):
         a.array_sub_and_fetch(b'\0x0F')
    
       threadlist = []
       for t in range(5000):
           threadlist.append(Thread(target=subthread_run, args=(a,)))
    
       for t in range(5000):
           threadlist[t].start()
    
       for t in range(5000):
           threadlist[t].join()
    
  2. 创建共享的bytearray

    a = atomic_bytearray(b'ab', length=7, paddingdirection='r', paddingbytes=b'012', mode='m')
    
  3. 启动进程/线程以利用共享的bytearray

     processlist = []
    
     for p in range(2):
    
       processlist.append(Process(target=process_run, args=(a,)))
    
     for p in range(2):
    
       processlist[p].start()
    
     for p in range(2):
    
       processlist[p].join()
    
     assert a.value == int.to_bytes(27411031864108609, length=8, byteorder='big')
    

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接