亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何實現 Channels 的 BlockingCollection.TakeFromAny

如何實現 Channels 的 BlockingCollection.TakeFromAny

Go
不負相思意 2022-12-19 21:23:00
我正在嘗試實現一個異步方法,該方法采用ChannelReader<T>s 數組,并從任何具有可用項目的通道中獲取值。它是一種與具有以下簽名的方法具有相似功能的BlockingCollection<T>.TakeFromAny方法:public static int TakeFromAny(BlockingCollection<T>[] collections, out T item,    CancellationToken cancellationToken);此方法返回collections從中刪除項目的數組中的索引。async方法不能有參數out,所以我要實現的 API 是這樣的:public static Task<(T Item, int Index)> TakeFromAnyAsync<T>(    ChannelReader<T>[] channelReaders,    CancellationToken cancellationToken = default);該方法應該異步讀取一個項目,并返回消耗的項目以及數組TakeFromAnyAsync<T>中關聯通道的索引。channelReaders如果所有通道都已完成(成功或出錯),或者在 期間全部完成await,則該方法應異步拋出一個ChannelClosedException.我的問題是:如何實施該TakeFromAnyAsync<T>方法?實現看起來很棘手。很明顯,在任何情況下,該方法都不應從通道中消耗多個項目。此外,它不應遺留即發即棄的任務,或讓一次性資源未被處置。該方法通常會在循環中調用,因此它也應該相當高效。它的復雜度應該不比 O(n) 差,其中n通道的數量。要了解此方法的用處,您可以查看Go語言的select語句。從旅游:該select語句讓 goroutine 等待多個通信操作。Aselect阻塞直到它的一個 case 可以運行,然后它執行那個 case。如果多個準備就緒,它會隨機選擇一個。select {case msg1 := <-c1:    fmt.Println("received", msg1)case msg2 := <-c2:    fmt.Println("received", msg2)}在上面的示例中,要么從通道中獲取一個值c1并分配給變量msg1,要么從通道中獲取一個值c2并分配給變量msg2。Goselect語句不限于從通道讀取。它可以包括多種異構情況,如寫入有界通道、等待計時器等。復制 Goselect語句的全部功能超出了這個問題的范圍。
查看完整描述

2 回答

?
慕森王

TA貢獻1777條經驗 獲得超3個贊

這是另一種方法。此實現在概念上與 alexm 的實現相同,直到沒有頻道立即可用的項目為止。然后它的不同之處在于避免了Task.WhenAny循環模式,而是為每個通道啟動一個異步循環。所有循環都在競相更新共享變量,該共享變量在臨界區更新,以防止從多個通道消耗元素。ValueTuple<T, int, bool> consumed

/// <summary>

/// Takes an item asynchronously from any one of the specified channel readers.

/// </summary>

public static async Task<(T Item, int Index)> TakeFromAnyAsync<T>(

    ChannelReader<T>[] channelReaders,

    CancellationToken cancellationToken = default)

{

    ArgumentNullException.ThrowIfNull(channelReaders);

    if (channelReaders.Length == 0) throw new ArgumentException(

        $"The {nameof(channelReaders)} argument is a zero-length array.");

    foreach (var cr in channelReaders) if (cr is null) throw new ArgumentException(

        $"The {nameof(channelReaders)} argument contains at least one null element.");


    cancellationToken.ThrowIfCancellationRequested();


    // Fast path (at least one channel has an item available immediately)

    for (int i = 0; i < channelReaders.Length; i++)

        if (channelReaders[i].TryRead(out var item))

            return (item, i);


    // Slow path (all channels are currently empty)

    using var linkedCts = CancellationTokenSource

        .CreateLinkedTokenSource(cancellationToken);


    (T Item, int Index, bool HasValue) consumed = default;


    Task[] tasks = channelReaders.Select(async (channelReader, index) =>

    {

        while (true)

        {

            try

            {

                if (!await channelReader.WaitToReadAsync(linkedCts.Token)

                    .ConfigureAwait(false)) break;

            }

            // Only the exceptional cases below are normal.

            catch (OperationCanceledException)

                when (linkedCts.IsCancellationRequested) { break; }

            catch when (channelReader.Completion.IsCompleted

                && !channelReader.Completion.IsCompletedSuccessfully) { break; }


            // This channel has an item available now.

            lock (linkedCts)

            {

                if (consumed.HasValue)

                    return; // An item has already been consumed from another channel.


                if (!channelReader.TryRead(out var item))

                    continue; // We lost the race to consume the available item.


                consumed = (item, index, true); // We consumed an item successfully.

            }

            linkedCts.Cancel(); // Cancel the other tasks.

            return;

        }

    }).ToArray();


    // The tasks should never fail. If a task ever fails, we have a bug.

    try { foreach (var task in tasks) await task.ConfigureAwait(false); }

    catch (Exception ex) { Debug.Fail("Unexpected error", ex.ToString()); throw; }


    if (consumed.HasValue)

        return (consumed.Item, consumed.Index);

    cancellationToken.ThrowIfCancellationRequested();

    Debug.Assert(channelReaders.All(cr => cr.Completion.IsCompleted));

    throw new ChannelClosedException();

}

