Python如何在一个进程返回结果时停止多个进程?

15

我正试图用Python编写一个简单的工作量证明(Proof-of-Work)挖矿程序。

def proof_of_work(b, nBytes):
    nonce = 0
    # while the first nBytes of hash(b + nonce) are not 0
    while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
        nonce = nonce + 1
    return nonce

现在我正在尝试进行多进程处理,以便可以利用所有的CPU核心并更快地找到nonce。我的想法是使用multiprocessing.Pool并多次执行函数proof_of_work,传递两个参数num_of_cpus_runningthis_cpu_id,如下所示:

def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id):
    nonce = this_cpu_id
    while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
        nonce = nonce + num_of_cpus_running
    return nonce
所以,如果有4个内核,每个内核将像这样计算随机数:
core 0: 0, 4, 8, 16, 32 ...
core 1: 1, 5, 9, 17, 33 ...
core 2: 2, 6, 10, 18, 34 ...
core 3: 3, 7, 15, 31, 38 ...

因此,我必须重写 proof_of_work ,以便当任何一个进程找到nonce时,其他所有进程都停止寻找nonce,注意所找到的nonce必须是要求字节为0的最低值。如果某个CPU由于某种原因加速,并返回高于最低有效nonce的有效nonce,则工作证明无效。

唯一不知道如何处理的部分是,进程A仅在进程B找到的nonce小于当前由进程A计算的nonce时才停止。如果它更高,则A继续计算(以防万一),直到到达B提供的nonce。

希望我自己解释得清楚。另外,如果有任何我写的东西的更快实现,我很乐意听取建议。非常感谢!


@MaartenBodewes,恐怕我不理解你的观点,也许我还是太新手程序员了xD。cpu_id只是我为每个进程分配的一个数字。首先,我获取CPU核心数x = multiprocessing.cpu_count(),然后启动x个进程,每个进程都有一个不同的ID,仅递增1。希望我理解得正确。 - mesafria
抱歉,那只是我对事情的误解。我从未深入研究过这些工作证明协议。我鄙视像比特币这样的东西-我不喜欢将能源转换为货币的协议,我更喜欢反过来。 - Maarten Bodewes
1
@MaartenBodewes 工作量证明不仅仅用于比特币。我使用它来防止DDoS攻击。 - Nathan Parker
4个回答

8

一种简单的选择是使用微批次并检查是否找到了答案。太小的批次会产生启动并行作业的开销,而太大的大小会导致其他进程在一个进程已经找到答案的情况下执行额外的工作。每个批次应该花费1-10秒钟以保持高效。

示例代码:

from multiprocessing import Pool
from hashlib import sha256
from time import time


def find_solution(args):
    salt, nBytes, nonce_range = args
    target = '0' * nBytes

    for nonce in xrange(nonce_range[0], nonce_range[1]):
        result = sha256(salt + str(nonce)).hexdigest()

        #print('%s %s vs %s' % (result, result[:nBytes], target)); sleep(0.1)

        if result[:nBytes] == target:
            return (nonce, result)

    return None


def proof_of_work(salt, nBytes):
    n_processes = 8
    batch_size = int(2.5e5)
    pool = Pool(n_processes)

    nonce = 0

    while True:
        nonce_ranges = [
            (nonce + i * batch_size, nonce + (i+1) * batch_size)
            for i in range(n_processes)
        ]

        params = [
            (salt, nBytes, nonce_range) for nonce_range in nonce_ranges
        ]

        # Single-process search:
        #solutions = map(find_solution, params)

        # Multi-process search:
        solutions = pool.map(find_solution, params)

        print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1]-1))

        # Find non-None results
        solutions = filter(None, solutions)

        if solutions:
            return solutions

        nonce += n_processes * batch_size


if __name__ == '__main__':
    start = time()
    solutions = proof_of_work('abc', 6)
    print('\n'.join('%d => %s' % s for s in solutions))
    print('Solution found in %.3f seconds' % (time() - start))

