我使用线程和队列来获取url并存储到数据库中。
我只想让一个线程执行存储工作。
因此,我编写了以下代码:
import threading
import time
import Queue
site_count = 10
fetch_thread_count = 2
site_queue = Queue.Queue()
proxy_array=[]
class FetchThread(threading.Thread):
def __init__(self,site_queue,proxy_array):
threading.Thread.__init__(self)
self.site_queue = site_queue
self.proxy_array = proxy_array
def run(self):
while True:
index = self.site_queue.get()
self.get_proxy_one_website(index)
self.site_queue.task_done()
def get_proxy_one_website(self,index):
print '{0} fetched site :{1}\n'.format(self.name,index)
self.proxy_array.append(index)
def save():
while True:
if site_queue.qsize() > 0:
if len(proxy_array) > 10:
print 'save :{0} to database\n'.format(proxy_array.pop())
else:
time.sleep(1)
elif len(proxy_array) > 0:
print 'save :{0} to database\n'.format(proxy_array.pop())
elif len(proxy_array) == 0:
print 'break'
break
else:
print 'continue'
continue
def start_crawl():
global site_count,fetch_thread_count,site_queue,proxy_array
print 'init'
for i in range(fetch_thread_count):
ft = FetchThread(site_queue,proxy_array)
ft.setDaemon(True)
ft.start()
print 'put site_queue'
for i in range(site_count):
site_queue.put(i)
save()
print 'start site_queue join'
site_queue.join()
print 'finish'
start_crawl()
执行输出:
init
put site_queue
Thread-1 fetched site :0
Thread-2 fetched site :1
Thread-1 fetched site :2
Thread-2 fetched site :3
Thread-1 fetched site :4
Thread-2 fetched site :5
Thread-1 fetched site :6
Thread-2 fetched site :7
Thread-1 fetched site :8
Thread-2 fetched site :9
save :9 to database
save :8 to database
save :7 to database
save :6 to database
save :5 to database
save :4 to database
save :3 to database
save :2 to database
save :1 to database
save :0 to database
break
start site_queue join
finish
[Finished in 1.2s]
为什么
save()
函数在site_queue.join()
之后运行,save()
已经被写入。我还用一个线程函数替换了
save()
,但也没有起作用。这是否意味着我必须将
proxy_array=[]
更改为proxy_queue=Queue.Queue()
,然后才能使用线程来存储数据?我只想让一个线程来完成这个任务,并且没有其他线程会从
proxy_array
获取数据,为什么要加入它? 使用队列似乎非常奇怪。是否有更好的解决方案?
更新:
我不想等到所有FetchThreads完成他们的工作。我想在提取数据时保存数据,这样会更快。 我希望结果类似于以下内容(因为我使用了array.pop(),所以save 0可能出现得很晚,这只是为了易于理解的示例):
Thread-2 fetched site :1
Thread-1 fetched site :2
save :0 to database
Thread-2 fetched site :3
Thread-1 fetched site :4
save :2 to database
save :3 to database
Thread-2 fetched site :5
Thread-1 fetched site :6
save :4 to database
.......
以下是针对与下面问题相同的人的更新:
问题:
就像上文所说,没有其他线程会从proxy_array获取数据。
我只是想不通为什么它会破坏线程安全?
回答:
在Misha的回答中提到了生产者-消费者问题,我仔细阅读后理解了。
问题:
还有一个问题,如果程序主线程可以扮演消费者角色,使用FetchThreads(换句话说,不需要创建StoreThread)
这是我无法理解的,我会在找到答案后进行更新。
site_queue.join()
之前运行,而不是之后。 - mpenkovlen(proxy_array) >0
时应该执行save
。即使我将site_count
更改为100,结果也是相同的。 - Mithril