我的主要关注点是消息的保留和释放。如果接收块决定不处理消息会发生什么?我的理解是这会导致内存泄漏,因为消息将无限期地保留。我在考虑应该如何标记未使用的消息,但我不确定如何操作。我在考虑一些人工消息汇(没有动作的ActionBlock),或者我可以将消息标记为已丢弃吗? 对于该实现的进一步见解也将受到赞赏。 这可能几乎是以下问题的重复,但我希望使用自己的类而不是创建块的方法。那样是否被认为是不好的风格?
TPL Dataflow中保证传递的BroadcastBlock
/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T"></typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
private ITargetBlock<T> In { get; }
/// <summary>
/// Holds a TransformBlock for each target, that subscribed to this block
/// </summary>
private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();
public BrodcastCopyBlock()
{
In = new ActionBlock<T>(message => Process(message));
In.Completion.ContinueWith(task =>
{
if (task.Exception == null)
Complete();
else
Fault(task.Exception);
}
);
}
/// <summary>
/// Creates a transform source block for the passed target.
/// </summary>
/// <param name="target"></param>
private void CreateOutBlock(ITargetBlock<T> target)
{
if (_OutBlocks.ContainsKey(target))
return;
var outBlock = new TransformBlock<T, T>(e => e);
_OutBlocks[target] = outBlock;
}
private void Process(T message)
{
foreach (var outBlock in _OutBlocks.Values)
{
outBlock.Post(message);
}
}
/// <inheritdoc />
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
/// <inheritdoc />
public void Complete()
{
foreach (var outBlock in _OutBlocks.Values)
{
((ISourceBlock<T>)outBlock).Complete();
}
}
/// <inheritdoc />
public void Fault(Exception exception)
{
foreach (var outBlock in _OutBlocks.Values)
{
((ISourceBlock<T>)outBlock).Fault(exception);
}
}
/// <inheritdoc />
public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));
/// <inheritdoc />
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
CreateOutBlock(target);
return _OutBlocks[target].LinkTo(target, linkOptions);
}
/// <inheritdoc />
public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
{
return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <inheritdoc />
public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
}
/// <inheritdoc />
public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
}
}
BroadcastBlock
有什么不同? - svickBroadcastBlock
对我来说实际上仍然不起作用,因为我需要消息的可靠传递,而根据我所知,BroadcastBlock
并不能保证这一点。我将重新表述问题。 - crazy_crank