以下是我的代码。
%matplotlib inline
import pymongo
import networkx as nx
import time
import itertools
from multiprocessing import Pool
from pymongo import MongoClient
from sweepy.get_config import get_config
# Connection settings come from the project's configuration helper.
config = get_config()
MONGO_URL = config.get('MONGO_URL')
MONGO_PORT = config.get('MONGO_PORT')
MONGO_USERNAME = config.get('MONGO_USERNAME')
MONGO_PASSWORD = config.get('MONGO_PASSWORD')

client = MongoClient(MONGO_URL, int(MONGO_PORT))
db = client.tweets
# NOTE(review): `Database.authenticate` is not available in recent PyMongo
# releases; if the driver is upgraded, pass the credentials to `MongoClient`
# instead -- confirm against the installed PyMongo version.
db.authenticate(MONGO_USERNAME, MONGO_PASSWORD)
users = db.users

# Build a directed graph with one edge user -> friend for every entry of the
# user's `friends_ids` list.
graph = nx.DiGraph()
# Only the two fields read below are requested, so the rest of each user
# document is not transferred from the server.
for user in users.find({}, {'id_str': 1, 'friends_ids': 1}):
    user_id = user['id_str']
    # Added explicitly so users with an empty friend list still get a node.
    graph.add_node(user_id)
    # Adding an edge creates any missing endpoint, so no membership check or
    # separate `add_node` call is needed for the friends.
    # NOTE(review): `id_str` is a string; if `friends_ids` holds integers the
    # same account would appear as two distinct nodes -- verify the stored type.
    graph.add_edges_from(
        (user_id, friend_id) for friend_id in user['friends_ids']
    )
数据存储在MongoDB中。以下是数据示例。
{
"_id" : ObjectId("55e1e425dd232e5962bdfbdf"),
"id_str" : "246483486",
...
"friends_ids" : [
// a bunch of ids
]
}
我尝试使用并行的介数中心性来加速，但仍然非常缓慢。参考示例：https://networkx.github.io/documentation/latest/examples/advanced/parallel_betweenness.html
"""
Example of parallel implementation of betweenness centrality using the
multiprocessing module from Python Standard Library.
The function `betweenness_centrality_source` accepts a bunch of nodes and
computes the contribution of those nodes to the betweenness centrality of the
whole network. Here we divide the network into chunks of nodes and we compute
their contribution to the betweenness centrality of the whole network.
"""
def chunks(l, n):
    """Yield successive tuples of at most `n` items taken from `l`.

    Note that `n` is the *size* of each chunk, not the number of chunks.
    Every chunk except possibly the last holds exactly `n` items; the last one
    holds whatever remains.

    Args:
        l: Any iterable (it is consumed through a single iterator, so
            one-shot iterators work too).
        n: Maximum number of items per chunk. With `n == 0` nothing is
            yielded, because the very first chunk is already empty.

    Yields:
        Tuples of consecutive items from `l`, in their original order.
    """
    remaining = iter(l)
    while True:
        chunk = tuple(itertools.islice(remaining, n))
        # An empty chunk means the iterator is exhausted.
        if not chunk:
            return
        yield chunk
def _betmap(G_normalized_weight_sources_tuple):
    """Adapter that lets `Pool.map` drive `nx.betweenness_centrality_source`.

    `Pool.map` hands each work item to the worker function as one single
    argument, so the positional arguments are bundled into one tuple by the
    caller and spread out again here.

    Args:
        G_normalized_weight_sources_tuple: Sequence of positional arguments
            that is unpacked into `nx.betweenness_centrality_source`.

    Returns:
        Whatever `nx.betweenness_centrality_source` returns for those
        arguments.
    """
    packed_args = G_normalized_weight_sources_tuple
    return nx.betweenness_centrality_source(*packed_args)
def betweenness_centrality_parallel(G, processes=None):
    """Compute betweenness centrality of `G` with a pool of worker processes.

    The nodes of `G` are split into chunks; each worker computes the
    contribution of one chunk of source nodes via `_betmap`, and the partial
    results are summed per node.

    Args:
        G: The graph to analyse. It is sent to every worker as part of each
            work item.
        processes: Number of worker processes, forwarded to `Pool`. `None`
            lets `Pool` pick its default.

    Returns:
        A dict mapping each node to the sum of the partial betweenness values
        returned by the workers. An empty graph yields an empty dict.
    """
    # Nothing to chunk or merge; avoids indexing into an empty result list.
    if G.order() == 0:
        return {}

    p = Pool(processes=processes)
    try:
        # Roughly four chunks per worker, so that a slow chunk does not leave
        # the other workers idle for long.
        node_divisor = len(p._pool) * 4
        # Clamp to 1: for graphs with fewer nodes than `node_divisor` the
        # quotient is 0, which would produce no chunks at all.
        chunk_size = max(1, G.order() // node_divisor)
        node_chunks = list(chunks(G.nodes(), chunk_size))
        num_chunks = len(node_chunks)
        bt_sc = p.map(_betmap,
                      zip([G] * num_chunks,
                          [True] * num_chunks,
                          [None] * num_chunks,
                          node_chunks))
    finally:
        # Always release the worker processes, even if a worker raised.
        p.close()
        p.join()

    # Reduce the partial solutions: accumulate everything into the first one.
    bt_c = bt_sc[0]
    for bt in bt_sc[1:]:
        for n in bt:
            bt_c[n] += bt[n]
    return bt_c
# Time the parallel betweenness computation on the graph built above.
print("Computing betweenness centrality for:")
print(nx.info(graph))
start = time.time()
bt = betweenness_centrality_parallel(graph, 2)
print("\t\tTime: %.4F" % (time.time()-start))
# The graph's nodes are the ids loaded from MongoDB, so the literal key 0
# is not expected to be present; report on a node that actually exists.
if bt:
    sample_node = next(iter(bt))
    print("\t\tBetweenness centrality for node %s: %.5f"
          % (sample_node, bt[sample_node]))
从 MongoDB 导入到 NetworkX 的过程相对较快，不到一分钟。