为了账号安全,请及时绑定邮箱和手机立即绑定

限制异步任务

限制异步任务

C#
梵蒂冈之花 2019-11-15 12:10:13
限制异步任务我想运行一堆异步任务,并限制在任何给定时间可以完成的任务数量。假设您有1000个网址,并且您只希望一次打开50个请求; 但只要一个请求完成,您就会打开与列表中下一个URL的连接。这样,一次只打开50个连接,直到URL列表用完为止。如果可能的话,我也想利用给定数量的线程。我提出了一种扩展方法,ThrottleTasksAsync可以实现我想要的功能。那里有更简单的解决方案吗?我认为这是一种常见的情况。用法:class Program{     static void Main(string[] args)     {         Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();         Console.WriteLine("Press a key to exit...");         Console.ReadKey(true);     }}这是代码:static class IEnumerableExtensions{     public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)     {         var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());         var semaphore = new SemaphoreSlim(maxConcurrentTasks);         // Run the throttler on a separate thread.         var t = Task.Run(() =>         {             foreach (var item in enumerable)             {                 // Wait for the semaphore                 semaphore.Wait();                 blockingQueue.Add(item);             }             blockingQueue.CompleteAdding();         });         var taskList = new List<Task<Result_T>>();         Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },         _ =>但是,线程池快速耗尽,你不能做async/ await。额外: 为了解决调用时BlockingCollection抛出异常的问题,我正在使用带超时的重载。如果我没有使用超时,它会破坏使用的目的,因为不会阻止。有没有更好的办法?理想情况下,会有一种方法。Take()CompleteAdding()TryTakeTryTakeBlockingCollectionTryTakeTakeAsync
查看完整描述

3 回答

?
至尊宝的传说

TA贡献1789条经验 获得超10个赞

根据要求,这是我最终使用的代码。

工作在主 - 详细配置中设置,每个主服务器作为批处理进行处理。每个工作单元都以这种方式排队:

var success = true;// Start processing all the master records.Master master;while (null != (master = await StoredProcedures.ClaimRecordsAsync(...))){
    await masterBuffer.SendAsync(master);}// Finished sending master recordsmasterBuffer.Complete();// Now, wait for all the batches to complete.await batchAction.Completion;return success;

Masters一次缓冲一个,以节省其他外部进程的工作。每个主人的详细信息都通过以下方式发送给工作人员masterTransform TransformManyBlockBatchedJoinBlock还创建了A 以在一批中收集详细信息。

实际工作是以detailTransform TransformBlock异步方式完成的,每次150个。BoundedCapacity设置为300以确保太多的Masters不会在链的开头进行缓冲,同时还留出足够的空间来排列足够的详细记录以允许一次处理150条记录。该块输出object到它的目标,因为它是整个取决于它是否是一个链接过滤DetailException

所述batchAction ActionBlock收集来自所有批次的输出,并且执行散装数据库更新,错误日志等。对于每个批次。

将有几个BatchedJoinBlocks,每个主人一个。由于每个ISourceBlock都是按顺序输出的,并且每个批次只接受与一个主数据相关联的详细记录的数量,因此将按顺序处理批次。每个块仅输出一个组,并在完成时取消链接。只有最后一个批处理块将其完成传播到最终ActionBlock

数据流网络:

// The dataflow networkBufferBlock<Master> masterBuffer = null;TransformManyBlock<Master, Detail> masterTransform = null;TransformBlock<Detail, object> detailTransform = null;ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;// Buffer master records to enable efficient throttling.masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });// Sequentially transform master records into a stream of detail records.masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });// Process each detail record asynchronously, 150 at a time.detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });// Perform the proper action for each batchbatchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });



查看完整回答
反对 回复 2019-11-16
  • 3 回答
  • 0 关注
  • 404 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信