平滑不规则采样的时间数据。

6

给定一张表格,其中第一列是相对于某个参考点过去的秒数,第二列是任意测量值:

6   0.738158581
21  0.801697222
39  1.797224596
49  2.77920469
54  2.839757536
79  3.832232283
91  4.676794376
97  5.18244704
100 5.521878863
118 6.316630137
131 6.778507504
147 7.020395216
157 7.331607129
176 7.637492223
202 7.848079136
223 7.989456499
251 8.76853608
278 9.092367123 
    ...

正如您所看到的,这些测量值是在不规则的时间点进行采样的。我需要通过对每个测量值之前100秒内的观测数据进行平均来平滑数据(使用Python)。由于数据表很大,因此最好使用基于迭代器的方法。

不幸的是,我花了两个小时的时间编写代码,但无法找到有效且优雅的解决方案。

有谁可以帮帮我吗?

编辑:

  1. 我想要每个原始测量值的一个平滑读数,平滑读数应为前100(Delta)秒内原始读数和任何其他读数的算术平均值。(John,你是对的)

  2. 巨大-1e6-10e6行+需要使用紧凑型RAM

  3. 数据大约是随机行走的

  4. 数据已排序

解决方法

我测试了J Machin和yairchu提出的解决方案。它们都给出了相同的结果,但在我的数据集上,J Machin的版本效率呈指数级增长,而yairchu的版本效率呈线性增长。以下是通过IPython's %timeit测量的执行时间(以微秒为单位):

data size   J Machin    yairchu
10        90.2        55.6
50          930         258
100         3080        514
500         64700       2660
1000        253000      5390
2000        952000      11500

感谢大家的帮助。

1
你的数据量是否太大,无法在numpy数组中处理?你有多少个项目? - David Cournapeau
这是用线性插值来找到100的倍数点吗? - S.Lott
如果您有平滑数据的要求,请详细说明一下。我尝试了几次,但是无法解析您的描述:“我需要通过对每个测量之前100秒的读数进行平均来平滑数据”。 - rix0rrr
请发布更多有关您基准测试的信息。据我所知,如果时间值不是按升序排列的,或者时间值非常小并且窗口包含到目前为止的所有或大部分读数,则会出现比指数更加二次的行为!我的基准测试以已发布的代码为约60%Y速度的线性结果,如果放弃使用sum()而选择对总数和计数进行增量调整,则速度将翻倍至120%Y速度。注意:答案精确到14个有效数字。 - John Machin
继续:我使用max(0.1, random.normalvariate(mu=16.0, sigma=7.81))生成随机正时间间隔;0.1是为了避免负数,而16.0000000 :-)和7.81则来自您的样本。起初我得到了二次行为,直到我注意到我输入了min而不是max!顺便说一下,像你说的随机漫步,我使用new_reading = old_reading + random.random()。 - John Machin
8个回答

3
我正在使用一个总和结果,将新成员加上并减去旧成员。但是这种方式可能会遭受浮点不准确性的积累。
因此,我使用了一个“Deque”列表实现。每当我的Deque重新分配到较小的大小时,我同时重新计算总和。
我还计算到包括点x在内的平均值,以便至少有一个样本点进行平均。
def getAvgValues(data, avgSampleTime):
  lastTime = 0
  prevValsBuf = []
  prevValsStart = 0
  tot = 0
  for t, v in data:
    avgStart = t - avgSampleTime
    # remove too old values
    while prevValsStart < len(prevValsBuf):
      pt, pv = prevValsBuf[prevValsStart]
      if pt > avgStart:
        break
      tot -= pv
      prevValsStart += 1
    # add new item
    tot += v
    prevValsBuf.append((t, v))
    # yield result
    numItems = len(prevValsBuf) - prevValsStart
    yield (t, tot / numItems)
    # clean prevVals if it's time
    if prevValsStart * 2 > len(prevValsBuf):
      prevValsBuf = prevValsBuf[prevValsStart:]
      prevValsStart = 0
      # recalculate tot for not accumulating float precision error
      tot = sum(v for (t, v) in prevValsBuf)

