用响应式扩展包装一个文件监视器

8

我一直在研究将文件监视器包装成一个可观察对象,以帮助处理事件,但我在如何获得所需的行为方面遇到了一些麻烦。文件监视器监视一个目录,将文件放入其中。当文件首次放入该目录时,文件监视器会触发Created事件。但是,如果文件很大或网络连接很慢,则会触发一系列Changed事件,因为文件正在更新。在文件完成写入之前,我不想处理该文件,因此我真正需要的是这个时间表:

|Created    |Changed   |Changed   |Changed                      
________________________________________________ 
^Write starts       ^Write finishes       ^Processing Starts    

我研究了Rx中几种过滤事件的方法,但我没有得到我需要的东西,即“在文件X秒未更改后执行函数一次”。节流(throttle)不好用,因为它会丢失中间的事件。缓冲(buffer)也不好用,因为事件可能发生在缓冲边界上。
我曾经考虑使用超时(timeout),但我不太喜欢它们会抛出异常,而且我希望处理可以在文件被写入时开始,而不是等到完全没有事件了再开始。
有一个类似的问题在Reactive Extensions vs FileSystemWatcher上,但从未真正解决。
是否有一种简单的方法可以做到这一点?我相信这不是一个罕见的用例。
4个回答

3

ObservableFileSystemWatcher - 一个对 FileSystemWatcher 类型进行观察的包装器 - 运行良好。添加名为 ReactiveFileSystemWatcher 的 NuGet 包并创建控制台应用程序进行测试,如下所示:

class Program
{
  static void Main(string[] args)
  {
    using (var watcher = new ObservableFileSystemWatcher(c => { c.Path = @"C:\FolderToWatch\"; c.IncludeSubdirectories = true; }))
    {
      watcher.Created.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
      watcher.Changed.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
      watcher.Renamed.Select(x => $"{x.OldName} was {x.ChangeType} to {x.Name}").Subscribe(Console.WriteLine);
      watcher.Deleted.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
      watcher.Errors.Subscribe(Console.WriteLine);
      watcher.Start();
      Console.ReadLine();
    }
  }
}

2

编辑:经过审核,我认为你不需要这个...

也许我有点过于简单化了,但是在这里使用Throttle不是最理想的吗?

这并不是“简单”的,但我认为它比我之前的想法更接近你想要的:

(额外加赠:测试用例! ;) )

void Main()
{
    var pathToWatch = @"c:\temp\";
    var fsw = new FileSystemWatcher(pathToWatch);

    // set up observables for create and changed
    var changedObs = 
       Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
          dlgt => fsw.Changed += dlgt, 
          dlgt => fsw.Changed -= dlgt);
    var createdObs = 
       Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>( 
          dlgt => fsw.Created += dlgt, 
          dlgt => fsw.Created -= dlgt);

    // the longest we'll wait between last file write and calling it "changed"
    var maximumTimeBetweenWrites = TimeSpan.FromSeconds(1);

    // A "pulse" ticking off every 10ms (adjust this as desired)
    var timer = Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10))
        .Select(i => DateTime.Now);

    var watcher = 
        from creation in createdObs
        from change in changedObs
            // we only care about changes matching a create
            .Where(changeEvt => changeEvt.EventArgs.Name == creation.EventArgs.Name)
            // take latest of (pulse, changes) and select (event, time since last file write)
            .CombineLatest(timer, (evt, now) => new {
                    Change = evt, 
                    DeltaFromLast = now.Subtract(new FileInfo(evt.EventArgs.FullPath).LastWriteTime)})
            // skip all until we trigger than "time before considered changed" threshold
            .SkipWhile(evt => evt.DeltaFromLast < maximumTimeBetweenWrites)
            // Then lock on that until we change a diff file
            .Distinct(evt => evt.Change.EventArgs.FullPath)
        select change.Change;

    var disp = new CompositeDisposable();

    // to show creates
    disp.Add(
        createdObs.Subscribe(
           evt => Console.WriteLine("New file:{0}", 
                evt.EventArgs.FullPath)));

    // to show "final changes"
    disp.Add(
        watcher.Subscribe(
           evt => Console.WriteLine("{0}:{1}:{2}", 
                 evt.EventArgs.Name, 
                 evt.EventArgs.ChangeType, 
                 evt.EventArgs.FullPath)));

    fsw.EnableRaisingEvents = true;

    var rnd = new Random();
    Enumerable.Range(0,10)
        .AsParallel()
        .ForAll(i => 
            {
                var filename = Path.Combine(pathToWatch, "foo" + i + ".txt");
                if(File.Exists(filename))
                    File.Delete(filename);

                foreach(var j in Enumerable.Range(0, 20))
                {
                    var writer = File.AppendText(filename);
                    writer.WriteLine(j);
                    writer.Close();
                    Thread.Sleep(rnd.Next(500));
                }
            });

    Console.WriteLine("Press enter to quit...");
    Console.ReadLine();
    disp.Dispose();        
}

我还没有对我的单元测试进行过测试,但我认为它不会起作用,因为更改事件并不总是被触发。顺序是您有一个单一的Created事件,然后是0个或多个Changed事件。我也不确定这是否解决了文件写入持续时间超过5秒的可能性。 - stimms
@stimms 实际上,我完全忘记了我回答过这个问题...看了一下,我认为这不是你想要的...让我稍微更新一下... - JerKimball
@stimms 好的,对事情有了新的看法 - "创建"应该让您在文件到来时开始处理文件,而"最终更改"应该表示文件已经"完成"。 - JerKimball
非常感谢您在这方面的努力。我认为我终于使用BufferWithInactivity得到了一些可行的东西,但仅仅通过阅读您的代码,我已经学到很多了。 - stimms
@stimms 不用担心,写这个其实有点有趣(好吧,也许不是有趣...但很有意思)。 :) - JerKimball

1

请查看我的答案中的BufferWithInactivity扩展方法。

我认为您可以使用它来查找更改事件中的不活动状态。


2
我使用了缓冲区与分组函数相结合的方法来实现我想要的功能。 - stimms

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接