TPL Dataflow 实现 IO 读写操作时的内存问题

5
我尝试使用文件IO操作实现读写操作,并将这些操作封装到TransformBlock中,以使这些操作线程安全,而不是使用锁定机制。
但问题在于,即使我尝试并行写入5个文件,仍会出现内存不足异常,并且使用此实现会阻塞UI线程。该实现在Windows Phone项目中完成。请建议此实现有何问题。
文件IO操作
public static readonly IsolatedStorageFile _isolatedStore = IsolatedStorageFile.GetUserStoreForApplication();
public static readonly FileIO _file = new FileIO();
public static readonly ConcurrentExclusiveSchedulerPair taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
public static readonly ExecutionDataflowBlockOptions exclusiveExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ExclusiveScheduler,
    BoundedCapacity = 1
};

public static readonly ExecutionDataflowBlockOptions concurrentExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ConcurrentScheduler,
    BoundedCapacity = 1
};

public static async Task<T> LoadAsync<T>(string fileName)
{
    T result = default(T);

    var transBlock = new TransformBlock<string, T>
       (async fName =>
       {
           return await LoadData<T>(fName);
       }, concurrentExecutionDataFlow);

    transBlock.Post(fileName);

    result = await transBlock.ReceiveAsync();

    return result;
}

public static async Task SaveAsync<T>(T obj, string fileName)
{
    var transBlock = new TransformBlock<Tuple<T, string>, Task>
       (async tupleData =>
       {
          await SaveData(tupleData.Item1, tupleData.Item2);
       }, exclusiveExecutionDataFlow);

    transBlock.Post(new Tuple<T, string>(obj, fileName));

    await transBlock.ReceiveAsync();
}

MainPage.xaml.cs 使用方法

private static string data = "vjdsskjfhkjsdhvnvndjfhjvkhdfjkgd"
private static string fileName = string.Empty;
private List<string> DataLstSample = new List<string>();
private ObservableCollection<string> TestResults = new ObservableCollection<string>();
private static string data1 = "hjhkjhkhkjhjkhkhkjhkjhkhjkhjkh";
List<Task> allTsk = new List<Task>();
private Random rand = new Random();
private string  fileNameRand
{
    get
    {
        return rand.Next(100).ToString();
    }
}

public MainPage()
{
    InitializeComponent();

    for (int i = 0; i < 5; i ++)
    {
        DataLstSample.Add((i % 2) == 0 ? data : data1);
    }

}

private void Button_Click(object sender, RoutedEventArgs e)
{
    AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual();
}

public async void AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()
{
    TstRst.Text = "InProgress..";
    allTsk.Clear();

    foreach(var data in DataLstSample)
    {
        var fName = fileNameRand;

        var t = Task.Run(async () =>
        {
            await AppIsolatedStore.SaveAsync<string>(data, fName);
        });

        TestResults.Add(string.Format("Writing file name: {0}, data: {1}", fName, data));
        allTsk.Add(t);
    }

    await Task.WhenAll(allTsk);

    TstRst.Text = "Completed..";
}

