在AWS Lambda函数中使用Python多进程队列

7
我有一些Python代码,创建多个进程以更快地完成任务。当我创建这些进程时,我传递了一个队列。在进程内部,我使用queue.put(data)方法,以便能够在进程外部检索数据。它在我的本地机器上运行得非常好,但是当我将zip文件上传到AWS Lambda函数(Python 3.8)时,它会显示Queue()函数未被实现。如果我简单地删除队列功能,则该项目在AWS Lambda上运行良好,因此我知道这是我目前唯一的问题。
我确保通过使用"pip install multiprocess -t ./"和"pip install boto3 -t ./"直接向我的Python项目安装了multiprocessing软件包。
我对Python和AWS都是新手,但是最近我所找到的研究可能指向了SQS。
阅读这些SQS文档docs后,我不确定这是否正是我要寻找的东西。
以下是我在Lambda中运行的代码,在本地工作但在AWS上无法正常工作。请查看重要部分的*:
from multiprocessing import Process, Queue
from craigslist import CraigslistForSale
import time
import math

sitesHold = ["sfbay", "seattle", "newyork", "(many more)..." ]

results = []


def f(sites, category, search_keys, queue):
    local_results = []
    for site in sites:
        cl_fs = CraigslistForSale(site=site, category=category, filters={'query': search_keys})
        for result in cl_fs.get_results(sort_by='newest'):
            local_results.append(result)
    if len(local_results) > 0:
        print(local_results)
    queue.put(local_results) # Putting data *********************************


def scan_handler(event, context):
    started_at = time.monotonic()
    queue = Queue()
    print("Running...")
    amount_of_lists = int(event['amountOfLists'])
    list_length = int(len(sitesHold) / amount_of_lists)
    extra_lists = math.ceil((len(sitesHold) - (amount_of_lists * list_length)) / list_length)
    site_list = []
    list_creator_counter = 0
    site_counter = 0
    for i in range(amount_of_lists + extra_lists):
        site_list.append(sitesHold[list_creator_counter:list_creator_counter + list_length])
        list_creator_counter += list_length
    processes = []
    for i in range(len(site_list)):
        site_counter = site_counter + len(site_list[i])
        processes.append(Process(target=f, args=(site_list[i], event['category'], event['searchQuery'], queue,))) # Creating processes and creating queues ***************************

    for process in processes:
        process.start() # Starting processes ***********************

    for process in processes:
        listings = queue.get() # Getting from queue ****************************
        if len(listings) > 0:
            for listing in listings:
                results.append(listing)

    print(f"Results: {results}")

    for process in processes:
        process.join()

    total_time_took = time.monotonic() - started_at
    print(f"Sites processed: {site_counter}")
    print(f'Took {total_time_took} seconds long')

这是Lambda函数给我的错误信息:
{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "stackTrace": [
    "  File \"/var/task/main.py\", line 90, in scan_handler\n    queue = Queue()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 103, in Queue\n    return Queue(maxsize, ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/queues.py\", line 42, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 162, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock(\n"
  ]
}

在AWS Lambda中,Queue()是否可用?实现我的目标的最佳方法是什么?


对于那些正在寻找替代方案的人,这个博客可能会有所帮助。 - adrian
2个回答

6

1
看起来我将要深入研究“管道”(Pipes)! - David La Grange
1
我在使用multiprocessing.pool时遇到了同样的问题。令人难以置信的是,AWS不支持这个功能。 - DineshKumar

4

来自AWS 文档

如果你用Python开发Lambda函数,它并不会默认支持并行处理。Lambda支持Python 2.7和Python 3.6,两种语言都有multiprocessing和threading模块。

Python自带的multiprocessing模块允许你并行运行多个进程。但是由于Lambda执行环境没有/dev/shm(进程之间共享内存)的支持,因此你无法使用multiprocessing.Queue或multiprocessing.Pool。

然而,你可以使用multiprocessing.Pipe代替multiprocessing.Queue来完成你需要的功能,在Lambda函数执行期间不会出现任何错误。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接