(1) 那是一个非常有趣的双端队列实现。 (2) 我怀疑 OP 并不太担心浮点舍入误差的累积;与平滑的严重变化相比,它们肯定只是小幅度的削减...但如果他真的担心,可以考虑使用 Kahan 加法器来维护运行总数。 - John Machin
请注意,相对于指数移动平均(https://dev59.com/YXNA5IYBdhLWcg3wSrua#1024008),这非常计算密集。除非您特别需要确保时间范围内的所有样本都同等贡献,并且较旧的样本不起作用,否则我会选择效率更高的EMA。 - cjs
@Curt Sampson:OP 特别要求这个。 - yairchu

2

你没有明确说明你希望在什么时候得到输出。我假设你想要每个原始读数的一个平滑读数,平滑读数是原始读数和前100(delta)秒内任何其他读数的算术平均值。

简短回答:使用collections.deque……它永远不会保存超过“delta”秒的读数。我设置的方式使您可以将deque视为列表,并轻松计算平均值或一些花哨的东西,给最近的读数更多的权重。

长回答:

>>> the_data = [tuple(map(float, x.split())) for x in """\
... 6       0.738158581
... 21      0.801697222
[snip]
... 251     8.76853608
... 278     9.092367123""".splitlines()]
>>> import collections
>>> delta = 100.0
>>> q = collections.deque()
>>> for t, v in the_data:
...     while q and q[0][0] <= t - delta:
...         # jettison outdated readings
...         _unused = q.popleft()
...     q.append((t, v))
...     count = len(q)
...     print t, sum(item[1] for item in q) / count, count
...
...
6.0 0.738158581 1
21.0 0.7699279015 2
39.0 1.112360133 3
49.0 1.52907127225 4
54.0 1.791208525 5
79.0 2.13137915133 6
91.0 2.49500989771 7
97.0 2.8309395405 8
100.0 3.12993279856 9
118.0 3.74976297144 9
131.0 4.41385300278 9
147.0 4.99420529389 9
157.0 5.8325615685 8
176.0 6.033109419 9
202.0 7.15545189083 6
223.0 7.4342562845 6
251.0 7.9150342134 5
278.0 8.4246097095 4
>>>

编辑

一站式购物:在这里获取你的时髦小玩意。以下是代码:

numerator = sum(item[1] * upsilon ** (t - item[0]) for item in q)
denominator = sum(upsilon ** (t - item[0]) for item in q)
gizmoid = numerator / denominator

upsilon应该略小于1.0(<=零是非法的,略高于零会进行少量平滑处理,等于一将得到算术平均值加上浪费的CPU时间,大于一将得到您目的的倒数)。


在这里,我觉得一个普通的列表会起作用,使用.pop(0)而不是.popleft()。collections.deque有什么优势? - Paul
2
Python列表的左侧弹出是O(N);deque的左侧弹出是O(1)。 - John Machin

0

这个使它线性:

def process_data(datafile):
    previous_n = 0
    previous_t = 0
    for line in datafile:
        t, number = line.strip().split()
        t = int(t)
        number = float(number)
        delta_n = number - previous_n
        delta_t = t - previous_t
        n_per_t = delta_n / delta_t
        for t0 in xrange(delta_t):
            yield previous_t + t0, previous_n + (n_per_t * t0)
        previous_n = n
        previous_t = t

f = open('datafile.dat')

for sample in process_data(f):
    print sample

(1) .strip() 是多余的。 (2) 您似乎忘记每次更新 previous_*。 (3) 即使如此,这是什么意思也不明显...它似乎会在一秒间隔内对先前读数和当前读数进行线性插值 - 这是对 OP 要求的有趣解释。 (4) 我认为您的意思是 for t0 in xrange(1, delta_t + 1) - John Machin

0

如果您可以多次迭代输入,则可以使用一个迭代器用于“左”侧,另一个迭代器用于“右”侧,以实现O(1)的内存效率。

def getAvgValues(makeIter, avgSampleTime):
  leftIter = makeIter()
  leftT, leftV = leftIter.next()
  tot = 0
  count = 0
  for rightT, rightV in makeIter():
    tot += rightV
    count += 1
    while leftT <= rightT - avgSampleTime:
      tot -= leftV
      count -= 1
      leftT, leftV = leftIter.next()
    yield rightT, tot / count

1
我猜楼主想要实时显示平滑后的数值...就像重症监护室里的心跳监测器一样。 - John Machin

0

你的数据似乎大致呈线性:

你的数据图表 http://rix0r.nl/~rix0r/share/shot-20090621.144851.gif

你需要什么样的平滑处理?对这个数据集进行最小二乘法拟合直线?一些低通滤波器?还是其他什么东西?

请告诉我们应用场景,以便我们能更好地为您提供建议。

编辑:例如,根据应用程序的不同,可能在第一个和最后一个点之间插值一条直线就足够了。


0

虽然它提供的是指数衰减平均值,而不是总平均值,但我认为您可能需要我所谓的具有可变alpha的指数移动平均值,这实际上是一个单极低通滤波器。现在有一个解决方案,它的运行时间与数据点的数量成线性关系。看看它是否适合您。


-1

这样的话怎么样,持续存储数值直到与上次时间差大于100,对这些数值进行平均并返回

def getAvgValues(data):
    lastTime = 0
    prevValues = []
    avgSampleTime=100

    for t, v in data:
        if t - lastTime < avgSampleTime:
            prevValues.append(v)
        else:
            avgV = sum(prevValues)/len(prevValues)
            lastTime = t
            prevValues = [v]
            yield (t,avgV)

for v in getAvgValues(data):
    print v

他要求所有原始测量时间的前100秒的平均值。你只需给出他的示例的2个结果。 - yairchu
我误解了它,不过看起来你修改了它以得到正确的解决方案。 - Anurag Uniyal
我并没有真正修改它。我只是使用了你的变量名。 - yairchu

-2

听起来你需要一个简单的四舍五入公式。将任何数字舍入到任意间隔:

round(num/interval)*interval

你可以用 floor 或 ceiling 替换 round,以实现“向上取整”或“向下取整”的效果。它可以在任何语言中使用,包括 SQL。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接