F#不可变数据结构用于高频实时流数据

11
我们正在进行一个涉及流数据实时和历史分析的f#项目。数据包含在c#对象中(见下文),并作为标准的.net事件的一部分发送。实时情况下,我们通常接收的事件数量可能会大不相同,从小于每秒1个到每秒每个仪器的800个以上,并且可能会非常突发。一般而言,一个典型的日子可能会积累每个仪器5百万行/元素。
C#事件数据结构的通用版本如下:
public enum MyType { type0 = 0, type1 = 1}

public class dataObj
{
    public int myInt= 0;
    public double myDouble;
    public string myString;
    public DateTime myDataTime;
    public MyType type;
    public object myObj = null;

}

我们计划以两种方式在F#中使用此数据结构:

  1. 使用监督和无监督机器学习(CRFs、聚类模型等)进行历史分析
  2. 使用上述模型对数据流进行实时分类

数据结构需要能够随着我们添加更多事件而增长。这就排除了array<t>,因为它不允许调整大小,尽管它可用于历史分析。该数据结构还需要能够快速访问最近的数据,并且最好能够跳转到x个点的数据。这排除了Lists<T>,因为它具有线性查找时间,并且没有对元素的随机访问,只有“仅向前”遍历。

根据此帖子Set<T>可能是一个不错的选择...

>“……Vanilla Set<'a>可以做得非常好。我更喜欢'Set'而不是'List',因为你总是可以O(lg n)访问最大和最小的项目,使您可以按插入日期/时间对集合进行排序,以便有效地访问最新和最旧的项目...”

编辑:Yin Zhu的回应给我带来了额外的清晰度,准确地说明了我的要求。我已经编辑了剩余的帖子以反映这一点。此外,先前版本的问题在历史分析要求的介绍中变得混乱。我省略了它们。

以下是实时过程的步骤说明:

  1. 接收到实时事件
  2. 将该事件放入数据结构中。我们正在确定的是这个数据结构。它应该是一个Set<T> 还是其他一些结构?
  3. 从元素的子集中提取或以某种方式进行迭代,以生成特征。这可以是数据结构的最后n行/元素(即最后1000个事件或10000个事件),也可以是最后x秒/分钟内的所有元素(即过去10分钟内的所有事件)。理想情况下,我们希望使用一种允许我们高效执行此操作的结构。特别是一种允许随机访问第n个元素而无需遍历所有其他元素的数据结构是有价值的。
  4. 生成模型的特征并将其发送到模型进行评估。
  5. 我们可以修剪旧数据的数据结构以提高性能。

因此,问题是用于存储我们将用于生成特征的实时流事件的最佳数据结构是什么。


刚刚发现了这篇关于使用邮箱处理器作为高频传感器数据的线程安全消息处理队列的帖子。有点相关。https://dev59.com/h0fSa4cB1Zd3GeqPAMtY#928892 - Andre P.
跳表值得考虑,因为它具有O(log n)的随机访问、搜索时间、累积统计等特点。这里有一个F#实现:http://ffogd.blogspot.co.uk/2010/10/f-skip-list.html - 79E09796
1
有点跑题,但如果你在项目的开头阶段,你可能想看一下 Rx。你所描述的环境具有异步传入的数据流。Rx 允许你组合这些流并直接对它们进行操作,还可以用 Linq、滑动窗口等方式。因此你甚至可以在将其提交到某个数据结构之前,在“实时”流上执行相当复杂的实时分析。这里是一个很好的起点。 - gjvdkamp
4个回答

11

你应该考虑使用 FSharpx.Collections.Vector。 Vector<T>会提供类似数组的功能,包括 O(log32(n)) 索引查找和更新,接近O(1),以及向序列末尾添加新元素。还有另一种可从 F# 使用的 Vector 实现Solid Vector。非常好文档化,某些函数在大规模情况下(元素数 > 10K)的性能高达4倍。这两种实现在多达 1M 个元素的情况下表现非常出色。


谢谢Jack!这似乎正是我正在寻找的。 - Andre P.
1
正如Rich Hickey所说:“你会非常惊讶性能有多好。” - David Grenier

10
在他的回答中,Jack Fox建议使用FSharpx.Collections Vector<'T>或Greg Rosenbaum的Solid Vector<'t>https://github.com/GregRos/Solid)。我想通过提供关于如何使用它们的说明来回馈社区。

使用FSharpx.Collections.Vector<'T>

该过程非常简单:

  1. 使用项目管理器控制台或解决方案的NuGet包管理器下载FSharpx.Core NuGet包。两者都可以在Visual Studio -> 工具 -> 库管理器中找到。
  2. 如果您在F#脚本文件中使用它,请添加#r "FSharpx.Core.dll"。您可能需要使用完整路径。

用法:

open FSharpx.Collections

let ListOfTuples = [(1,true,3.0);(2,false,1.5)] 
let vector = ListOfTuples |> Vector.ofSeq

printfn "Last %A" vector.Last
printfn "Unconj %A" vector.Unconj
printfn "Item(0) %A" (vector.[0])
printfn "Item(1) %A" (vector.[1])
printfn "TryInitial %A" dataAsVector.TryInitial
printfn "TryUnconj %A" dataAsVector.Last

使用 Solid.Vector<'T>

