我有一个文件,里面有1000万个浮点数。使用下面的自定义Mergesort代码(使用TensorFlow 2.0 API),我想对这个文件进行排序。在这里,我每次读取100万个数字并进行排序,然后将它们写入另一个文件中以便稍后合并。
import timeit

import numpy as np
import tensorflow as tf
#@tf.function
def split_list(input_list):
    """Return the two halves of *input_list* as a (left, right) tuple.

    For odd-length input the right half receives the extra element.
    """
    half = len(input_list) // 2
    return input_list[:half], input_list[half:]
#@tf.function
def merge_sorted_lists(list_left, list_right):
    """Merge two already-sorted sequences into one ascending list.

    Works for plain Python numbers as well as eager TF tensor elements:
    it relies only on `<=` (which tensors overload) and `len()`, so the
    per-element `tf.math.*` calls of the original — which built a throwaway
    tensor just to compare two Python ints — are unnecessary.

    Args:
        list_left: sorted sequence.
        list_right: sorted sequence.

    Returns:
        A new list with all elements of both inputs in ascending order;
        when one input is empty the other input is returned unchanged.
    """
    # Fast paths: nothing to merge.
    if len(list_left) == 0:
        return list_right
    if len(list_right) == 0:
        return list_left

    index_left = index_right = 0
    list_merged = []
    # Standard two-pointer merge. list.append is amortized O(1), unlike the
    # original `list_merged = [*list_merged, x]`, which copied the whole
    # accumulator on every step and made each merge pass quadratic.
    while index_left < len(list_left) and index_right < len(list_right):
        if list_left[index_left] <= list_right[index_right]:
            # Left value is smaller or equal, so it goes first (stable).
            list_merged.append(list_left[index_left])
            index_left += 1
        else:
            list_merged.append(list_right[index_right])
            index_right += 1
    # At most one side has leftovers; extending with an empty slice is a no-op.
    list_merged.extend(list_left[index_left:])
    list_merged.extend(list_right[index_right:])
    return list_merged
#@tf.function
def merge_sort(input_list):
    """Recursively sort *input_list* in ascending order (top-down mergesort).

    Returns the input unchanged when it has 0 or 1 elements; otherwise
    splits it at the midpoint, sorts each half, and merges the results.
    """
    # Plain int comparison: len() is a Python int, so the original
    # tf.math.less_equal only created a throwaway tensor per recursive call.
    if len(input_list) <= 1:
        return input_list
    left, right = split_list(input_list)
    return merge_sorted_lists(merge_sort(left), merge_sort(right))
以下代码在所需的主机/设备上调用上面的merge_sort函数。
def cpu(input_list):
    """Sort *input_list* with CPU-placed TF ops; return one "<value>\n" per element.

    The tf.device context pins any TF ops issued by merge_sort to the CPU.
    The dead `lines = []` / `lines1 = []` pre-assignments of the original
    were removed — both names were immediately rebound.
    """
    with tf.device('/cpu:0'):
        sorted_values = merge_sort(input_list)
    # One text line per number, ready for f.writelines().
    return [f"{value}\n" for value in sorted_values]
def GPU(input_list):
    """Sort *input_list* with GPU-placed TF ops; return one "<value>\n" per element.

    Mirrors cpu() but pins TF ops to the first GPU. (Name kept as-is for
    existing callers, although snake_case `gpu` would be conventional.)
    """
    with tf.device('/device:GPU:0'):
        sorted_values = merge_sort(input_list)
    # One text line per number, ready for f.writelines().
    return [f"{value}\n" for value in sorted_values]
以下是 CPU 或 GPU 函数的调用方式,使用迭代器每次读取一百万个数据:
# Build a pipeline yielding 1M-element float32 batches from the 10M-line file.
dataset = tf.data.TextLineDataset("numbers.txt")  # 10 million rows in a single file
dataset = dataset.batch(1_000_000)  # divide into 10 batches of 1 million each
dataset = dataset.map(lambda x: tf.strings.to_number(tf.strings.strip(x), tf.float32))
iterator = iter(dataset)  # idiomatic form of dataset.__iter__()

# Time a CPU-placed sort of the first batch.
start_time = timeit.default_timer()
lines = cpu(np.stack(list(iterator.get_next())))
print(lines)
print("cpu_time ", timeit.default_timer() - start_time)

# Time a GPU-placed sort of the second batch.
start_time = timeit.default_timer()
lines = GPU(np.stack(list(iterator.get_next())))
print(lines)
print("gpu_time ", timeit.default_timer() - start_time)

# Persist the most recently sorted chunk. `with` guarantees the handle is
# flushed and closed — the original open() was never closed, risking lost data.
fid = 1
with open(f'chunk_{fid}.txt', 'w') as f_out:
    f_out.writelines(lines)
以上代码在 Google Colab GPU 实例 上运行,首先对前 100 万条记录进行 CPU 排序,然后对第二百万条进行 GPU 排序。但我希望能够同时使用 CPU 和 GPU 来对不同的一百万个数字集并行运行归并排序。现在暂时忽略合并步骤。 如果有示例代码(几行即可)或指南(如果涉及大量工作),我将不胜感激。