Python中如何提高函数的性能

3
我有一个几GB大小的文本文件,格式如下:
0 274 593869.99 6734999.96 121.83 1,
0 273 593869.51 6734999.92 121.57 1,
0 273 593869.15 6734999.89 121.57 1,
0 273 593868.79 6734999.86 121.65 1,
0 273 593868.44 6734999.84 121.65 1,
0 273 593869.00 6734999.94 124.21 1,
0 273 593868.68 6734999.92 124.32 1,
0 273 593868.39 6734999.90 124.44 1,
0 273 593866.94 6734999.71 121.37 1,
0 273 593868.73 6734999.99 127.28 1,

我在Windows上使用Python 2.7编写了一个简单的函数,用于过滤。该函数会读取整个文件,选择具有相同idtile(第一列和第二列)的行,并返回点(x、y、z和标签)列表和idtile

tiles_id = [j for j in np.ndindex(ny, nx)] #ny = number of row, nx= number of columns
idtile = tiles_id[0]

def file_filter(name,idtile):
        lst = []
        for line in file(name, mode="r"):
            element = line.split() # add value
            if (int(element[0]),int(element[1])) == idtile:
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
        return(lst, dy, dx)

文件大小超过32 GB,瓶颈在于文件的读取速度。我正在寻找一些建议或示例,以加快我的功能(例如:并行计算或其他方法)。

我的解决方案是将文本文件分割成瓦片(使用x和y位置)。这个解决方案并不优雅,我正在寻找一个高效的方法。


2
你的程序中 file_filter() 函数被调用了多少次? - Ber
5
如果您想多次阅读此文件,不如将任务交给一个数据库? - eumiro
@eumiro,我不知道数据库的方法。 - Gianni Spear
有多少唯一的idtile?我建议将文本文件分割成许多小文本文件,文件名为idtile,内容为所有以idtile开头的行。如果idtile的数量很大,则可以使用hash(idtile)%N作为文件名。 - HYRY
亲爱的@akira,这个文件真的很大(超过32GB)。存储这些数据是否可能? - Gianni Spear
显示剩余14条评论
10个回答

1
也许解决您的问题最好且最快的方法是在(大规模)并行系统上使用MapReduce算法。

1
我建议您更改代码,只读取一次大文件,并为每个瓦片 ID 编写(临时)文件。类似于以下内容:
def create_files(name, idtiles=None):
    files = {}
    for line in open(name):
         elements = line.split()
         idtile = (int(elements[0]), int(elements[1]))
         if idtiles is not None and idtile not in idtiles:
             continue
         if idtile not in files:
             files[idtile] = open("tempfile_{}_{}".format(elements[0], elements[1]), "w")
         print >>files[idtile], line
    for f in files.itervalues():
        f.close()
    return files

create_files()将返回一个{(tilex, tiley): fileobject}字典。

