如何在使用Pool.map()进行多进程时解决内存问题?

42
我已经编写了下面的程序,以执行以下操作:
  • 将大型文本文件作为 pandas dataframe 读取
  • 然后使用特定列值进行 groupby 将数据拆分并存储为数据帧列表。
  • 然后将数据传输到 multiprocess Pool.map() 中,以并行处理每个数据帧。
一切都很好,在我的小型测试数据集上程序运行良好。但是,当我输入大型数据(约14 GB)时,内存消耗呈指数增长,然后冻结计算机或被杀死(在HPC集群中)。
我已经添加了代码,以便在数据/变量不再有用时立即清除内存。我也尽快关闭池。但是,即使使用14 GB的输入,我只期望2*14 GB的内存负担,但似乎发生了很多事情。我还尝试使用 chunkSize 和 maxTaskPerChild 等调整,但在测试和大文件中都没有看到任何优化差异。
我认为需要改进此代码位置,当我启动 multiprocessing 时。 p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) 但是,我会贴出整个代码。 测试示例:我创建了一个测试文件("genome_matrix_final-chr1234-1mb.txt")最多250 mb并运行该程序。当我检查系统监视器时,可以看到内存消耗增加了约6 GB。我不是很清楚为什么250 mb文件加一些输出需要这么多内存空间。如果有助于查看真正的问题,我已通过Dropbox共享了该文件。https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0 有人能建议我如何解决这个问题吗? 我的Python脚本:
#!/home/bin/python3

import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource

print()
print('Checking required modules')
print()


''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt"   # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt"  # test file 02
#genome_matrix_file = "genome_matrix_final.txt"    # large file 

def main():
    with open("genome_matrix_header.txt") as header:
        header = header.read().rstrip('\n').split('\t')
        print()

    time01 = time.time()
    print('starting time: ', time01)

    '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''
    gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)

    # now, group the dataframe by chromosome/contig - so it can be multiprocessed
    gen_matrix_df = gen_matrix_df.groupby('CHROM')

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    gen_matrix_df_list = collections.OrderedDict()
    for chr_, data in gen_matrix_df:
        gen_matrix_df_list[chr_] = data

    # clear memory
    del gen_matrix_df

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    del gen_matrix_df_list  # clear memory

    p.close()
    p.join()


    # concat the results from pool.map() and write it to a file
    result_merged = pd.concat(result)
    del result  # clear memory

    pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)

    print()
    print('completed all process in "%s" sec. ' % (time.time() - time01))
    print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
    print()


'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):

    print()
    time02 = time.time()

    # index position of the samples in genome matrix file
    sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
                    {'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
                    {'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
                    {'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
                    {'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
                    {'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
                    {'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
                    {'8a': 32, '8b': 17}]

    # sample index stored as ordered dictionary
    sample_idx_ord_list = []
    for ids in sample_idx:
        ids = collections.OrderedDict(sorted(ids.items()))
        sample_idx_ord_list.append(ids)


    # for haplotype file
    header = ['contig', 'pos', 'ref', 'alt']

    # adding some suffixes "PI" to available sample names
    for item in sample_idx_ord_list:
        ks_update = ''
        for ks in item.keys():
            ks_update += ks
        header.append(ks_update+'_PI')
        header.append(ks_update+'_PG_al')


    #final variable store the haplotype data
    # write the header lines first
    haplotype_output = '\t'.join(header) + '\n'


    # to store the value of parsed the line and update the "PI", "PG" value for each sample
    updated_line = ''

    # read the piped in data back to text like file
    matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)

    matrix_df = matrix_df.rstrip('\n').split('\n')
    for line in matrix_df:
        if line.startswith('CHROM'):
            continue

        line_split = line.split('\t')
        chr_ = line_split[0]
        ref = line_split[2]
        alt = list(set(line_split[3:]))

        # remove the alleles "N" missing and "ref" from the alt-alleles
        alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))

        # if no alt alleles are found, just continue
        # - i.e : don't write that line in output file
        if len(alt_up) == 0:
            continue

        #print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
        #so, we have data for CHR, POS, REF, ALT so far
        # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
        sample_data_for_vcf = []
        for ids in sample_idx_ord_list:
            sample_data = []
            for key, val in ids.items():
                sample_value = line_split[val]
                sample_data.append(sample_value)

            # now, update the phased state for each sample
            # also replacing the missing allele i.e "N" and "-" with ref-allele
            sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
            sample_data_for_vcf.append(str(chr_))
            sample_data_for_vcf.append(sample_data)

        # add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
        # and .. write it to final haplotype file
        sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
        updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
            '\t' + sample_data_for_vcf + '\n'
        haplotype_output += updated_line

    del matrix_df  # clear memory
    print('completed haplotype preparation for chromosome/contig "%s" '
          'in "%s" sec. ' %(chr_, time.time()-time02))
    print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))

    # return the data back to the pool
    return pd.read_csv(io.StringIO(haplotype_output), sep='\t')