應該注意的是,這個解決方案,以及 alexm 的解決方案,都依賴于WaitToReadAsync在一個元素被消耗時取消所有掛起的操作。不幸的是,這會觸發臭名昭著的內存泄漏問題,該問題會影響具有空閑生產者的 .NET 通道。當取消通道上的任何異步操作時,取消的操作將保留在內存中,附加到通道的內部結構,直到將元素寫入通道。此行為已被Microsoft 分類為設計使然,但不排除改進它的可能性。有趣的是,這種歧義使得這種效果不符合記錄條件. 因此,了解這一點的唯一途徑是偶然,要么從非官方渠道閱讀,要么陷入其中。



查看完整回答
反對 回復 2022-12-19
?
郎朗坤

TA貢獻1921條經驗 獲得超9個贊

如果按照 Go 中使用通道的方式使用通道,問題就會容易得多:Channel(Readers) 作為輸入,Channel(Readers) 作為輸出。


IEnumerable<ChannelReader<T>> sources=....;

await foreach(var msg in sources.TakeFromAny(token))

{

....

}

要么


var merged=sources.TakeFromAny(token);

...

var msg=await merged.ReadAsync(token);

在這種情況下,來自所有通道閱讀器的輸入被復制到一個輸出通道。該方法的返回值是該頻道的ChannelReader。


CopyToAsync 助手


可以使用CopyToAsync函數將消息從輸入源復制到輸出通道:


static async Task CopyToAsync<T>(

        this ChannelReader<T> input,

        ChannelWriter<T> output,

        CancellationToken token=default)

{

   while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))

   {

         //Early exit if cancellation is requested

         while (!token.IsCancellationRequested &&  input.TryRead(out T? msg))

         {

             await output.WriteAsync(msg,token);

         }

   }

}

此代碼類似于ReadAllAsync,但如果請求取消則立即退出。ReadAllAsync即使要求取消,也將退還所有可用物品。使用的方法包括


WriteAsync如果通道關閉則不會拋出異常,這使得錯誤處理變得更加容易。


錯誤處理和面向鐵路的編程


WaitToReadAsync如果源出錯但該異常確實會拋出,并且該異常將傳播到調用方法并傳播到Task.WhenAll輸出通道。


這可能有點混亂,因為它會中斷整個管道。為避免這種情況,可以將錯誤吞沒或記錄在內部CopyToAsync。一個更好的選擇是使用面向鐵路的編程并將所有消息包裝在一個Result<TMsg,TError>類中,例如:


static async Task CopyToAsync<Result<T,Exception>>(

        this ChannelReader<Result<T,Exception>> input,

        ChannelWriter<Result<T,Exception>> output,

        CancellationToken token=default)

{

   try

   {

     while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))

     {

         //Early exit if cancellation is requested

         while (!token.IsCancellationRequested &&  input.TryRead(out T? msg))

         {

             var newMsg=Result.FromValue(msg);

             await output.WriteAsync(newMsg,token);

         }

     }

  }

  catch(Exception exc)

  {

    output.TryWrite(Result<T>.FromError(exc));

  }

}

TakeFromAsync


TakeFromAny(MergeAsync可能是更好的名字)可以是:


static ChannelReader<T> TakeFromAny(

        this IEnumerable<ChannelReader<T> inputs,

        CancellationToken token=default)

{

    var outChannel=Channel.CreateBounded<T>(1);


    var readers=inputs.Select(rd=>CopyToAsync(rd,outChannel,token));


    _ = Task.WhenAll(readers)

            .ContinueWith(t=>outChannel.TryComplete(t.Exception));

    return outChannel;

}

使用 1 的有界容量可確保下游代碼的背壓行為不會改變。


添加源索引


這也可以調整為發出源的索引:


static async Task CopyToAsync<T>(

        this ChannelReader<T> input,int index,

        ChannelWriter<(T,int)> output,

        CancellationToken token=default)

{

  while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))

  {

        while (!token.IsCancellationRequested &&  input.TryRead(out T? msg))

        {


            await output.WriteAsync((msg,index),token);

        }

  }

}


static ChannelReader<(T,int)> TakeFromAny(

        this IEnumerable<ChannelReader<T> inputs,

        CancellationToken token=default)

{

    var outChannel=Channel.CreateBounded<(int,T)>(1);


    var readers=inputs.Select((rd,idx)=>CopyToAsync(rd,idx,outChannel,token));


    _ = Task.WhenAll(readers)

            .ContinueWith(t=>outChannel.TryComplete(t.Exception));

    return outChannel;

}


查看完整回答
反對 回復 2022-12-19
  • 2 回答
  • 0 關注
  • 138 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號