一种变体是在每行写入后关闭文件,以解决“太多打开的文件”错误。这个变体返回一个{(tilex, tiley: filename}字典。可能会慢一些。

def create_files(name, idtiles=None):
    files = {}
    for line in open(name):
         elements = line.split()
         idtile = (int(elements[0]), int(elements[1]))
         if idtiles is not None and idtile not in idtiles:
             continue
         filename = "tempfile_{}_{}".format(elements[0], elements[1])
         files[idtile] = filename
         with open(filename, "a") as f:
             print >>f, line
    return files

所以,你将大文件分割成小文件...如果我没有被误导,这是OP已经做过的事情。 - akira
@codeape。返回此错误消息 create_files(name) Traceback (most recent call last): File "<interactive input>", line 1, in <module> File "<editor selection>", line 9, in create_files IOError: [Errno 24] 打开的文件过多:'tempfile_3_340' - Gianni Spear
1
更新了一个变量,每写一行后关闭文件。 - codeape
@codeape 文件名需要排序吗?你的代码需要进行一些修正。 - Gianni Spear
我不确定我是否理解了最后一个问题。我还没有测试过这段代码,所以是的,它可能需要一些修复。我认为没有任何需要排序的东西。 - codeape

1
你的'idtile'似乎是按照一定顺序排列的。也就是说,样本数据表明,一旦你遍历了某个'idtile'并到达下一个,那么不可能再出现具有该'idtile'的行。如果是这种情况,你可以在处理完所需的'idtile'并遇到其他'idtile'时中断for循环。我的想法是:
loopkiller = false
for line in file(name, mode="r"):
    element = line.split()
    if (int(element[0]),int(element[1])) == idtile:
        lst.append(element[2:])
        dy, dx = int(element[0]),int(element[1])
        loopkiller = true
    elif loopkiller:
        break;

这样,一旦你完成了某个“idtile”,你就停止;而在你的例子中,你会一直读到文件末尾。
如果你的“idtile”以随机顺序出现,也许你可以先编写一个有序版本的文件。
此外,单独评估您的“idtile”的数字可能会帮助您更快地遍历文件。假设您的“idtile”是由一个一位数和一个三位数整数组成的二元组,也许可以考虑以下内容:
for line in file(name, mode="r"):
    element = line.split()
    if int(element[0][0]) == idtile[0]:
        if element[1][0] == str(idtile[1])[0]:
            if element[1][1] == str(idtile[1])[1]:
                if element[1][2] == str(idtile[1])[2]:
                    dy, dx = int(element[0]),int(element[1])
                else go_forward(walk)
            else go_forward(run)
         else go_forward(sprint)
     else go_forward(warp)

1

速度的主要规则:做得少。

  • 你可以创建一个已排序的huge.txt版本,并跟踪idtitle的位置。因此,如果你搜索(223872、23239),你已经知道所需信息在文件中的位置,并且可以跳过之前的所有内容(file.seek)。有人可能会认为这些信息与'INDEX'相当。
  • 你可以使用mmap将文件映射到内存中。
  • 你可以开始编写“工作程序”以处理不同的文件/位置。
  • 你可以将给定文件转换为类似SQL服务器的东西,并使用标准SQL检索数据。是的,将32GB的数据传输到数据库需要时间,但请将其视为一种预处理。之后,数据库将使用任何手段更快地访问数据,而不是你的方法。

小想法:

  • 你可以使用切片而不是line.split()来避免大量微小的内存分配。

如何使用数据库的概述:

假设您有类似于PostgreSQL的东西:
CREATE TABLE tiles
(
  tile_x integer,
  tile_y integer,
  x double precision,
  y double precision,
  z double precision,
  flag integer
);

然后您可以将输入文件中的所有空格替换为|,将所有的,替换为(以创建一个漂亮而闪亮的.csv),并直接将其馈送到数据库:

 COPY "tiles" from "\full\path\to\huge.txt" WITH DELIMITER "|";

你可以做一些花哨的东西,比如这样:

SELECT * FROM "tiles";

tile_x | tile_y |     x     |     y      |   z    | flag
-------+--------+-----------+------------+--------+-----
     0 |    274 | 593869.99 | 6734999.96 | 121.83 |    1
     0 |    273 | 593869.51 | 6734999.92 | 121.57 |    1
     0 |    273 | 593869.15 | 6734999.89 | 121.57 |    1
     0 |    273 | 593868.79 | 6734999.86 | 121.65 |    1
     0 |    273 | 593868.44 | 6734999.84 | 121.65 |    1
     0 |    273 |    593869 | 6734999.94 | 124.21 |    1
     0 |    273 | 593868.68 | 6734999.92 | 124.32 |    1
     0 |    273 | 593868.39 |  6734999.9 | 124.44 |    1
     0 |    273 | 593866.94 | 6734999.71 | 121.37 |    1
     0 |    273 | 593868.73 | 6734999.99 | 127.28 |    1 

或者是这样的:

或类似的内容:

SELECT * FROM "tiles" WHERE tile_y > 273;

tile_x | tile_y |     x     |     y      |   z    | flag
-------+--------+-----------+------------+--------+-----
     0 |    274 | 593869.99 | 6734999.96 | 121.83 |    1

1
你可以将过滤器转换为生成器函数:
def file_filter(name):
        lst = []
        idtile = None
        for line in file(name, mode="r"):
            element = line.split() # add value
            if idtile is None:
               idtile = (int(element[0]), int(element[1]))
            if (int(element[0]), int(element[1])) == idtile:
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
            else:
                yield lst, dx, dy
                lst = []
                idtile = None

对于每个idtile,此函数应返回一个元组list_of_data,dx,dy,前提是文件按idtile排序。
现在你可以像这样使用它:
for lst, dx, dy in file_filter('you_name_it'):
    process_tile_data(lst, dx, dy)

谢谢Ber。我忘记了在file_filter(name)之后跳过第二步骤,抱歉。 - Gianni Spear
谢谢@Ber。我会研究如何对文件进行排序(从0,0到...)。 - Gianni Spear

1
我建议比较您完成全文阅读流程和仅阅读行并对其不做任何处理所需的时间。如果这些时间接近,您唯一能做的就是改变方法(拆分文件等),因为您可以优化的可能是数据处理时间而不是文件读取时间。
我还看到您代码中有两个值得修复的地方:
with open(name) as f:
    for line in f:
        pass #Here goes the loop body
  1. 使用 with 来显式地关闭文件。你的解决方案应该在CPython中工作,但这取决于实现并且不总是那么有效。

  2. 您将字符串转换为 int 两次。这是一个相对较慢的操作。通过重用结果来移除第二个操作。

P.S. 它看起来像一组点在地球表面上的深度或高度值的数组,并且表面被分成了瓦片。 :-)