异步保存和加载数据

        /// <summary>
        /// Load object from file
        /// </summary>
        private static async Task<T> LoadData<T>(string fileName)
        {

            T result = default(T);

            try
            {
                if (!string.IsNullOrWhiteSpace(fileName))
                {
                    using (var file = new IsolatedStorageFileStream(fileName, FileMode.OpenOrCreate, _isolatedStore))
                    {
                        var data = await _file.ReadTextAsync(file);

                        if (!string.IsNullOrWhiteSpace(data))
                        {
                            result = JsonConvert.DeserializeObject<T>(data);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                //todo: log the megatron exception in a file
                Debug.WriteLine("AppIsolatedStore: LoadAsync : An error occured while loading data : {0}", ex.Message);
            }
            finally
            {

            }

            return result;
        }


        /// <summary>
        /// Save object from file
        /// </summary>
        private static async Task SaveData<T>(T obj, string fileName)
        {
            try
            {
                if (obj != null && !string.IsNullOrWhiteSpace(fileName))
                {
                    //Serialize object with JSON or XML serializer
                    string storageString = JsonConvert.SerializeObject(obj);

                    if (!string.IsNullOrWhiteSpace(storageString))
                    {
                        //Write content to file
                        await _file.WriteTextAsync(new IsolatedStorageFileStream(fileName, FileMode.Create, _isolatedStore), storageString);
                    }
                }
            }
            catch (Exception ex)
            {
                //todo: log the megatron exception in a file
                Debug.WriteLine("AppIsolatedStore: SaveAsync : An error occured while saving the data : {0}", ex.Message);
            }
            finally
            {
            }
        }

编辑:

它出现内存异常的原因是数据字符串太大了。该字符串链接为:http://1drv.ms/1QWSAsc

但第二个问题是,即使我添加小数据,也会阻塞UI线程。代码是否在UI线程上执行任务?

2个回答

1
不,你使用并发对来使用默认的线程池进行任务处理,然后用Run方法实例化任务,所以问题不在这里。但是你这里的代码有两个主要威胁:
var transBlock = new TransformBlock<string, T>
   (async fName =>
   {
       // process file here
   }, concurrentExecutionDataFlow);

你真的不应该每次都创建transBlockTPL Dataflow 的主要思想是只创建一次块,然后在之后使用它们。 因此,您应该重构应用程序以降低实例化块的数量,否则就不应该使用TPL Dataflow
你代码中的另一个问题是你明确地阻塞了线程!
// Right here
await Task.WhenAll(allTsk);
TstRst.Text = "Completed..";

在同步事件处理程序中从异步无返回值方法调用任务的await会阻塞线程,因为默认情况下它会捕获同步上下文。首先,应该避免使用{{link2:async void}}。其次,如果您使用了异步,则应该一路使用异步, 因此事件处理程序也应该是异步的。第三,您可以使用继续执行您的任务以更新UI或使用当前同步上下文

因此,您的代码应该像这样:

// store the sync context in the field of your form
SynchronizationContext syncContext = SynchronizationContext.Current;

// avoid the async void :)
public async Task AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()

// make event handler async - this is the only exception for the async void use rule from above
private async void Button_Click(object sender, RoutedEventArgs e)

// asynchronically wait the result without capturing the context
await Task.WhenAll(allTsk).ContinueWith(
  t => {
    // you can move out this logic to main method
    syncContext.Post(new SendOrPostCallback(o =>
        {
            TstRst.Text = "Completed..";
        }));
  }
);

我正在探索一种不使用锁机制就能使IO操作线程安全的方法,以避免锁定的副作用。正如你所说,我应该减少块创建的数量或使用不同的方法。你能否建议一种更好的方式或者我可以探索更多的新方法。 - Balraj Singh
@BalrajSingh TPL Dataflow 仍然在内部使用块。手动的 lock 语句更易读,也更高效。 - VMAtm
1
在最后一个示例中,ConfigureAwait 是错误的 - 这不会编译。此外,不应使用 ContinueWith。如果操作员正在使用 TPL Dataflow,则更符合惯用法的解决方案是使用具有 UI TaskScheduler 的最终 ActionBlock - Stephen Cleary
@StephenCleary 谢谢您的添加!我忘记了大括号。 - VMAtm

0

你试过在ExecutionDataflowBlockOptions的BoundedCapacity参数上玩耍吗? TPL入门提到了关于块容量的内容:

[...]限制在数据流网络中非常有用,以避免无限增长的内存。 如果存在生成者生成数据比消费者处理数据快得多的可能性,则出于可靠性原因,这可能非常重要...

我建议尝试使用此选项来限制已处理项目的排队,并查看它是否有助于解决您的内存问题。


我已将BoundingCapacity设置为1,仍然存在问题。 - Balraj Singh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接