为了账号安全,请及时绑定邮箱和手机立即绑定

通过ASP.NET Core中的控制器操作运行后台任务

通过ASP.NET Core中的控制器操作运行后台任务

C#
慕沐林林 2021-04-10 09:34:53
我正在使用带有ASP.NET Core 2.0的C#的REST API开发Web应用程序。我要实现的是,当客户端向端点发送请求时,我将运行与客户端请求上下文分离的后台任务,如果任务成功启动,该任务将结束。我知道有HostedService问题,但是问题是HostedService服务器启动时启动,据我所知,没有办法HostedService从控制器手动启动。这是一个演示该问题的简单代码。[Authorize(AuthenticationSchemes = "UsersScheme")]public class UsersController : Controller{    [HttpPost]    public async Task<JsonResult> StartJob([FromForm] string UserId, [FromServices] IBackgroundJobService backgroundService)    {        // check user account        (bool isStarted, string data) result = backgroundService.Start();        return JsonResult(result);    }}
查看完整描述

3 回答

?
千巷猫影

TA贡献1829条经验 获得超7个赞

您仍然可以结合使用IHostedService作为后台任务的基础BlockingCollection。


为创建包装器,BlockingCollection以便将其作为单例注入。


public class TasksToRun

{

    private readonly BlockingCollection<TaskSettings> _tasks;


    public TasksToRun() => _tasks = new BlockingCollection<TaskSettings>();


    public void Enqueue(TaskSettings settings) => _tasks.Add(settings);


    public TaskSettings Dequeue(CancellationToken token) => _tasks.Take(token);

}

然后在执行IHostedService“监听”任务时以及任务“到达”时执行它。

BlockingCollection如果collection为空,将停止执行-因此您的while循环不会消耗处理器时间。

.Take方法接受cancellationToken作为参数。使用令牌,可以在应用程序停止时取消“等待”下一个任务。


public class BackgroundService : IHostedService

{

    private readonly TasksToRun _tasks;


    private CancellationTokenSource _tokenSource;


    private Task _currentTask;


    public BackgroundService(TasksToRun tasks) => _tasks = tasks;


    public async Task StartAsync(CancellationToken cancellationToken)

    {

        _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        while (cancellationToken.IsCancellationRequested == false)

        {

            try

            {

                var taskToRun = _tasks.Dequeue(_tokenSource.Token);


                // We need to save executable task, 

                // so we can gratefully wait for it's completion in Stop method

                _currentTask = ExecuteTask(taskToRun);               

                await _currentTask;

            }

            catch (OperationCanceledException)

            {

                // execution cancelled

            }

        }

    }


    public async Task StopAsync(CancellationToken cancellationToken)

    {

        _tokenSource.Cancel(); // cancel "waiting" for task in blocking collection


        if (_currentTask == null) return;


        // wait when _currentTask is complete

        await Task.WhenAny(_currentTask, Task.Delay(-1, cancellationToken));

    }

}

在控制器中,您只需将要运行的任务添加到我们的集合中


public class JobController : Controller

{

    private readonly TasksToRun _tasks;


    public JobController(TasksToRun tasks) => _tasks = tasks;


    public IActionResult PostJob()

    {

        var settings = CreateTaskSettings();


        _tasks.Enqueue(settings);


        return Ok();

    }

}

用于阻塞收集的包装程序应注册为依赖项注入,以单例方式进行


services.AddSingleton<TasksToRun, TasksToRun>();

注册后台服务


services.AddHostedService<BackgroundService>();


查看完整回答
反对 回复 2021-04-17
?
慕森王

TA贡献1777条经验 获得超3个赞

我认为,在某些时候链接断开的情况下,可以在这里重申整个示例。我做了一些调整。最值得注意的是,我注入IServiceScopeFactory,以允许后台进程自己安全地请求服务。我会在此答案的末尾解释我的推理。


核心思想是创建一个任务队列,用户可以将其注入到控制器中,然后将任务分配给该任务队列。长期运行的托管服务中存在相同的任务队列,该服务一次使一个任务出队并执行该任务。


任务队列:


public interface IBackgroundTaskQueue

{

    // Enqueues the given task.

    void EnqueueTask(Func<IServiceScopeFactory, CancellationToken, Task> task);


