Problem with Python garbage collector?

I have a simple program that reads a large file containing a few million rows, parses each row (numpy array), converts it into an array of doubles (python array), and then writes it to an hdf5 file. I repeat this loop for multiple days. After reading each file, I delete all the objects and call the garbage collector. When I run the program, the first day is parsed without any errors, but on the second day I get a MemoryError. I monitored the memory usage of the program: during the first day's parsing, memory usage stays around 1.5 GB. Once the first day's parsing is done, memory usage drops to 50 MB. Then, when the second day starts and the program tries to read the lines from the file, I get a MemoryError. Here is the program's output.
source file extracted at C:\rfadump\au\2012.08.07.txt
parsing started
current time: 2012-09-16 22:40:16.829000
500000 lines parsed
1000000 lines parsed
1500000 lines parsed
2000000 lines parsed
2500000 lines parsed
3000000 lines parsed
3500000 lines parsed
4000000 lines parsed
4500000 lines parsed
5000000 lines parsed
parsing done.
end time is 2012-09-16 23:34:19.931000
total time elapsed 0:54:03.102000
repacking file
done
> s:\users\aaj\projects\pythonhf\rfadumptohdf.py(132)generateFiles()
-> while single_date <= self.end_date:
(Pdb) c
*** 2012-08-08 ***
source file extracted at C:\rfadump\au\2012.08.08.txt
caught an exception while generating file for day 2012-08-08.
Traceback (most recent call last):
  File "rfaDumpToHDF.py", line 175, in generateFile
    lines = self.rawfile.read().split('|\n')
MemoryError

I am quite sure that the Windows task manager shows the memory usage for this process as 50 MB. It looks like Python's garbage collector, or its memory manager, is not accounting for free memory correctly. There should be plenty of free memory, but it thinks there is not enough.
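For scale, the line that fails in the traceback, `self.rawfile.read().split('|\n')`, briefly needs the whole file as one contiguous string plus a list holding a copy of every record, so its peak footprint is roughly twice the file size. On a 32-bit interpreter, the contiguous string alone can fail to allocate once the process address space is fragmented, even while the task manager shows only 50 MB in use. A minimal sketch of that cost (synthetic data, not the original file):

```python
import sys

# Rough sketch: estimate the peak cost of rawfile.read().split('|\n')
# on a synthetic ~10 MB "file".
record = "field1,field2,field3" * 50            # one synthetic 1000-char record
data = (record + "|\n") * 10000                 # whole file as one big string
records = data.split("|\n")                     # a second full copy, as a list

string_bytes = sys.getsizeof(data)
list_bytes = sys.getsizeof(records) + sum(sys.getsizeof(r) for r in records)

# Peak usage holds the big string AND all the substrings at once,
# i.e. roughly twice the file size, and the string must be contiguous.
print(string_bytes, list_bytes)
```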
Any ideas?

EDIT: Adding my code here.

I will put the main parts of my code here. I am new to Python, so please excuse my Python coding style.

Module 1
def generateFile(self, current_date):
    try:
        print "*** %s ***" % current_date.strftime("%Y-%m-%d")
        weekday=current_date.weekday()
        if weekday >= 5:
            print "skipping weekend"
            return
        self.taqdb = taqDB(self.index, self.offset)
        cache_filename = os.path.join(self.cache_dir,current_date.strftime("%Y.%m.%d.h5"))
        outputFile = config.hdf5.filePath(self.index, date=current_date)
        print "cache file: ", cache_filename
        print "output file: ", outputFile

        tempdir = "C:\\rfadump\\"+self.region+"\\"  
        input_filename = tempdir + filename
        print "source file extracted at %s " % input_filename

        ## universe
        reader = rfaTextToTAQ.rfaTextToTAQ(self.tickobj)  ## PARSER
        count = 0
        self.rawfile = open(input_filename, 'r')
        lines = self.rawfile.read().split('|\n')
        total_lines = len(lines)
        self.rawfile.close()
        del self.rawfile
        print "parsing started"
        start_time = dt.datetime.now()
        print "current time: %s" % start_time
        #while(len(lines) > 0):
        while(count < total_lines):
            #line = lines.pop(0) ## This slows down processing
            result = reader.parseline(lines[count]+"|")
            count += 1
            if(count % 500000 == 0):
                print "%d lines parsed" %(count)
            if(result == None): 
                continue
            ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = result
            if(len(levelsUpdated) == 0 and tradeupdate == False):
                continue
            self.taqdb.insert(result)

        ## write to hdf5 TODO
        writer = h5Writer.h5Writer(cache_filename, self.tickobj)
        writer.write(self.taqdb.groups)
        writer.close()

        del lines
        del self.taqdb, self.tickobj
        ##########################################################
        print "parsing done." 
        end_time = dt.datetime.now()
        print "end time is %s" % end_time
        print "total time elapsed %s" % (end_time - start_time)

        defragger = hdf.HDF5Defragmenter()
        defragger.Defrag(cache_filename,outputFile)
        del defragger
        print "done"
        gc.collect(2)
    except:
        print "caught an exception while generating file for day %s." % current_date.strftime("%Y-%m-%d")
        tb = traceback.format_exc()
        print tb

Module 2 - taqDB - stores the parsed data in arrays

class taqDB:
  def __init__(self, index, offset):
    self.index = index
    self.tickcfg = config.hdf5.getTickConfig(index)
    self.offset = offset
    self.groups = {}

  def getGroup(self,ric):
    if (self.groups.has_key(ric) == False):
        self.groups[ric] = {}
    return self.groups[ric]

  def getOrderbookArray(self, ric, group):
    datasetname = orderBookName
    prodtype = self.tickcfg.getProdType(ric)
    if(prodtype == ProdType.INDEX):
        return
    orderbookArrayShape = self.tickcfg.getOrderBookArrayShape(prodtype)
    if(group.has_key(datasetname) == False):
        group[datasetname] = array.array("d")
        orderbookArray = self.tickcfg.getOrderBookArray(prodtype)
        return orderbookArray
    else:
        orderbookArray = group[datasetname]
        if(len(orderbookArray) == 0):
            return self.tickcfg.getOrderBookArray(prodtype)
        lastOrderbook = orderbookArray[-orderbookArrayShape[1]:]
        return np.array([lastOrderbook])

  def addToDataset(self, group, datasetname, timestamp, arr):
    if(group.has_key(datasetname) == False):
        group[datasetname] = array.array("d")
    arr[0,0]=timestamp
    a1 = group[datasetname]
    a1.extend(arr[0])

  def addToOrderBook(self, group, timestamp, arr):
    self.addToDataset(group, orderBookName, timestamp, arr)

  def insert(self, data):
    ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = data
    delta = dt.timedelta(hours=timestamp.hour,minutes=timestamp.minute, seconds=timestamp.second, microseconds=(timestamp.microsecond/1000))
    timestamp = float(str(delta.seconds)+'.'+str(delta.microseconds)) + self.offset
    ## write to array
    group = self.getGroup(ric)

    orderbookUpdate = False
    orderbookArray = self.getOrderbookArray(ric, group)
    nonzero = quotes.nonzero()
    orderbookArray[nonzero] = quotes[nonzero] 
    if(np.any(nonzero)):
        self.addToDataset(group, orderBookName, timestamp, orderbookArray)
    if(tradeupdate == True):
        self.addToDataset(group, tradeName, timestamp, trades)

Module 3 - the parser

class rfaTextToTAQ:
  """RFA raw dump file reader. Reads a single line (record) and returns an array of fid-value pairs."""
  def __init__(self,tickconfig):
    self.tickconfig = tickconfig
    self.token = ''
    self.state = ReadState.SEQ_NUM
    self.fvstate = fvstate.FID
    self.quotes = np.array([]) # read from tickconfig
    self.trades = np.array([]) # read from tickconfig
    self.prodtype = ProdType.STOCK
    self.allquotes = {}
    self.alltrades = {}
    self.acvol = 0
    self.levelsUpdated = []
    self.quoteUpdate = False
    self.tradeUpdate = False
    self.depth = 0

  def updateLevel(self, index):
    if(self.levelsUpdated.__contains__(index) == False):
        self.levelsUpdated.append(index)

  def updateQuote(self, fidindex, field):
    self.value = float(self.value)
    if(self.depth == 1):
        index = fidindex[0]+(len(self.tickconfig.stkQuotes)*(self.depth - 1))
        self.quotes[index[0]][fidindex[1][0]] = self.value
        self.updateLevel(index[0])
    else:
        self.quotes[fidindex] = self.value
        self.updateLevel(fidindex[0][0])
    self.quoteUpdate = True

  def updateTrade(self, fidindex, field):
    #self.value = float(self.value)
    if(self.tickconfig.tradeUpdate(self.depth) == False):
        return
    newacvol = float(self.value)
    if(field == acvol):
        if(newacvol > self.acvol):
            tradesize = newacvol - self.acvol
            self.acvol = newacvol
            self.trades[fidindex] = tradesize
            if(self.trades.__contains__(0) == False):
                self.tradeUpdate = True
    else:
        self.trades[fidindex] = self.value
        if(not (self.trades[0,1]==0 or self.trades[0,2]==0)):
            self.tradeUpdate = True

  def updateResult(self):
    field = ''
    valid, field = field_dict.FIDToField(int(self.fid), field)
    if(valid == False):
        return
    if(self.value == '0'):
        return
    if(self.prodtype == ProdType.STOCK):
        fidindex = np.where(self.tickconfig.stkQuotes == field)
        if(len(fidindex[0]) == 0):
            fidindex = np.where(self.tickconfig.stkTrades == field)
            if(len(fidindex[0]) == 0):
                return
            else:
                self.updateTrade(fidindex, field)
        else:
            self.updateQuote(fidindex, field)
    else:
        fidindex = np.where(self.tickconfig.futQuotes == field)
        if(len(fidindex[0]) == 0):
            fidindex = np.where(self.tickconfig.futTrades == field)
            if(len(fidindex[0]) == 0):
                return
            else:
                self.updateTrade(fidindex, field)
        else:
            self.updateQuote(fidindex, field)

  def getOrderBookTrade(self):
    if (self.allquotes.has_key(self.ric) == False):
        acvol = 0
        self.allquotes[self.ric] = self.tickconfig.getOrderBookArray(self.prodtype)
        trades = self.tickconfig.getTradesArray()
        self.alltrades[self.ric] = [trades, acvol]
    return self.allquotes[self.ric], self.alltrades[self.ric]

  def parseline(self, line):
    self.tradeUpdate = False
    self.levelsUpdated = []
    pos = 0
    length = len(line)
    self.state = ReadState.SEQ_NUM
    self.fvstate = fvstate.FID
    self.token = ''
    ch = ''
    while(pos < length):
        prevChar = ch
        ch = line[pos]
        pos += 1
        #SEQ_NUM
        if(self.state == ReadState.SEQ_NUM):
            if(ch != ','):
                self.token += ch
            else:
                self.seq_num = int(self.token)
                self.state = ReadState.TIMESTAMP
                self.token = ''
        # TIMESTAMP
        elif(self.state == ReadState.TIMESTAMP):
            if(ch == ' '):
                self.token = ''
            elif(ch != ','):
                self.token += ch
            else:
                if(len(self.token) != 12):
                    print "Invalid timestamp format: %s. Skipping line." % self.token
                    self.state = ReadState.SKIPLINE
                else:
                    self.timestamp = datetime.strptime(self.token,'%H:%M:%S.%f') 
                    self.state = ReadState.RIC
                self.token = ''
        # RIC
        elif(self.state == ReadState.RIC):
            if(ch != ','):
                self.token += ch
            else:
                self.ric = self.token
                self.token = ''
                self.ric, self.depth = self.tickconfig.replaceRic(self.ric)
                self.prodtype = self.tickconfig.getProdType(self.ric)
                if(self.tickconfig.subscribed(self.ric)):
                    self.state = ReadState.UPDATE_TYPE
                    self.quotes, trades = self.getOrderBookTrade()
                    self.trades = trades[0]
                    self.acvol = trades[1]
                else:
                    self.state = ReadState.SKIPLINE
        # UPDATE_TYPE
        elif(self.state == ReadState.UPDATE_TYPE):
            if(ch != '|'):
                self.token += ch
            else:
                self.update_type = self.token
                self.token = ''
                self.state = ReadState.FVPAIRS
        #SKIPLINE
        elif(self.state == ReadState.SKIPLINE):
            return None
        # FV PAIRS
        elif(self.state == ReadState.FVPAIRS):
            # FID
            if(self.fvstate == fvstate.FID):
                if(ch != ','):
                    if(ch.isdigit() == False):
                        self.token = self.value+ch
                        self.fvstate = fvstate.FIDVALUE
                        self.state = ReadState.FVPAIRS
                    else:
                        self.token += ch
                else:
                    self.fid = self.token
                    self.token = ''
                    self.fvstate = fvstate.FIDVALUE
                    self.state = ReadState.FVPAIRS
            # FIDVALUE
            elif(self.fvstate == fvstate.FIDVALUE):
                if(ch != '|'):
                    self.token += ch
                else:
                    self.value = self.token
                    self.token = ''
                    self.state = ReadState.FVPAIRS
                    self.fvstate = fvstate.FID
                    # TODO set value
                    self.updateResult()
    return self.ric, self.timestamp, self.quotes, self.trades, self.levelsUpdated, self.tradeUpdate

Thanks for your help.

I have added the main parts of the code. I hope this helps. - Alok

The defragger object is a C++ module wrapped with SWIG and called from Python. Any suggestions related to the code are welcome. - Alok

Hmm... this question might help: https://dev59.com/lHE95IYBdhLWcg3wPbd7 - Joel Cornett

The new-process solution fits my case better. The record separator in my files is not a newline. I will allocate the memory for each new file in a separate process. - Alok
1 Answer

The only really reliable way to free memory is to terminate the process.

So, if your main program spawns a worker process to do most of the work (the work done for one day), then when that worker process completes, the memory it used will be freed:

import multiprocessing as mp

def work(date):
    # Do most of the memory-intensive work here
    ...

while single_date <= self.end_date:
    proc = mp.Process(target = work, args = (single_date,))
    proc.start()
    proc.join()
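A minimal, runnable version of this pattern (illustrative only; `work` here just allocates a large list in place of a day's parsing). Note that on Windows, `multiprocessing` requires the `if __name__ == '__main__':` guard, because child processes re-import the main module:

```python
import multiprocessing as mp

def work(n):
    # Stand-in for one day's parsing: allocate a large list, then exit.
    data = [0.0] * n
    print("worker done, allocated", len(data), "floats")

if __name__ == "__main__":
    for day in range(3):                     # one fresh process per "day"
        proc = mp.Process(target=work, args=(5000000,))
        proc.start()
        proc.join()                          # all worker memory is returned here
        assert proc.exitcode == 0
```

Because the large allocations live and die entirely inside the child process, the parent's address space never fragments, so the next "day" always starts with a clean slate.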

Thanks @ubuntu. I will use the subprocess approach for the big memory allocations. The link clearly explains the likely problem. I am making one big memory allocation call, which may fail because of fragmented memory. - Alok