感谢Ellioh。欢迎提出改进建议。为什么要使用with open(name) as f:而不是for line in open(name):? - Gianni Spear
1
修正了一个笔误。:-) 当然是 "for line in f"。"with"语句总是会关闭文件。在您的代码中,当垃圾回收器移除文件对象时,文件会被关闭。在CPython中,由于其引用计数几乎相同,但其他版本的Python可能会以其他方式处理,文件将保持打开状态,直到对象被垃圾回收,可能要晚得多。 - Ellioh

1

我的解决方案是将大型文本文件分割成每个idtile的许多小二进制文件。为了更快地读取文本文件,您可以使用pandas:

import pandas as pd
import numpy as np
n = 400000 # read n rows as one block
for df in pd.read_table(large_text_file, sep=" ", comment=",", header=None, chunksize=n):
    for key, g in df.groupby([0, 1]):
        fn = "%d_%d.tmp" % key
            with open(fn, "ab") as f:
                data = g.ix[:, 2:5].values
                data.tofile(f)

然后你可以通过以下方式获取一个二进制文件的内容:
np.fromfile("0_273.tmp").reshape(-1, 4)

1

您可以通过进行字符串比较来避免在每一行上执行 split()int() 操作:

def file_filter(name,idtile):
    lst = []
    id_str = "%d %d " % idtile
    with open(name) as f:
        for line in f:
            if line.startswith(id_str):
                element = line.split() # add value
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
    return(lst, dy, dx)

1
这里有一些统计数据。随着更多的解决方案出现,我将会更新它。以下程序适用于由问题行重复组成的文件。
import sys

def f0(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            pass


"""def f0(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            line.split()"""


def f1(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            element = line.split() # add value
            if (int(element[0]),int(element[1])) == idtile:
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
    return(lst, dy, dx)


def f2(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            element = line.split() # add value
            pdx, pdy = int(element[0]),int(element[1])
            if (pdx, pdy) == idtile:
                lst.append(element[2:])
                dy, dx = pdx, pdy
    return(lst, dy, dx)

def f2(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            element = line.split() # add value
            pdx, pdy = int(element[0]),int(element[1])
            if (pdx, pdy) == idtile:
                lst.append(element[2:])
                dy, dx = pdx, pdy
    return(lst, dy, dx)


def f3(name,idtile):
    lst = []
    id_str = "%d %d " % idtile
    with open(name) as f:
        for line in f:
            if line.startswith(id_str):
                element = line.split() # add value
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
    return(lst, dy, dx)

functions = [f0, f1, f2, f3]

functions[int(sys.argv[1])]("./srcdata.txt",(0, 273))

计时的shell脚本很简单:

#!/bin/sh
time python ./timing.py 0
time python ./timing.py 1
time python ./timing.py 2

我更喜欢以这种方式运行它,以避免之前运行的函数对其他函数的时间产生影响。
结果是:
0.02user 0.01system 0:00.03elapsed
0.42user 0.01system 0:00.44elapsed
0.32user 0.02system 0:00.34elapsed
0.33user 0.01system 0:00.35elapsed

好消息是:读取文件并不是瓶颈将多余的转换为整数操作删除是有效的
坏消息是:我仍然不知道如何显著地进行优化。

可能需要在切割idtiles之前对文件进行排序。 - Gianni Spear
是的,那么它应该被切片。实际上,我认为这是唯一的方法。 - Ellioh
我会在操作系统方面开一个新的问题。如果您想参与,我会放置链接。 - Gianni Spear
是的,至少我想看一下。 - Ellioh
在你所寻找的时间范围内 (0, 273),这占据了文件的90%。我相信@Gianni正在尝试提取文件的一小部分。你能否使用不在文件中的ID重新运行计时器?在我看来,这会更具代表性。 - Janne Karila
显示剩余3条评论

0

好的。如果你需要多次执行这个操作,显然你需要创建某种索引。但如果这不是一个频繁的活动,最好的选择是像这样进行多线程处理。

NUMWORKERS = 8
workerlist = []
workQ = Queue.Queue()

def file_filter(name,idtile, numworkers):
    for i in range(NUMWORKERS):
        worker = threading.Thread(target=workerThread, args=(lst,))
    lst = []
    for line in file(name, mode="r"):
        workQ.put(line)                
    for i in range(NUMWORKERS):
        workQ.put(None)
    workQ.join()
    return lst , idtile[0], idtile[1]

def workerThread(lst):
    line = workQ.get()
    if not line:
        return
    element = line.split() # add value
    if (int(element[0]),int(element[1])) == idtile:
        lst.append(element[2:])

如果这是一个经常发生的活动,适用于每个idtile,那么解决方案将截然不同。一次处理多个idtiles将为您提供最佳的平均性能。因为任何已知的数量的idtiles都可以在单个文件循环中处理。


这是有利的,因为您的IO线程不会浪费时间进行计算。这意味着您的IO以最大可能的速度运行。 - saketrp
我不相信这会有所帮助。更甚的是,我敢打赌这比普通方式慢得多,至少在CPython中是如此。 - Ellioh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接