Time series data analysis with scientific Python: continuous analysis over multiple files

Question
I am working through a time series analysis problem. The measurements come from sampling the voltage output of a sensor at 50 kHz and then dumping the data to disk as separate, hour-long files. The data are saved to HDF5 files as CArrays using pytables; this format was chosen to maintain interoperability with MATLAB.
The full data set is now several TB in size, far too large to load into memory.
Some of my analyses require me to iterate over the full data set. For analyses that need to grab chunks of data, I can see a path forward by creating a generator method. I am less certain how to proceed with analyses that require a continuous time series.
Example
For example, say I want to find and classify transients using some moving-window process (e.g. a wavelet analysis) or apply a FIR filter. How do I handle the boundaries, either at the end or beginning of a file or at chunk boundaries? I would like the data to appear as one continuous data set.
Request
I would like to:

  • Keep the memory footprint low, loading data only when necessary.

  • Keep a map of the entire data set in memory so that I can address the data set as I would a regular pandas Series object, e.g. data[time1:time2].

I am using scientific Python (the Enthought distribution) with all the usual tools: numpy, scipy, pandas, matplotlib, etc. I only recently started incorporating pandas into my workflow and am still unfamiliar with all of its capabilities.
I have looked over related StackExchange threads and have not seen anything that exactly addresses my issue.
EDIT: Final solution.
Based on the helpful hints, I built an iterator that steps over files and returns chunks of arbitrary size -- a moving window that will, I hope, handle file boundaries with grace. I have added the option of padding the front and back of each window with data (overlapping windows). I can then apply a succession of filters to the overlapping windows and remove the overlaps at the end. This, I hope, gives me continuity.
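
To make the overlap-and-trim idea concrete, here is a minimal sketch (not part of the original code below; the 101-tap low-pass filter and the helper name filter_chunk are made up for illustration, and pad is assumed to be positive):

import numpy as np
from scipy import signal

FS = 50e3                                   # 50 kHz sampling rate
TAPS = signal.firwin(101, 1e3 / (FS / 2))   # example 1 kHz low-pass FIR

def filter_chunk(chunk, pad):
    """Zero-phase filter one overlapping chunk, then drop the padding.

    'chunk' is assumed to carry 'pad' extra samples on each end (the
    frontpad/backpad overlap), so the filter's edge effects land in the
    padding and the retained interior joins seamlessly with its
    neighbouring chunks.
    """
    filtered = signal.filtfilt(TAPS, [1.0], chunk)
    return filtered[pad:-pad]
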
I have not yet implemented __getitem__, but it is on my list of things to do.
Here is the final code. Some details have been omitted for brevity.
# Imports used below (omitted from the original post for brevity).
from __future__ import print_function   # needed for print(..., end='') on Python 2
import datetime
import fnmatch
import os
import re
from collections import OrderedDict

import numpy as np
import tables

import readdata   # my own support module providing DataContainer


class FolderContainer(readdata.DataContainer):

    def __init__(self,startdir):
        readdata.DataContainer.__init__(self,startdir)

        self.filelist = None
        self.fs = None
        self.nsamples_hour = None
        # Build the file list
        self._build_filelist(startdir)


    def _build_filelist(self,startdir):
        """
        Populate the filelist dictionary with active files and their associated
        file date (YYYY,MM,DD) and hour.

        Each entry in 'filelist' has the form (abs. path : datetime) where the
        datetime object contains the complete date and hour information.
        """
        print('Building file list....',end='')
        # Use the full file path instead of a relative path so that we don't
        # run into problems if we change the current working directory.
        filelist = { os.path.abspath(f):self._datetime_from_fname(f)
                for f in os.listdir(startdir)
                if fnmatch.fnmatch(f,'NODE*.h5')}

        # If we haven't found any files, raise an error
        if not filelist:
            msg = "Input directory does not contain Illionix h5 files."
            raise IOError(msg)
        # Store the file list as an OrderedDict, sorted before saving.
        self.filelist = OrderedDict(sorted(filelist.items(),
                key=lambda t: t[0]))
        print('done')
    
    def _datetime_from_fname(self,fname):
        """
        Return the year, month, day, and hour from a filename as a datetime
        object
        
        """
        # Filename has the prototype: NODE##-YY-MM-DD-HH.h5. Split this up and
        # take only the date parts. Convert the year from YY to YYYY.
        (year,month,day,hour) = [int(d) for d in re.split(r'-|\.',fname)[1:-1]]
        year+=2000
        return datetime.datetime(year,month,day,hour)


    def chunk(self,tstart,dt,**kwargs):
        """
        Generator returning consecutive, optionally overlapping chunks of data
        from the entire set of Illionix data files.

        Parameters
        ----------
        Arguments:
            tstart: UTC start time [provided as a datetime or date string]
            dt: Chunk size [integer number of samples]

        Keyword arguments:
            tend: UTC end time [provided as a datetime or date string].
            frontpad: Padding in front of sample [integer number of samples].
            backpad: Padding in back of sample [integer number of samples]

        Yields:
            chunk: numpy array holding one window of data (including any
                front/back padding)

        """
        # PARSE INPUT ARGUMENTS

        # Ensure 'tstart' is a datetime object.
        tstart = self._to_datetime(tstart)
        # Find the offset, in samples, of the starting position of the window
        # in the first data file
        tstart_samples = self._to_samples(tstart)

        # Convert dt to samples. If dt is a timedelta object we can't use
        # '_to_samples' for the conversion.
        if isinstance(dt,int):
            dt_samples = dt
        elif isinstance(dt,datetime.timedelta):
            # total_seconds() accounts for days, seconds, and microseconds
            dt_samples = np.int64(dt.total_seconds() * self.fs)
        else:
            # FIXME: Pandas 0.13 includes a 'to_timedelta' function. Change
            # below when EPD pushes the update.
            t = self._parse_date_str(dt)
            dt_samples = np.int64((t.minute*60 + t.second) * self.fs)

        # Read keyword arguments. 'tend' defaults to the end of the last file
        # if a time is not provided.
        default_tend = self.filelist.values()[-1] + datetime.timedelta(hours=1)
        tend = self._to_datetime(kwargs.get('tend',default_tend))
        tend_samples = self._to_samples(tend)

        frontpad = kwargs.get('frontpad',0)
        backpad = kwargs.get('backpad',0)


        # CREATE FILE LIST

        # Build the list of data files we will iterate over, based upon the
        # start and stop times.
        print('Pruning file list...',end='')
        tstart_floor = datetime.datetime(tstart.year,tstart.month,tstart.day,
                tstart.hour)
        filelist_pruned = OrderedDict([(k,v) for k,v in self.filelist.items()
                if v >= tstart_floor and v <= tend])
        print('done.')
        # Check to ensure that we're not missing files by enforcing that there
        # is exactly an hour offset between all files.
        if not all(gap == datetime.timedelta(hours=1)
                for gap in np.diff(np.array(filelist_pruned.values()))):
            raise readdata.DataIntegrityError("Hour gap(s) detected in data")


        # MOVING WINDOW GENERATOR ALGORITHM

        # Keep two files open, the current file and the next in line (queue file)
        fname_generator = self._file_iterator(filelist_pruned)
        fname_current = next(fname_generator)
        fname_next = next(fname_generator)

        # Iterate over all the files. 'lastfile' indicates when we're
        # processing the last file in the queue.
        lastfile = False
        i = tstart_samples
        while True:
            with tables.openFile(fname_current) as fcurrent, \
                    tables.openFile(fname_next) as fnext:
                # Point to the data
                data_current = fcurrent.getNode('/data/voltage/raw')
                data_next = fnext.getNode('/data/voltage/raw')
                # Process all data windows associated with the current pair of
                # files. Avoid unnecessary file access operations as we move
                # the sliding window.
                while True:
                    # Conditionals that depend on if our slice is:
                    #   (1) completely into the next hour
                    #   (2) partially spills into the next hour
                    #   (3) completely in the current hour.
                    if i - backpad >= self.nsamples_hour:
                        # If we're already on our last file in the processing
                        # queue, we can't continue to the next. Return to end
                        # the generator.
                        if lastfile:
                            return
                        # Advance the active and queue file names.
                        fname_current = fname_next
                        try:
                            fname_next = next(fname_generator)
                        except StopIteration:
                            # We've reached the end of our file processing
                            # queue. Indicate this is the last file so that if
                            # we try to pull data across the next file
                            # boundary, we'll exit.
                            lastfile = True
                        # Our data slice has completely moved into the next
                        # hour.
                        i-=self.nsamples_hour
                        # Return the data
                        yield data_next[i-backpad:i+dt_samples+frontpad]
                        # Move window by amount dt
                        i+=dt_samples
                        # We've completely moved on to the next pair of files.
                        # Move to the outer scope to grab the next set of
                        # files.
                        break  
                    elif i + dt_samples + frontpad >= self.nsamples_hour:
                        if lastfile:
                            return
                        # Slice spills over into the next hour
                        yield np.r_[data_current[i-backpad:],
                                data_next[:i+dt_samples+frontpad-self.nsamples_hour]]
                        i+=dt_samples
                    else:
                        if lastfile:
                            # Exit once our slice crosses the boundary of the
                            # last file.
                            if i + dt_samples + frontpad > tend_samples:
                                return
                        # Slice is completely within the current hour
                        yield data_current[i-backpad:i+dt_samples+frontpad]
                        i+=dt_samples


    def _to_samples(self,input_time):
        """Convert input time, if not in samples, to samples"""
        if isinstance(input_time,int):
            # Input time is already in samples
            return input_time
        elif isinstance(input_time,datetime.datetime):
            # Input time is a datetime object
            return self.fs * (input_time.minute * 60 + input_time.second)
        else:
            raise ValueError("Invalid input 'tstart' parameter")


    def _to_datetime(self,input_time):
        """Return the passed time as a datetime object"""
        if isinstance(input_time,datetime.datetime):
            converted_time = input_time
        elif isinstance(input_time,str):
            converted_time = self._parse_date_str(input_time)
        else:
            raise TypeError("A datetime object or string date/time were "
                    "expected")
        return converted_time


    def _file_iterator(self,filelist):
        """Generator for iterating over file names."""
        for fname in filelist:
            yield fname
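
For context, here is a hedged usage sketch of the generator above (the directory, start time, chunk size, and padding values are placeholders, and it assumes fs and nsamples_hour are populated by readdata.DataContainer):

import datetime
import numpy as np

container = FolderContainer('/path/to/node/files')            # placeholder directory
pad = 100                                                      # overlap on each side, in samples
windows = container.chunk(datetime.datetime(2013, 5, 1, 0),   # placeholder start time
                          dt=500000,                           # 10 s of data at 50 kHz
                          frontpad=pad, backpad=pad)

pieces = []
for w in windows:
    filtered = w                          # apply the per-window filtering of choice here
    pieces.append(filtered[pad:-pad])     # drop the overlap before stitching
result = np.concatenate(pieces)           # stitched back into one continuous array
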

Have a look at this: https://dev59.com/aGgu5IYBdhLWcg3wDS0G. The mmap-based solution there is what I use in a similar setup. It may require dumping the data into some kind of big binary file instead of HDF5, and switching to C/C++ for tasks like the window filtering. Also check whether SSD storage is an option for this project; it can improve performance significantly. - lowtech
@lowtech, thanks for the reply. I looked over the link, but the question and answers there seem to deal only with reading large text files, which is not my problem. Ideally I would like to avoid one big binary file: I worry about corruption, possible OS issues with a multi-TB file, and having to design a file format. Also, with HDF5 files I can easily include multiple channels or sensor metadata if needed. I am going to look at mmap now -- I am not familiar with that command -- and see whether it offers a solution. - Sean
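
For reference, a minimal sketch of the mmap-based idea using numpy.memmap on a hypothetical flat binary file of float64 samples (the file name is a placeholder):

import numpy as np

# A flat binary file of float64 samples, mapped read-only: slices are
# paged in lazily by the OS, so the memory footprint stays small.
data = np.memmap('voltage.dat', dtype='float64', mode='r')    # placeholder file
segment = data[50000 * 3600 : 50000 * 3600 + 50000]           # one second of data, an hour in
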
In case you did not notice: read_hdf has a chunk_size parameter (and so does read_csv): http://pandas.pydata.org/pandas-docs/dev/generated/pandas.read_hdf.html#pandas.read_hdf - lowtech
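
For reference, a minimal sketch of chunked reading with pandas (the keyword is spelled chunksize in pandas itself, and it only works for nodes written in pandas' 'table' format, not for plain CArrays like the files described in the question; the file name and key are placeholders):

import pandas as pd

# Only applies to nodes written with format='table' (e.g. via DataFrame.to_hdf).
for chunk in pd.read_hdf('NODE01-13-05-01-00.h5', key='data', chunksize=500000):
    pass  # process each chunk here
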
1 Answer


@Sean here is my take on it.

Have a look at this issue that I created a while back here. It is essentially what you are trying to do, and it is somewhat non-trivial.

Without knowing more details, I would offer a couple of suggestions:

  • HDFStore can read a standard CArray-type format, see here.

  • You can easily create a Series-like object that has these nice properties: it knows where each file is and its extents, and it uses __getitem__ to 'select' those files, e.g. s[time1:time2]. From a top-level view this might be a very nice abstraction, and you can then dispatch operations.

For example:

class OutOfCoreSeries(object):

     def __init__(self, dir):
            .... load a list of the files in the dir where you have them ...

     def __getitem__(self, key):
            .... map the selection key (say it's a slice, which 'time1:time2' resolves to) ...
            .... to the files that make it up .... , then return a new Series that only
            .... those file pointers ....

     def apply(self, func, **kwargs):
            """ apply a function to the files """
            results = []
            for f in self.files:
                     results.append(func(self.read_file(f)))
            return Results(results)

This could become quite complicated. For instance, if you apply an operation that reduces the data and fits in memory, the result can simply be a pandas Series (or DataFrame). However, you may be performing a transformation that requires writing out a new set of transformed data files; if so, you have to handle that as well.

A couple of further suggestions:

  • You may want to keep the data in several forms, whichever turn out to be useful. For instance, you say that you save multiple values in one-hour slices. Perhaps you could split those one-hour files into one file per saved variable, each covering a much longer slice that then becomes readable into memory.

  • You may want to downsample the data to lower frequencies and work on those, loading the full-rate data for a particular slice only when more detailed work is needed (a small sketch follows these suggestions).

  • You may want to create a data set that is queryable across time, e.g. high/low peaks at varying frequencies, e.g. using the Table format; see here.

So you may end up keeping multiple variations of the same data. Disk space is usually much cheaper/easier to manage than main memory, and it makes a lot of sense to take advantage of that.
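
As a rough sketch of the downsampling suggestion above (one minute of fake 50 kHz data; the one-second target and the modern pandas resample syntax are just examples):

import numpy as np
import pandas as pd

fs = 50000                                       # 50 kHz, as in the question
raw = np.random.randn(fs * 60)                   # stand-in for one minute of samples
idx = pd.date_range('2013-05-01 00:00', periods=raw.size, freq='20us')   # 1/50 kHz = 20 microseconds
coarse = pd.Series(raw, index=idx).resample('1s').mean()

# Work on 'coarse' for the broad view, and go back to the raw 50 kHz
# files only for the time slices that need detailed analysis.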

Thanks for your insight. I looked over your ENH issue. I built an iterator to step over the files, but one that allows padding the data at either end. By overlapping the data chunks I can apply a filter and then discard the padding at the ends, maintaining continuity. - Sean
Great. If you want to post your insights/code on that link, that would be terrific. - Jeff
