在StatefulService中嵌套事务以保存已中止事务的异步状态

3
我有一个ReliableQueue<MyTask>,在不同的范围内进行入队操作,然后在事务中出队任务,然后想在每个任务上运行一些长时间运行的计算。

问题在于,如果我的队列事务被中止,我不想丢失长计算的实例。它将在后台独立运行,而我只想在重试处理任务时检查其是否已完成。

代码段:

public void protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var queue = await StateManager.GetOrAddAsync<IReliableQueue<MyTask>>(...);
    while(!cancellationToken.IsCancellationRequested)
    {
        using (var transaction = ...)
        {
            var myTaskConditional = await queue.TryDequeueAsync(transaction);
            if (!myTaskConditional.HasValue)
            {
                break;
            }
            await DoLongProcessing(myTaskConditional)
            await transaction.CommitAsync();
        }
    }
}

private async void DoLongProcessing(MyTask myTask) {
    var dict = await StateManager.GetOrAddAsync<IReliableDictionary<Guid,Guid>>(...);
    Conditional<Guid> guidConditional;
    using (var transaction = ...)
    {
        guidConditional = await dict.TryGetValueAsync(myTask.TaskGuid);
        if (guidConditional.HasValue) {
            await transaction.CommitAsync();
            // continue handling knowing we already started, continue to wait for 
            await WaitForClaulcationFinish(guidConditional.Value);
        }
        else {
            // start handling knowing we never handled this task, create new guid and store it in dict
            var runGuid = await StartRunningCalculation(runGuid);
            await dict.AddAsync(myTask.TaskGuid, runGuid);
            await transaction.CommitAsync();
            await WaitForClaulcationFinish(runGuid);
        }
    }
}

我的担忧:我正在使用嵌套事务,这不被推荐。

如果我仅针对ReliableQueueReliableDictionary单独使用事务,这里是否存在死锁的风险?

是否有更好的设计方案来实现我的目标?


StartRunningCalculation 是做什么的? - Francesco Bonizzi
@FrancescoB。正在运行长时间计算的Python脚本。 - Mugen
1个回答

1

在事务中不应进行任何长时间运行的操作。请查看我发布的优先队列服务。在进行工作时,从队列中取出项目并将其放入集合中,然后完成工作后,将其放回队列或完成工作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接