''' to monitor memory '''
def current_mem_usage():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.


if __name__ == '__main__':
    main()

赏金猎人更新:

我已经使用Pool.map()实现了多进程,但是代码会导致巨大的内存负担(输入测试文件约300 MB,但内存负担约为6 GB)。我只期望最多有3 * 300 MB的内存负担。

  • 有人能解释一下,对于这样一个小文件和如此短的计算长度,是什么导致了如此巨大的内存需求吗?
  • 另外,我正在尝试采取答案并将其用于改进我的大型程序中的多进程。因此,添加任何方法、模块都不应该太大幅度地改变计算部分(CPU绑定进程)的结构。
  • 我已经包含了两个测试文件以进行测试。
  • 附加的代码是完整的,所以当复制粘贴时应该按预期工作。任何更改都应该仅用于改进多进程步骤的优化。

我的建议是,如果你需要处理大文件,就使用Pyspark。 - Dinusha Thilakarathna
@DinushaDilanka:我只是简单地浏览了一下pyspark。它看起来不错,但它是否可以替代pandas呢?另外,另一个问题是我将不得不学习一个新的包并重写整个程序。上面的程序只是我的程序和数据的模拟运行,以消除多进程中的内存问题。如果您有任何建议的示例,那就太好了。谢谢。 - everestial007
1
请参考此链接:https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/ - Dinusha Thilakarathna
1
你能否将这个问题简化成一个更简单的例子,不包含任何无关的代码,并且具有相同的问题,解决这个例子的方法可以让你构建真实代码的解决方案吗?这样做会使得解决问题变得更加容易。请参考帮助文档中的 [mcve] 获取指引。(这个问题本身是可以回答的,只是通过简化问题,它会变得更容易回答。) - abarnert
1
想要同时让问题完整且简洁通常并不容易——如果剥离了太多无关紧要的内容,人们可能会问“你为什么要这样做?”但是,如果您提供了我们可以运行和玩耍的代码,而不需要理解您的文件格式以及如何在Pandas中处理它等等,那么找到(和测试)解决方案可能会更容易。 - abarnert
显示剩余10条评论
4个回答

95

前提条件

  1. In Python (in the following I use 64-bit build of Python 3.6.5) everything is an object. This has its overhead and with getsizeof we can see exactly the size of an object in bytes:

    >>> import sys
    >>> sys.getsizeof(42)
    28
    >>> sys.getsizeof('T')
    50
    
  2. When fork system call used (default on *nix, see multiprocessing.get_start_method()) to create a child process, parent's physical memory is not copied and copy-on-write technique is used.
  3. Fork child process will still report full RSS (resident set size) of the parent process. Because of this fact, PSS (proportional set size) is more appropriate metric to estimate memory usage of forking application. Here's an example from the page:
  • 进程A有50 KiB未共享的内存
  • 进程B有300 KiB未共享的内存
  • 进程A和进程B都有100 KiB相同的共享内存区域

由于PSS被定义为进程的未共享内存和与其他进程共享的内存比例之和,因此这两个进程的PSS如下:

  • 进程A的PSS = 50 KiB + (100 KiB / 2) = 100 KiB
  • 进程B的PSS = 300 KiB + (100 KiB / 2) = 350 KiB

数据框

现在让我们单独查看你的DataFramememory_profiler将帮助我们。

justpd.py

#!/usr/bin/env python3

import pandas as pd
from memory_profiler import profile

