Python多线程队列中的生产者-消费者模式,具备线程安全性

4

我使用线程和队列来获取url并存储到数据库中。
我只想让一个线程执行存储工作。
因此,我编写了以下代码:

import threading
import time

import Queue

site_count = 10

fetch_thread_count = 2

site_queue = Queue.Queue()
proxy_array=[]        


class FetchThread(threading.Thread):
    def __init__(self,site_queue,proxy_array):
        threading.Thread.__init__(self)
        self.site_queue = site_queue
        self.proxy_array = proxy_array
    def run(self):
        while True:
            index = self.site_queue.get()
            self.get_proxy_one_website(index)
            self.site_queue.task_done()
    def get_proxy_one_website(self,index):
        print '{0} fetched site :{1}\n'.format(self.name,index)
        self.proxy_array.append(index)


def save():
    while True:
        if site_queue.qsize() > 0:
            if len(proxy_array) > 10:
                print 'save :{0}  to database\n'.format(proxy_array.pop())

            else:
                time.sleep(1)
        elif len(proxy_array) > 0:
            print 'save :{0} to database\n'.format(proxy_array.pop())

        elif len(proxy_array) == 0:
            print 'break'
            break
        else:
            print 'continue'
            continue

def start_crawl():
    global site_count,fetch_thread_count,site_queue,proxy_array
    print 'init'
    for i in range(fetch_thread_count):
        ft = FetchThread(site_queue,proxy_array)
        ft.setDaemon(True)
        ft.start()

    print 'put site_queue'
    for i in range(site_count):
        site_queue.put(i)

    save()

    print 'start site_queue join'
    site_queue.join()
    print 'finish'

start_crawl()

执行输出:

init
put site_queue
Thread-1 fetched site :0

Thread-2 fetched site :1

Thread-1 fetched site :2

Thread-2 fetched site :3

Thread-1 fetched site :4

Thread-2 fetched site :5

Thread-1 fetched site :6

Thread-2 fetched site :7

Thread-1 fetched site :8

Thread-2 fetched site :9

save :9 to database

save :8 to database

save :7 to database

save :6 to database

save :5 to database

save :4 to database

save :3 to database

save :2 to database

save :1 to database

save :0 to database

break
start site_queue join
finish
[Finished in 1.2s]

为什么save()函数在site_queue.join()之后运行,save()已经被写入。
我还用一个线程函数替换了save(),但也没有起作用。
这是否意味着我必须将proxy_array=[]更改为proxy_queue=Queue.Queue(),然后才能使用线程来存储数据?
我只想让一个线程来完成这个任务,并且没有其他线程会从proxy_array获取数据,为什么要加入它? 使用队列似乎非常奇怪。
是否有更好的解决方案?
更新:
我不想等到所有FetchThreads完成他们的工作。我想在提取数据时保存数据,这样会更快。 我希望结果类似于以下内容(因为我使用了array.pop(),所以save 0可能出现得很晚,这只是为了易于理解的示例):
Thread-2 fetched site :1

Thread-1 fetched site :2

save :0 to database

Thread-2 fetched site :3

Thread-1 fetched site :4

save :2 to database

save :3 to database


Thread-2 fetched site :5

Thread-1 fetched site :6

save :4 to database
.......

以下是针对与下面问题相同的人的更新:

问题:
就像上文所说,没有其他线程会从proxy_array获取数据。
我只是想不通为什么它会破坏线程安全?

回答:
在Misha的回答中提到了生产者-消费者问题,我仔细阅读后理解了。


问题:
还有一个问题,如果程序主线程可以扮演消费者角色,使用FetchThreads(换句话说,不需要创建StoreThread)

这是我无法理解的,我会在找到答案后进行更新。


如果您查看输出,"start site_queue join" 出现在保存行之后,因此保存函数确实在 site_queue.join() 之前运行,而不是之后。 - mpenkov
@misha 但是为什么它在所有“FetchThread”完成后才开始保存数据呢? - Mithril
因为没有任何阻止它这样做的东西。 - mpenkov
@misha 我认为当 len(proxy_array) >0 时应该执行 save。即使我将 site_count 更改为100,结果也是相同的。 - Mithril
你期望得到什么结果?这个结果与你实际得到的有何不同? - mpenkov
2个回答

