2 回答

TA貢獻1777條經驗 獲得超3個贊
這是另一種方法。此實現在概念上與 alexm 的實現相同,直到沒有頻道立即可用的項目為止。然后它的不同之處在于避免了Task.WhenAny
循環模式,而是為每個通道啟動一個異步循環。所有循環都在競相更新共享變量,該共享變量在臨界區更新,以防止從多個通道消耗元素。ValueTuple<T, int, bool>
consumed
/// <summary>
/// Takes an item asynchronously from any one of the specified channel readers.
/// </summary>
public static async Task<(T Item, int Index)> TakeFromAnyAsync<T>(
ChannelReader<T>[] channelReaders,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(channelReaders);
if (channelReaders.Length == 0) throw new ArgumentException(
$"The {nameof(channelReaders)} argument is a zero-length array.");
foreach (var cr in channelReaders) if (cr is null) throw new ArgumentException(
$"The {nameof(channelReaders)} argument contains at least one null element.");
cancellationToken.ThrowIfCancellationRequested();
// Fast path (at least one channel has an item available immediately)
for (int i = 0; i < channelReaders.Length; i++)
if (channelReaders[i].TryRead(out var item))
return (item, i);
// Slow path (all channels are currently empty)
using var linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
(T Item, int Index, bool HasValue) consumed = default;
Task[] tasks = channelReaders.Select(async (channelReader, index) =>
{
while (true)
{
try
{
if (!await channelReader.WaitToReadAsync(linkedCts.Token)
.ConfigureAwait(false)) break;
}
// Only the exceptional cases below are normal.
catch (OperationCanceledException)
when (linkedCts.IsCancellationRequested) { break; }
catch when (channelReader.Completion.IsCompleted
&& !channelReader.Completion.IsCompletedSuccessfully) { break; }
// This channel has an item available now.
lock (linkedCts)
{
if (consumed.HasValue)
return; // An item has already been consumed from another channel.
if (!channelReader.TryRead(out var item))
continue; // We lost the race to consume the available item.
consumed = (item, index, true); // We consumed an item successfully.
}
linkedCts.Cancel(); // Cancel the other tasks.
return;
}
}).ToArray();
// The tasks should never fail. If a task ever fails, we have a bug.
try { foreach (var task in tasks) await task.ConfigureAwait(false); }
catch (Exception ex) { Debug.Fail("Unexpected error", ex.ToString()); throw; }
if (consumed.HasValue)
return (consumed.Item, consumed.Index);
cancellationToken.ThrowIfCancellationRequested();
Debug.Assert(channelReaders.All(cr => cr.Completion.IsCompleted));
throw new ChannelClosedException();
}
應該注意的是,這個解決方案,以及 alexm 的解決方案,都依賴于WaitToReadAsync
在一個元素被消耗時取消所有掛起的操作。不幸的是,這會觸發臭名昭著的內存泄漏問題,該問題會影響具有空閑生產者的 .NET 通道。當取消通道上的任何異步操作時,取消的操作將保留在內存中,附加到通道的內部結構,直到將元素寫入通道。此行為已被Microsoft 分類為設計使然,但不排除改進它的可能性。有趣的是,這種歧義使得這種效果不符合記錄條件. 因此,了解這一點的唯一途徑是偶然,要么從非官方渠道閱讀,要么陷入其中。

TA貢獻1921條經驗 獲得超9個贊
如果按照 Go 中使用通道的方式使用通道,問題就會容易得多:Channel(Readers) 作為輸入,Channel(Readers) 作為輸出。
IEnumerable<ChannelReader<T>> sources=....;
await foreach(var msg in sources.TakeFromAny(token))
{
....
}
要么
var merged=sources.TakeFromAny(token);
...
var msg=await merged.ReadAsync(token);
在這種情況下,來自所有通道閱讀器的輸入被復制到一個輸出通道。該方法的返回值是該頻道的ChannelReader。
CopyToAsync 助手
可以使用CopyToAsync函數將消息從輸入源復制到輸出通道:
static async Task CopyToAsync<T>(
this ChannelReader<T> input,
ChannelWriter<T> output,
CancellationToken token=default)
{
while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
//Early exit if cancellation is requested
while (!token.IsCancellationRequested && input.TryRead(out T? msg))
{
await output.WriteAsync(msg,token);
}
}
}
此代碼類似于ReadAllAsync,但如果請求取消則立即退出。ReadAllAsync即使要求取消,也將退還所有可用物品。使用的方法包括
WriteAsync如果通道關閉則不會拋出異常,這使得錯誤處理變得更加容易。
錯誤處理和面向鐵路的編程
WaitToReadAsync如果源出錯但該異常確實會拋出,并且該異常將傳播到調用方法并傳播到Task.WhenAll輸出通道。
這可能有點混亂,因為它會中斷整個管道。為避免這種情況,可以將錯誤吞沒或記錄在內部CopyToAsync。一個更好的選擇是使用面向鐵路的編程并將所有消息包裝在一個Result<TMsg,TError>類中,例如:
static async Task CopyToAsync<Result<T,Exception>>(
this ChannelReader<Result<T,Exception>> input,
ChannelWriter<Result<T,Exception>> output,
CancellationToken token=default)
{
try
{
while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
//Early exit if cancellation is requested
while (!token.IsCancellationRequested && input.TryRead(out T? msg))
{
var newMsg=Result.FromValue(msg);
await output.WriteAsync(newMsg,token);
}
}
}
catch(Exception exc)
{
output.TryWrite(Result<T>.FromError(exc));
}
}
TakeFromAsync
TakeFromAny(MergeAsync可能是更好的名字)可以是:
static ChannelReader<T> TakeFromAny(
this IEnumerable<ChannelReader<T> inputs,
CancellationToken token=default)
{
var outChannel=Channel.CreateBounded<T>(1);
var readers=inputs.Select(rd=>CopyToAsync(rd,outChannel,token));
_ = Task.WhenAll(readers)
.ContinueWith(t=>outChannel.TryComplete(t.Exception));
return outChannel;
}
使用 1 的有界容量可確保下游代碼的背壓行為不會改變。
添加源索引
這也可以調整為發出源的索引:
static async Task CopyToAsync<T>(
this ChannelReader<T> input,int index,
ChannelWriter<(T,int)> output,
CancellationToken token=default)
{
while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (!token.IsCancellationRequested && input.TryRead(out T? msg))
{
await output.WriteAsync((msg,index),token);
}
}
}
static ChannelReader<(T,int)> TakeFromAny(
this IEnumerable<ChannelReader<T> inputs,
CancellationToken token=default)
{
var outChannel=Channel.CreateBounded<(int,T)>(1);
var readers=inputs.Select((rd,idx)=>CopyToAsync(rd,idx,outChannel,token));
_ = Task.WhenAll(readers)
.ContinueWith(t=>outChannel.TryComplete(t.Exception));
return outChannel;
}
- 2 回答
- 0 關注
- 138 瀏覽
添加回答
舉報