输出结果(一台搭载Core i7的笔记本电脑):
Searched 0 to 1999999
Searched 2000000 to 3999999
Searched 4000000 to 5999999
Searched 6000000 to 7999999
Searched 8000000 to 9999999
Searched 10000000 to 11999999
Searched 12000000 to 13999999
Searched 14000000 to 15999999
Searched 16000000 to 17999999
Searched 18000000 to 19999999
Searched 20000000 to 21999999
Searched 22000000 to 23999999
Searched 24000000 to 25999999
Searched 26000000 to 27999999
Searched 28000000 to 29999999
Searched 30000000 to 31999999
Searched 32000000 to 33999999
Searched 34000000 to 35999999
Searched 36000000 to 37999999
37196346 => 000000f4c9aee9d427dc94316fd49192a07f1aeca52f6b7c3bb76be10c5adf4d
Solution found in 20.536 seconds

使用单核,花费了76.468秒。但这并不是最有效的解决方法,但它确实可行。例如,如果“salt”很长,则可以在吸收盐后预先计算“SHA-256”状态,然后从那里继续暴力搜索。此外,字节数组可能比“hexdigest()”更有效。


2
出色的并行展示。然而,它是否回答了OP的问题,即如何在一个线程找到答案后停止其他线程? - RobertB
确实,这并不能完全回答原问题,但是通过知道最坏情况下其他线程会多做几秒钟的工作来解决了问题。 - NikoNyrh

6
一般的做法是:
  1. 将工作分成小块,例如针对特定“范围”进行计算,一个范围不应该太长,大约需要0.1秒到1秒
  2. 由某个管理者将工作包分配给工人
  3. 在完成工作包后,向管理者报告结果并请求新的工作包
  4. 如果工作已经完成并且找到了结果,则接受工人提供的结果并发出信号表示不需要执行更多工作 - 工人现在可以安全地终止
这样,您就不必在每次迭代时与管理者进行检查(这会减慢一切进程),也不必像中途停止线程等恶心的事情。不用说,管理者需要具备线程安全性。
这种方法完全符合您的模型,因为即使找到了结果,仍然需要其他工人的结果。
请注意,在您的模型中,可能会出现一个线程与其他线程失去同步的情况,落后于其他线程。一旦找到结果,您不想再进行另外一百万次计算。我从问题中重申这一点,因为我认为该模型是错误的。您应该修复模型而不是修复实现。

2
初学者的错误:将管理器设为低优先级。由于大部分时间管理器并没有做什么,它应该拥有至少与工人相同的优先级,但最好更高。否则,工人可能需要永远等待一个新数据包。是的,我曾经也是那个初学者 :) - Maarten Bodewes
1
注意:我不是一名有经验的Python开发人员。如果有人发布了一个可靠的Python实现,请随意接受它。 - Maarten Bodewes
2
更加灵活。可以添加/删除工作人员,更改工作包的大小,添加用户指示何时停止等选项。 - Maarten Bodewes
非常感谢,这真的很有帮助。如果有人提供代码,我会等待,但同时也会尝试实现它。 - mesafria

3
你可以使用multiprocessing.Queue()。每个CPU/进程都有一个队列。当一个进程找到一个nonce时,它会将其放在其他进程的队列中。其他进程在while循环的每次迭代中检查它们的队列(非阻塞),如果队列上有任何东西,它们就根据队列中的值决定是继续还是终止:
def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id, qSelf, qOthers):
    nonce = this_cpu_id
    while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
        nonce = nonce + num_of_cpus_running
        try:
            otherNonce = qSelf.get(block=False)
            if otherNonce < nonce:
                return
        except:
            pass
    for q in qOthers:
        q.put(nonce)
    return nonce

qOthers是属于其他进程的队列列表(每个队列=multiprocessing.Queue())。

如果您决定使用我建议的队列,您应该能够编写一个更好/更好看的上述方法实现。


0

我想通过将pool.map更改为pool.imap_unordered来改进NikoNyrh的答案。使用imap_unordered将立即从任何工作程序返回结果,而无需等待它们全部完成。因此,一旦任何一个结果返回元组,我们就可以退出while循环。

def proof_of_work(salt, nBytes):
    n_processes = 8
    batch_size = int(2.5e5)
    with Pool(n_processes) as pool:

        nonce = 0

        while True:
            nonce_ranges = [
                (nonce + i * batch_size, nonce + (i+1) * batch_size)
                for i in range(n_processes)
            ]

            params = [
                (salt, nBytes, nonce_range) for nonce_range in nonce_ranges

           ]
            print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1]-1))

            for result in pool.imap_unordered(find_solution, params):
                if isinstance(result,tuple): return result
            
            nonce += n_processes * batch_size



网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接