准备使用 Solid Vector<'t> 有点复杂。但是,Solid 版本具有更多方便的功能,并且正如 Jack 指出的那样,具有许多性能优势。它还有很多有用的文档。

  1. 您需要从 https://github.com/GregRos/Solid 下载 Visual Studio 解决方案。
  2. 下载完成后,您需要构建它,因为没有预先构建的 dll。
  3. 如果您像我一样,可能会遇到许多缺少的依赖关系,这些依赖关系会阻止解决方案被构建。在我的情况下,它们都与 nuit 测试框架相关(我使用了不同的框架)。只需下载/添加每个依赖项,直到解决方案构建即可。
  4. 完成此操作并构建解决方案后,您将在 Solid/Solid/bin 文件夹中拥有一个全新的 Solid.dll。这就是我犯的错误。那是核心 dll,仅适用于 C# 使用。如果您只包含对 Solid.dll 的引用,则可以在 F# 中创建 vector<'T>,但从那时起将发生奇怪的事情。
  5. 要在 F# 中使用此数据结构,您需要引用 Solid.dllSolid.FSharp.dll,后者位于 \Solid\SolidFS\obj\Debug\ 文件夹中。 您只需要一个 open 语句 -> open Solid

这里是一些在 F# 脚本文件中使用的代码:

#r "Solid.dll"
#r "Solid.FSharp.dll" // don't forget this reference

open Solid

let ListOfTuples2 = [(1,true,3.0);(2,false,1.5)] 
let SolidVector = ListOfTuples2 |> Vector.ofSeq

printfn "%A" SolidVector.Last
printfn "%A" SolidVector.First
printfn "%A" (SolidVector.[0])
printfn "%A" (SolidVector.[1])
printfn "Count %A" SolidVector.Count

let test2 = vector { for i in {0 .. 100} -> i }

1
感谢添加好的文档!您也可以在FSharpx中执行此操作(类似于Solid):printfn“Item(0)%A”vector。[0];printfn“Item(0)%A”(Vector.nth 0 vector) - Jack Fox
1
请注意,F#对x.Item(i)有语法糖,可以写成x.[i] - J D
感谢Jack和Jon指出这一点。我已经编辑了代码,使其更加简洁。 - Andre P.

5
这个事件被放置在一个数据结构中。这就是我们想确定的数据结构。它应该是一个Set、一个Queue或其他一些结构吗?
没有更多信息很难说。
如果您的数据按升序时间戳传入(即它们永远不会错乱),那么您可以使用某种队列或可扩展数组。
如果您的数据可能无序到达并且需要重新排序,则需要使用优先级队列或索引集合。
每秒高达800个事件的子集
对于插入速率来说,这些要求非常温和。
元素的子集被提取或以某种方式迭代,以生成特征。这将是数据结构的最后n行/元素(即最后1000个事件或10000个事件)或最后x秒/分钟内的所有元素(即最近10分钟内的所有事件)中的所有元素。理想情况下,我们希望有一个允许我们有效执行此操作的结构。特别是,允许通过随机访问第n个元素而无需迭代所有其他元素的数据结构具有价值。
如果您只想要接近开头的元素,为什么要随机访问?您真的想要按索引进行随机访问,还是实际上想要按时间等其他键进行随机访问?
根据您所说的,我建议使用一个普通的F# Map,以索引为键,由MailboxProcessor维护,可以附加一个新事件并检索一个对象,该对象允许所有事件进行索引,即将Map包装在提供其自己的Item属性和IEnumerable<_>实现的对象中。在我的机器上,这个简单的解决方案需要50行代码,并且可以处理大约500,000个事件每秒。

2
你能发一下你的例子吗?我想看看,因为我不是完全确定我理解了你的建议。 - devshorts
@Jon 谢谢你认真思考的帖子。回答一些你的问题:数据有时可能会失序。每秒800个事件是每个仪器的数量。总负载将会更高,但肯定低于你提到的每秒500,000个事件。我们还不确定是否只需要在开始附近进行随机访问,但目前我们想保留远程查看的选项。我们需要通过索引和时间进行随机访问。感谢你关于“MailboxProcessor”的建议。在其他类似的帖子中已经多次提到过。我们一定会研究一下。 - Andre P.
1
@AndreP。如果您想通过索引和时间进行随机访问,则可以实现自定义平衡树。这样做的另一个优点是,您可以在分支本身中存储子树的统计信息。您还可以在O(log n)中提取范围,并在O(n)而不是O(n log n)中对它们进行迭代。 - J D

5
假设你的dataObj包含唯一的ID字段,那么任何设置数据结构都可以完成你的工作。不可变数据结构主要用于函数式代码或持久性。如果你不需要这两个,可以使用.Net集合库中的HashSet<T>SortedSet<T>
一些特定于流的优化可能会有用,例如在流中保留固定大小的Queue<T>,并在更重的集合中存储旧对象。在切换到这种混合数据结构解决方案之前,建议进行基准测试。
编辑:
仔细阅读您的要求后,我发现您想要一个具有用户可访问的索引或反向枚举器的队列。在这种数据结构下,您的特征提取操作(如平均值/总和等)成本为O(n)。如果您想以O(log n)执行其中的某些操作,则可以使用更高级的数据结构,例如区间树或跳表。但是,您将不得不自己实现这些数据结构,因为您需要在树节点中存储元信息,这些节点在集合API之后。

谢谢这个。在我们当前的生产环境中,我们的事件数据结构具有微秒保真度的时间戳,并且我们强制确保在事件接收时瞬间唯一性,以确保没有两个时间戳相同。因此,我们可以将其用作唯一标识符。它还允许基于时间进行查找。现在正在研究 Queue<T>。 - Andre P.
@AndreP.,Queue<T> 不允许显式索引或反向枚举。因此,您必须使用另一种实现方式或自己实现。顺便说一下,队列非常容易实现。 - Yin Zhu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接