在同一个字典/列表上进行多进程处理

3

我是一个辅助翻译的机器人,以下是您需要翻译的内容:

我对Python还比较新,如果有不足之处请谅解。作为课程的一部分,我接触了量化金融的Python编程,并正在学习多进程并试图更好地理解它。我尝试修改给定的问题,现在我在思考中陷入困境。

问题:

我有一个函数可以用ohlc格式获取ticks。

{'scrip_name':'ABC','timestamp':1504836192,'open':301.05,'high':303.80,'low':299.00,'close':301.10,'volume':100000}

每分钟我希望同时进行以下计算,并且最好将其附加/插入到同一列表中:

  • 找到最近5个收盘数据的移动平均值
  • 找到最近5个开盘数据的中位数
  • 将tick数据保存到数据库中。

因此,预期数据可能如下:

['scrip_name':'ABC','timestamp':1504836192,'open':301.05,'high':303.80,'low':299.00,'close':301.10,'volume':100000,'MA_5_open':300.25,'Median_5_close':300.50]

假设数据要进入数据库,编写一个简单的dbinsert例程到数据库中是相当容易的,我不认为这是一个巨大的挑战,我可以生成一个任务来执行每一分钟的插入语句。
如何在内存中保持5个时段以计算5个周期的简单平均移动平均线,并将它们推回字典/列表,同时同步3个不同的函数/进程(一个用于插入到db的函数,一个用于计算平均值的函数,一个用于计算中位数的函数)。
以下假设使我在编写多进程程序时感到困难。有人能指导我吗?我不想使用pandas dataframe。
====修订/更新====
我不想使用pandas/numpy的解决方案的原因是,我的目标是了解基础知识,而不是新库的细微差别。请不要误解我对理解的需求是傲慢或不愿接受建议。
拥有的优势
p1=Process(target=Median,arg(sourcelist))
p2=Process(target=Average,arg(sourcelist))
p3=process(target=insertdb,arg(updatedlist))

我希望你能帮助我理解基于功能/算法组件数量进行扩展过程的可能性。但是,我该如何确保p1和p2同步,而p3应在p1和p2之后执行?


你好,关于未来的问题,我发现这个链接很有帮助:https://meta.stackexchange.com/questions/22186/how-do-i-format-my-code-blocks - jmunsch
要共享字典、列表、数组等,您需要使用 multiprocessing.managers.SyncManager(它是 multiprocessing.managers.BaseManager 的子类)和它支持的类型的代理。话虽如此,使用 multiprocessing.Queue(不需要 Manager)通常会更好地提高性能。 - martineau
Martineau,你关于队列的观点是正确的,它可能更快,但有时效率不高。假设我有一个queue_db,那么我有一个函数:dbinsert来读取和更新数据库,这将是可以的。如果将来我想要分配不同算法结果的工作负载到同一列表上,因为那是该分钟的即时数据,会发生什么情况呢?所以我有60秒的时间来解决可能的20个算法,并将其更新回同一列表,然后再进行下一次更新。 - Suresh
2个回答

0

这里是如何使用多进程的示例:

from multiprocessing import Pool, cpu_count
def db_func(ma, med):
    db.save(something)

def backtest_strat(d, db_func):
    a = d.get('avg')
    s = map(sum, a)
    db_func(s/len(a), median(a))

with Pool(cpu_count()) as p:
    from functools import partial
    bs = partial(backtest_strat, db_func=db_func)
    print(p.map(bs, [{'avg': [1,2,3,4,5], 'median': [1,2,3,4,5]}]))

请参见:

请注意,除非有很多切片,否则这不会加快任何速度。
因此,针对加速部分:
def get_slices(data)
    for slice in data:
        yield {'avg': [1,2,3,4,5], 'median': [1,2,3,4,5]}

p.map(bs, get_slices)

据我所知,多进程工作是通过使用pickle进行消息传递的,因此当调用pool.map时,应该可以访问所有三个内容,即两个数组和db_save函数。当然,还有其他方法可以解决这个问题,但希望这种方式能够展示一种解决方案。

谢谢您的快速回复,但这并不能帮助我。哎呀,我是新来的,所以回复有些困难,请耐心等待。 - Suresh
中位数或平均数的示例只是一个示例,它可能是一个更复杂的函数...因此,基本上一个进程池并不理想,因为有些函数的能力会比其他函数更快。 - Suresh
请给予建议,如果中位数和平均数的功能是分开的,我应该如何同步并保持数据的一致性。 - Suresh
嗨,jmunsch,抱歉我离开了一段时间,我会尝试运行你的代码并理解它的工作原理。我刚开始阅读有关部分的内容,完成这个练习后我会及时向你汇报我的理解水平的提高 :) 谢谢。 - Suresh

-1
问题:如何确保p1和p2同步,而p3应在p1和p2之后执行?
如果您同步所有进程,则计算一个任务(p1、p2、p3)的速度不可能比最慢的进程更快。同时,其他进程处于空闲状态。
这被称为"生产者-消费者问题"。 使用队列解决方案,所有数据都进行序列化,无需同步
# Process-1
def Producer()
    task_queue.put(data)

# Process-2
def Consumer(task_queue)
    data = task_queue.get()
    # process data

你想要多个消费者进程和一个消费者进程收集所有结果。
你不想使用Queue,而是使用同步原语
这个例子让所有进程独立运行。
只有进程Result等待通知。

这个例子使用了一个无限制的任务缓冲区tasks = mp.Manager().list()
如果为完成的任务重用列表条目,则可以将大小最小化。
如果你有一些非常快速的算法,则可以将它们组合成一个Process

import multiprocessing as mp

# Base class for all WORKERS
class Worker(mp.Process):
    tasks = mp.Manager().list()
    task_ready = mp.Condition()
    parties = mp.Manager().Value(int, 0)

    @classmethod
    def join(self):
        # Wait until all Data processed

    def get_task(self):
        for i, task in enumerate(Worker.tasks):
            if task is None: continue
            if not self.__class__.__name__ in task['result']:
                return (i, task['range'])
        return (None, None)

    # Main Process Loop
    def run(self):
        while True:
            # Get a Task for this WORKER
            idx, _range = self.get_task()
            if idx is None:
                break
            # Compute with self Method this _range
            result = self.compute(_range)

            # Update Worker.tasks
            with Worker.lock:
                task = Worker.tasks[idx]
                task['result'][name] = result
                parties = len(task['result'])
                Worker.tasks[idx] = task

            # If Last, notify Process Result
            if parties == Worker.parties.value:
                with Worker.task_ready:
                    Worker.task_ready.notify()

class Result(Worker):
    # Main Process Loop
    def run(self):
        while True:
            with Worker.task_ready:
                Worker.task_ready.wait()

            # Get (idx, _range) from tasks List
            idx, _range = self.get_task()
            if idx is None:
                break

            # process Task Results

            # Mark this tasks List Entry as done for reuse
            Worker.tasks[idx] = None

class Average(Worker):
    def compute(self, _range):
        return average of DATA[_range]

class Median(Worker):
    def compute(self, _range):
        return median of DATA[_range]

if __name__ == '__main__':
    DATA = mp.Manager().list()
    WORKERS = [Result(), Average(), Median()]
    Worker.start(WORKERS)

    # Example creates a Task every 5 Records
    for i in range(1, 16):
        DATA.append({'id': i, 'open': 300 + randrange(0, 5), 'close': 300 + randrange(-5, 5)})
        if i % 5 == 0:
            Worker.tasks.append({'range':(i-5, i), 'result': {}})

    Worker.join()

已测试通过的Python版本:3.4.2


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接