@profile
def main():
    with open('genome_matrix_header.txt') as header:
        header = header.read().rstrip('\n').split('\t')

    gen_matrix_df = pd.read_csv(
        'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)

    gen_matrix_df.info()
    gen_matrix_df.info(memory_usage='deep')

if __name__ == '__main__':
    main()

现在让我们使用分析工具:

mprof run justpd.py
mprof plot

我们可以看到这个情节:

memory_profile

还有逐行跟踪:

Line #    Mem usage    Increment   Line Contents
================================================
     6     54.3 MiB     54.3 MiB   @profile
     7                             def main():
     8     54.3 MiB      0.0 MiB       with open('genome_matrix_header.txt') as header:
     9     54.3 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    10                             
    11   2072.0 MiB   2017.7 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    12                                 
    13   2072.0 MiB      0.0 MiB       gen_matrix_df.info()
    14   2072.0 MiB      0.0 MiB       gen_matrix_df.info(memory_usage='deep')

我们可以看到数据框在构建时占用了约2 GiB的空间,峰值达到了约3 GiB。更有趣的是info的输出结果。
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB

但是info(memory_usage='deep')("deep"表示通过询问object dtype来深入检查数据)会给出:

memory usage: 7.9 GB

哎呀!?从进程外部看,我们可以确保 memory_profiler 的结果是正确的。 sys.getsizeof 也显示了相同的值,这很可能是因为自定义的 __sizeof__。其他使用它来估计分配的 gc.get_objects() 的工具,例如 pympler,也会有相同的结果。
# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()   

提供:

                                             types |   # objects |   total size
================================================== | =========== | ============
                 <class 'pandas.core.series.Series |          34 |      7.93 GB
                                      <class 'list |        7839 |    732.38 KB
                                       <class 'str |        7741 |    550.10 KB
                                       <class 'int |        1810 |     49.66 KB
                                      <class 'dict |          38 |      7.43 KB
  <class 'pandas.core.internals.SingleBlockManager |          34 |      3.98 KB
                             <class 'numpy.ndarray |          34 |      3.19 KB

那么这7.93 GiB是从何而来的呢?让我们试着解释一下。我们有4M行和34列,总共有1.34亿个值。它们要么是int64类型,要么是object类型(这是一个64位指针;请参见使用pandas处理大数据以获得详细解释)。因此,仅存储在数据框中的值就有约1022 MiB(即134 * 10 ** 6 * 8 / 2 ** 20)。那剩下的大约6.93 GiB呢?

字符串驻留

为了理解这个行为,需要知道Python有字符串内部化。有两篇好的文章(onetwo)介绍了Python 2中的字符串内部化。除了Python 3中的Unicode更改和PEP 393中Python 3.3的C结构更改外,其思想是相同的。基本上,每个看起来像标识符的短字符串都会被Python缓存到内部字典中,并且引用将指向相同的Python对象。换句话说,我们可以说它的行为类似于单例。上述提到的文章解释了它给出的显着的内存配置文件和性能改进。我们可以使用PyASCIIObjectinterned字段来检查一个字符串是否被内部化:
import ctypes

class PyASCIIObject(ctypes.Structure):
     _fields_ = [
         ('ob_refcnt', ctypes.c_size_t),
         ('ob_type', ctypes.py_object),
         ('length', ctypes.c_ssize_t),
         ('hash', ctypes.c_int64),
         ('state', ctypes.c_int32),
         ('wstr', ctypes.c_wchar_p)
    ]

然后:

>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0

我们可以使用两个字符串进行身份比较(在CPython的情况下是内存比较)。

>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True

因此,在关于object dtype方面,数据框最多分配20个字符串(每个氨基酸一个)。不过值得注意的是,Pandas建议使用分类类型进行枚举。

Pandas内存

因此,我们可以解释7.93 GiB的朴素估计如下:
>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58  
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953

请注意,str_size为58字节,而不是我们在上面看到的1个字符文字的50字节。这是因为PEP 393定义了紧凑和非紧凑字符串。您可以使用sys.getsizeof(gen_matrix_df.REF[0])进行检查。
实际内存消耗应该为约1 GiB,正如gen_matrix_df.info()所报告的那样,它是两倍多。我们可以假设它与Pandas或NumPy执行的内存(预)分配有关。以下实验表明,这并非没有原因(多次运行显示相同的结果):
Line #    Mem usage    Increment   Line Contents
================================================
     8     53.1 MiB     53.1 MiB   @profile
     9                             def main():
    10     53.1 MiB      0.0 MiB       with open("genome_matrix_header.txt") as header:
    11     53.1 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    12                             
    13   2070.9 MiB   2017.8 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    14   2071.2 MiB      0.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    15   2071.2 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    16   2040.7 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    23   1827.1 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    24   1094.7 MiB   -732.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    25   1765.9 MiB    671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    26   1094.7 MiB   -671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    27   1704.8 MiB    610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    28   1094.7 MiB   -610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    29   1643.9 MiB    549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    30   1094.7 MiB   -549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    31   1582.8 MiB    488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    32   1094.7 MiB   -488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    33   1521.9 MiB    427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    34   1094.7 MiB   -427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    35   1460.8 MiB    366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    36   1094.7 MiB   -366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    37   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    47   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])