    // Dequeues and returns one task. This method blocks until a task becomes available.

    Task<Func<IServiceScopeFactory, CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);

}


public class BackgroundTaskQueue : IBackgroundTaskQueue

{

    private readonly ConcurrentQueue<Func<IServiceScopeFactory, CancellationToken, Task>> _items = new();


    // Holds the current count of tasks in the queue.

    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);


    public void EnqueueTask(Func<IServiceScopeFactory, CancellationToken, Task> task)

    {

        if(task == null)

            throw new ArgumentNullException(nameof(task));


        _items.Enqueue(task);

        _signal.Release();

    }


    public async Task<Func<IServiceScopeFactory, CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)

    {

        // Wait for task to become available

        await _signal.WaitAsync(cancellationToken);


        _items.TryDequeue(out var task);

        return task;

    }

}

在任务队列的核心,我们有一个线程安全的ConcurrentQueue<>。由于在新任务可用之前我们不想轮询队列,因此我们使用一个SemaphoreSlim对象来跟踪队列中当前的任务数。每次调用时Release,内部计数器都会递增。该WaitAsync方法将阻塞,直到内部计数器大于0,然后再将其递减。


为了使任务出队和执行任务,我们创建了一个后台服务:


public class BackgroundQueueHostedService : BackgroundService

{

    private readonly IBackgroundTaskQueue _taskQueue;

    private readonly IServiceScopeFactory _serviceScopeFactory;

    private readonly ILogger<BackgroundQueueHostedService> _logger;


    public BackgroundQueueHostedService(IBackgroundTaskQueue taskQueue, IServiceScopeFactory serviceScopeFactory, ILogger<BackgroundQueueHostedService> logger)

    {

        _taskQueue = taskQueue ?? throw new ArgumentNullException(nameof(taskQueue));

        _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));

        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    }


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)

    {

        // Dequeue and execute tasks until the application is stopped

        while(!stoppingToken.IsCancellationRequested)

        {

            // Get next task

            // This blocks until a task becomes available

            var task = await _taskQueue.DequeueAsync(stoppingToken);


            try

            {

                // Run task

                await task(_serviceScopeFactory, stoppingToken);

            }

            catch(Exception ex)

            {

                _logger.LogError(ex, "An error occured during execution of a background task");

            }

        }

    }

}

最后,我们需要使任务队列可用于依赖项注入,并启动我们的后台服务:


public void ConfigureServices(IServiceCollection services)

{

    // ...

    

    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();

    services.AddHostedService<BackgroundQueueHostedService>();

    

    // ...

}

现在,我们可以将后台任务队列注入控制器并排队任务:


public class ExampleController : Controller

{

    private readonly IBackgroundTaskQueue _backgroundTaskQueue;


    public ExampleController(IBackgroundTaskQueue backgroundTaskQueue)

    {

        _backgroundTaskQueue = backgroundTaskQueue ?? throw new ArgumentNullException(nameof(backgroundTaskQueue));

    }


    public IActionResult Index()

    {

        _backgroundTaskQueue.EnqueueTask(async (serviceScopeFactory, cancellationToken) =>

        {

            // Get services

            using var scope = serviceScopeFactory.CreateScope();

            var myService = scope.ServiceProvider.GetRequiredService<IMyService>();

            var logger = scope.ServiceProvider.GetRequiredService<ILogger<ExampleController>>();

            

            try

            {

                // Do something expensive

                await myService.DoSomethingAsync(cancellationToken);

            }

            catch(Exception ex)

            {

                logger.LogError(ex, "Could not do something expensive");

            }

        });


        return Ok();

    }

}

为什么要使用IServiceScopeFactory?


从理论上讲,我们可以直接使用注入到控制器中的服务对象。这可能对单例服务以及大多数作用域服务都适用。


但是,对于实现IDisposable(例如DbContext)的作用域服务,这可能会中断:将任务加入队列后,控制器方法返回并完成请求。然后,框架会清理注入的服务。如果我们的后台任务足够缓慢或延迟,它可能会尝试调用已处置服务的方法,然后会出现错误。


为避免这种情况,我们的排队任务应始终创建其自己的服务范围,而不应使用周围控制器的服务实例。


查看完整回答
反对 回复 2021-04-17
  • 3 回答
  • 0 关注
  • 282 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信