5

我需要设计类似生产者-消费者的系统,生产者生成一个“id”,消费者使用该“id”进行一些url获取和处理操作。以下是我的代码框架:


    import Queue
    import random
    import threading
    import time
    import sys

    data_queue = Queue.Queue()
    lock = threading.Lock()

    def gcd(a, b):
        while b != 0:
            a,b = b, a%b

        return b

    def consumer(idnum):
        while True:
            try:
                data = data_queue.get(block=False)
            except Exception, e:
               print 'Exception ' + str(e)
            else:
                with lock:
                    print('\t consumer %d: computed gcd(%d, %d) = %d' %(idnum, data[0], data[1], gcd(data[0], data[1])))

            time.sleep(1)
            data_queue.task_done()

    def producer(idnum, count):
        for i in range(count):
            a,b = random.randint(1, sys.maxint), random.randint(1, sys.maxint)
            with lock:
                print('\t producer %d: generated (%d, %d)'% (idnum, a, b))
            data_queue.put((a,b))
            time.sleep(0.5)

    if __name__ == '__main__':
        num_producers = 1
        num_consumers = 2
        num_integer_pairs = 10

        for i in range(num_consumers):
            t = threading.Thread(target=consumer, args=(i,))
            t.daemon = True
            t.start()

        threads = []
        for ii in range(num_producers):
            thread = threading.Thread(target=producer, args=(ii, num_integer_pairs))
            threads.append(thread)
            thread.start()

        # wait for the producers threads to finish
        for thread in threads:
            thread.join()
        print 'done with producer threads'

        # wait till all the jobs are done in the queue
        data_queue.join()

        with lock:
            print 'all consumer threads finished'

        with lock:
            print 'main thread exited'

data_queue.task_done() should be under the else in try-except-else - Ricky Han

3
我建议你阅读有关生产者-消费者问题的内容。你的生产者是抓取线程。你的消费者是save函数。如果我理解正确,你希望消费者尽快保存已经获取的结果。为了实现这一点,生产者和消费者必须以某种线程安全的方式进行通信(例如队列)。
基本上,你需要另一个队列来替代proxy_array。你的save函数应类似于以下内容:
while True:
 try:
   data = fetch_data_from_output_queue()
   save_to_database(data)
 except EmptyQueue:
   if not stop_flag.is_set():
     # All done
     break
   time.sleep(1)
   continue

这个save函数需要在它自己的线程中运行。 stop_flag是一个Event,在你连接抓取线程之后设置。

从高层次来看,您的应用程序将如下所示:

input_queue = initialize_input_queue()
ouput_queue = initialize_output_queue()

stop_flag = Event()
create_and_start_save_thread(output_queue) # read from output queue, save to DB
create_and_start_fetch_threads(input_queue, output_queue) # get sites to crawl from input queue, push crawled results to output_queue
join_fetch_threads() # this will block until the fetch threads have gone through everything in the input_queue
stop_flag.set() # this will inform the save thread that we are done
join_save_thread() # wait for all the saving to complete

我不想等到所有的FetchThreads完成它们的工作。我想在获取数据时保存数据,这样会更快。 - Mithril
好的,在看到你更新的问题后,我现在明白了。我已经更新了我的答案。 - mpenkov
谢谢您仔细的回答!所以我必须创建一个StoreThread(threading.Thread)。但是我无法想象为什么它会破坏“线程安全”,因为没有其他线程会从proxy_array获取数据。还有一个问题,如果程序主线程可以作为使用FetchThreads的消费者(换句话说,不需要创建StoreThread)吗? - Mithril
你需要一个额外的线程,因为你正在独立地完成两件事情:1)等待工作线程join以便可以stop_flag.set();2)保存数据。由于join是一个阻塞调用,你需要在不同的线程上完成1)和2)。你可以通过使用非阻塞的join来解决这个问题,但这会带来更多的麻烦而收益很小。 - mpenkov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接