我想用 关于设计问题和未来Pandas2的新文章 中作者原话结束这一部分。

pandas的经验法则:内存大小应该是数据集大小的5到10倍

进程树

最后,让我们来看看进程池是否可以使用写时复制。我们将使用smemstat(可从Ubuntu存储库中获取)来估计进程组内存共享情况,glances来记录系统范围内的空闲内存。两者均可编写JSON。

我们将使用Pool(2)运行原始脚本。我们需要3个终端窗口。

  1. smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
  2. glances -t 1 --export-json glances.json
  3. mprof run -M script.py

然后mprof plot会生成:

3 processes

总结图表(mprof run --nopython --include-children ./script.py)如下:

enter image description here

请注意,上面的两个图表显示了RSS。假设是由于写时复制,它并不反映实际的内存使用情况。现在我们有来自smemstatglances的两个JSON文件。我将使用以下脚本将JSON文件转换为CSV。
#!/usr/bin/env python3

import csv
import sys
import json

def smemstat():
  with open('smemstat.json') as f:
    smem = json.load(f)

  rows = []
  fieldnames = set()    
  for s in smem['smemstat']['periodic-samples']:
    row = {}
    for ps in s['smem-per-process']:
      if 'script.py' in ps['command']:
        for k in ('uss', 'pss', 'rss'):
          row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20

    # smemstat produces empty samples, backfill from previous
    if rows:            
      for k, v in rows[-1].items():
        row.setdefault(k, v)

    rows.append(row)
    fieldnames.update(row.keys())

  with open('smemstat.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
    dw.writeheader()
    list(map(dw.writerow, rows))

def glances():
  rows = []
  fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
    'free', 'mem_critical', 'inactive', 'shared', 'history_size',
    'mem_warning', 'total', 'active', 'buffers']
  with open('glances.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=fieldnames)
    dw.writeheader()
    with open('glances.json') as f:
      for l in f:
        d = json.loads(l)
        dw.writerow(d['mem'])

if __name__ == '__main__':
  globals()[sys.argv[1]]()

首先让我们来看一下{{free}}内存。

enter image description here

第一和最小值之间的差距约为4.15 GiB。以下是PSS数字的样式:

enter image description here

而且总和为:

enter image description here

因为写时复制技术,实际内存消耗约为4.15 GiB。但我们仍然需要通过Pool.map序列化数据并将其发送给工作进程。我们能在这里利用写时复制技术吗?
共享数据
为了使用写时复制技术,我们需要全局访问list(gen_matrix_df_list.values()),以便工作进程在派生后仍然可以读取它。
  1. Let's modify code after del gen_matrix_df in main like the following:

    ...
    global global_gen_matrix_df_values
    global_gen_matrix_df_values = list(gen_matrix_df_list.values())
    del gen_matrix_df_list
    
    p = Pool(2)
    result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
    ...
    
  2. Remove del gen_matrix_df_list that goes later.
  3. And modify first lines of matrix_to_vcf like:

    def matrix_to_vcf(i):
        matrix_df = global_gen_matrix_df_values[i]
    

现在让我们重新运行它。释放内存:

free

进程树:

process tree

以及它的总和:

sum

因此,我们的实际内存使用量最大为约2.9 GiB(构建数据框时主进程的峰值),写时复制已经起到了帮助作用!
顺便提一下,Python的引用循环垃圾收集器有所谓的读时复制行为,Instagram Engineering中描述了这一行为(导致issue31558中出现了gc.freeze)。但是,在这种特殊情况下,gc.disable()没有影响。
更新
无需复制即可共享数据的替代方法是从一开始就将其委托给内核,使用numpy.memmap即可实现。以下是来自Python高性能数据处理演讲的示例实现棘手的部分是使Pandas使用mmaped Numpy数组。

14
如此全面、详细和美丽的答案。我希望我能给你50分。但是,已经有人评分了。但是,这就是被接受的答案。在我的编程生涯中,我会多次回顾这个问答过程。最有帮助的是您提供的查找导致内存问题的魔鬼方法。有句话说得好,“魔鬼藏在细节中”。 - everestial007
但是在这种特定情况下,gc.disable()没有影响。- 为什么这对抗复制读取行为没有帮助? - fjsj
虽然已经过去了将近3年...我面临着类似的问题...只是我的pandas处理是在线程内完成的,仍然遇到了内存不足的问题...你能帮忙吗? - Fr_nkenstien

14
当您使用时,将使用系统调用创建多个子进程。每个进程都从父进程在那时刻的内存的精确副本开始。因为在创建大小为3的之前加载了CSV,所以池中的这3个进程中的每一个都会不必要地拥有数据帧的副本(和也将存在于当前进程以及3个子进程中,因此每个这些结构的副本在内存中将有4个)。
尝试在加载文件之前(实际上是最开始)创建,这应该可以减少内存使用量。
如果仍然太高,您可以:
1.将gen_matrix_df_list转储到文件中,每行1个项目,例如:
import os
import cPickle

with open('tempfile.txt', 'w') as f:
    for item in gen_matrix_df_list.items():
        cPickle.dump(item, f)
        f.write(os.linesep)
  • 使用 Pool.imap() 处理此文件中导出的行的迭代器,例如:

    with open('tempfile.txt', 'r') as f:
        p.imap(matrix_to_vcf, (cPickle.loads(line) for line in f))
    

    (请注意,在上面的示例中,matrix_to_vcf 接受一个 (key, value) 元组,而不仅仅是一个值)

  • 希望这有所帮助。

    注:我没有测试上面的代码。它只是用来演示这个想法。


    谢谢你的回答。我会在大约一天后尝试这个答案,并让你知道结果。我希望这会起作用。 - everestial007
    如果您可以将数据存储在内存中两次,那么您可能不需要遭受磁盘IO的困扰。我曾经遇到过一个大DataFrame(存储在self.big_df中)的问题,但我能够找到一个更简单的解决方案:只需对DataFrame进行分块处理。我使用一个快速循环构建了一个参数列表,其中包含df的块(现在内存为2倍的self.big_df-一个用于原始数据,一个用于块),然后我明确地将self.big_df={}。随后,我创建了线程池,不再有内存问题,每个线程的内存需求仅相当于原始df的一小部分。 - Jeff Ellen
    @tomas 我没有尝试过,但我不明白提前实例化池如何减少内存。我认为池中并不只有一个数据结构的主副本,而是每个数据结构都有自己的副本,这是基于类似这样的语句:“对于大型参数,这可能会导致工作进程重新分配n_jobs次”(来自https://pythonhosted.org/joblib/parallel.html)的陈述。 - Jeff Ellen
    每个工作进程都有自己的参数副本,这是真的(在这种情况下是 list(gen_matrix_df_list.values()))。除此之外,每个工作进程都有其自己创建的变量的副本,这些变量在工作进程生成之前就已经存在了(在这种情况下,最大的变量是 gen_matrix_df_list)。关于 gen_matrix_df,我不确定会发生什么,因为它的引用被删除了,但它可能仍然被分配。在 del gen_matrix_df 之后调用 gc.collect() 也是一个好主意。 - tomas
    1
    @tomas 唯一改善我的内存使用情况的方法是将 p=Pool(3) 移到主函数的开头。谢谢。其他所有事情都没有真正改善任何东西。即使重新分配变量而不是删除也没有任何区别。我想我要采取这种方法:https://dev59.com/dFsX5IYBdhLWcg3wLM7K,通过按 chr_ 分割我的文件。我没有得到完整的答案,但我仍然想提供悬赏。@jeff ellen 也建议将 Pool() 提前。 - everestial007
    显示剩余3条评论

    10
    我遇到了同样的问题。我需要处理一个巨大的文本语料库,同时保持几个数据框(数百万行)的知识库在内存中加载。我认为这个问题很常见,所以我会将我的答案定向于通用目的。
    一组设置的组合解决了我的问题(只有1、3、5可能对您有效):
    1. 使用Pool.imap(或imap_unordered)而不是Pool.map。这将迭代数据,而不是在开始处理之前将所有数据加载到内存中。 2. 设置chunksize参数的值。这也会使imap更快。 3. 为maxtasksperchild参数设置一个值。 4. 将输出附加到磁盘而不是内存中。立即或每当它达到一定大小时。 5. 将代码分批运行。如果您有一个迭代器,可以使用itertools.islice。思路是将list(gen_matrix_df_list.values())分成三个或更多列表,然后仅将第一个列表传递给mapimap,然后在另一个运行中传递第二个列表,等等。由于您有一个列表,因此可以在同一行代码中对其进行切片。

    谢谢你的回答。你能给我展示一下你的代码风格(使用你自己的数据或我的数据),这样我就可以将这个问题的思路和我的大型程序中实现。 - everestial007
    我认为对我来说使用#5没有任何好处,因为数据将始终在队列中(作为输入和输出)。只有4似乎在内存优化方面有合理的收益,但它不会导致I/O瓶颈和无序输出吗?此外,我刚刚尝试了imap,但我没有看到任何收益(无论是速度还是内存消耗)。 - everestial007
    这将取决于您的处理细节。您必须尝试,但会出现瓶颈。 (4) 也会减慢处理速度。这是我的一个模块 https://files.fm/u/uqrq4zje - Abdulrahman Bres
    settingsread_data 两个模块。这些是你本地的模块吗? - everestial007
    有一些是的,设置有文件路径,而read-data有迭代器逐项从大型JSON文件中读取。而注释模块则接受一个项目并返回处理后的文本。我不介意展示整个项目,但它还没有完成,也不是所有部分都需要或有效。 - Abdulrahman Bres
    显示剩余5条评论

    4

    关于使用多进程内存的普遍回答

    你的问题是:“是什么导致了这么多内存被分配”。答案取决于两个部分。

    首先,正如你已经注意到的那样,每个 multiprocessing 工作进程都会获得自己的数据副本(摘自这里),因此你应该将大型参数分块。或者对于大型文件,尽可能地一次读入一小部分。

    默认情况下,池的工作进程是使用 Python 标准库中的 multiprocessing 模块生成的真实 Python 进程,当 n_jobs != 1 时会进行 fork。 Parallel 调用传递的参数作为输入被序列化并重新分配到每个 worker 进程的内存中。

    对于大型参数,这可能会有问题,因为它们将被 worker 进程重新分配 n_jobs 次。

    其次,如果你想要回收内存,你需要了解 python 与其他语言的工作方式不同,并且你依赖于 del 释放内存的方式,但它并不能总是起作用。我不知道这是否是最好的方法,但在我的代码中,我通过重新将变量赋值为 None 或空对象来解决了这个问题。

    针对你的具体示例 - 最小化代码编辑

    只要你可以将大型数据装进内存两次,我认为你可以通过更改单行代码来做到你想做的事情。我编写过非常类似的代码,并且当我重新分配变量(而不是调用 del 或任何垃圾回收)时,它对我起作用。如果这种方式不奏效,你可能需要遵循上述建议并使用磁盘 I/O:

        #### earlier code all the same
        # clear memory by reassignment (not del or gc)
        gen_matrix_df = {}
    
        '''Now, pipe each dataframe from the list using map.Pool() '''
        p = Pool(3)  # number of pool to run at once; default at 1
        result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
    
        #del gen_matrix_df_list  # I suspect you don't even need this, memory will free when the pool is closed
    
        p.close()
        p.join()
        #### later code all the same
    

    对于你的具体示例-最佳内存使用

    只要您可以将大型数据在内存中一次容纳,而且您对文件的大小有一定的了解,您就可以使用Pandas read_csv 部分文件读取,每次读入仅nrows行数据,如果您真的想微观管理读入的数据量,或者使用[固定数量的内存块大小],它会返回一个迭代器5。 我的意思是,nrows参数只是一个单独的读取:您可能会用它来查看文件,或者如果由于某种原因您希望每个部分具有完全相同的行数(例如,如果任何数据是可变长度的字符串,则每行将不占用相同的内存量)。但我认为,为了准备多处理文件,使用块会更容易,因为它直接涉及内存,这是您关注的问题。根据特定大小的块使用试错方法进行内存适应比行数更容易,行数将根据行中有多少数据而改变内存使用量。唯一的其他困难部分是,由于某些特定应用程序原因,您正在分组某些行,因此这使得它略微复杂化。以您的代码为例:

       '''load the genome matrix file onto pandas as dataframe.
        This makes is more easy for multiprocessing'''
    
        # store the splitted dataframes as list of key, values(pandas dataframe) pairs
        # this list of dataframe will be used while multiprocessing
        #not sure why you need the ordered dict here, might add memory overhead
        #gen_matrix_df_list = collections.OrderedDict()  
        #a defaultdict won't throw an exception when we try to append to it the first time. if you don't want a default dict for some reason, you have to initialize each entry you care about.
        gen_matrix_df_list = collections.defaultdict(list)   
        chunksize = 10 ** 6
    
        for chunk in pd.read_csv(genome_matrix_file, sep='\t', names=header, chunksize=chunksize)
            # now, group the dataframe by chromosome/contig - so it can be multiprocessed
            gen_matrix_df = chunk.groupby('CHROM')
            for chr_, data in gen_matrix_df:
                gen_matrix_df_list[chr_].append(data)
    
        '''Having sorted chunks on read to a list of df, now create single data frames for each chr_'''
        #The dict contains a list of small df objects, so now concatenate them
        #by reassigning to the same dict, the memory footprint is not increasing 
        for chr_ in gen_matrix_df_list.keys():
            gen_matrix_df_list[chr_]=pd.concat(gen_matrix_df_list[chr_])
    
        '''Now, pipe each dataframe from the list using map.Pool() '''
        p = Pool(3)  # number of pool to run at once; default at 1
        result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
        p.close()
        p.join()
    

    你和Tomas的回答看起来很有前途。但是,我还没有时间测试它。我明天会去测试。我喜欢重新分配的想法。现在关于“只要你可以将...两次放入内存中” - 为什么不是3次、4次呢?我也在思考是否有一种方法可以将列表创建为迭代器、生成器或yield,并将其传递给Pool.map()进程。有什么建议吗? - everestial007
    @everestial007 因为你只需要将它放入两次:完整的原始副本和每个块在制作块时,所以两次就足够了。3或4次就过度了。当你创建一个生成器时,只有在你没有先将整个项目保存在内存中(或者如果你正在做一些新的事情,比如生成器是两个现有列表的zip的结果)时,你才能节省内存。实际上,在查看后,我之前不知道,但是pandas有一个部分文件读取方法,在你的情况下会更好,我敢打赌。我会编辑我的答案。 - Jeff Ellen
    唯一改善我的内存使用情况的方法是将p=Pool(3)移到主函数的开头。分配chunksize对我没有帮助,因为我必须一次性从一个染色体中读取整个数据 - 这是一个有点复杂的原因。我也在考虑是否将数据作为迭代器、生成器来读取会有所帮助。相反,这种方法https://dev59.com/dFsX5IYBdhLWcg3wLM7K比任何其他方法都要好。但是,由于I/O重写,会有一些拖延。 - everestial007
    此外,重新分配并没有真正减少内存使用量。我不确定原因是什么。 - everestial007
    @everestial007,你对我的回复毫无意义,你试过我的代码吗?你说我的解决方案行不通,因为“你必须一次性读取一个染色体的所有数据”。但是你的原始代码并没有这样做。它将整个CSV文件从头到尾读入,没有什么特别之处。然后你的代码使用“group by”为池中的每个成员准备一些染色体组。我的代码几乎完全相同:它读入文件的一部分,然后使用“group by”来准备染色体组。唯一的问题是我是否为你的系统选择了一个好的块大小,你可能需要进行调整。 - Jeff Ellen

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接