我实现了一个卡夫卡事件总线作为网络核心单独服务。 该服务本身配置了Autofac在Startup.cs。 该服务有一个Listen()
方法:
public void Listen()
{
using(var consumer = new Consumer<Null, string>(_config, null, new StringDeserializer(Encoding.UTF8)))
{
consumer.Subscribe(new string[] { "business-write-topic" });
consumer.OnMessage += (_, msg) =>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
consumer.CommitAsync(msg);
};
while (true)
{
consumer.Poll(100);
}
}
}
我的理解是,为了使此方法的应用程序的生命周期内的信息不断地听,我必须通过某种方式获得与主机相关的ServiceProvider,然后检索的一个实例是从虚拟主机调用的Program.cs服务,并调用方法。
我已经从默认的网络核心2.1模板下面的配置我的Program.cs:
public class Program
{
public static void Main(string[] args)
{
var host = CreateWebHost(args);
host.Run();
}
public static IWebHost CreateWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.Build();
}
除了具有主机可用,以便我能以某种方式访问这些服务,我不知道在哪里可以从这里走。 我搜索了类似的问题,并在官方文档阅读周围,但我似乎无法弄清楚如何访问该服务,这样我可以调用Listen()
方法。
这是实现我的目标的“去来”的方式? 如果是这样,我怎么继续? 而如果不是 - 是 - 如果这样的任务以另一种方式是常用来完成,我怎么做呢?
编辑:
下面的答案仍然是完全有效的。 有一个基类叫做BackgroundService
由微软提供了可以用在你只需要实现ExecuteAsync(CancellationToken stoppingToken)
而不是整个界面IHostedService
。 你可以找到它在这里 。 对于您需要安装包Microsoft.Extensions.Hosting
。
先前且仍有效的答案:我会建议使用IHostedService 。 IHostedService实现被注册为单身人士和他们运行的全部时间,直至服务器关闭。
创建这个基类
public abstract class HostedService : IHostedService
{
private Task executingTask;
private CancellationTokenSource cancellationTokenSource;
public Task StartAsync(CancellationToken cancellationToken)
{
this.cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
this.executingTask = this.ExecuteAsync(this.cancellationTokenSource.Token);
return this.executingTask.IsCompleted ? this.executingTask : Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
if (this.executingTask == null)
{
return;
}
this.cancellationTokenSource.Cancel();
await Task.WhenAny(this.executingTask, Task.Delay(-1, cancellationToken));
}
protected abstract Task ExecuteAsync(CancellationToken cancellationToken);
}
然后创建消费者主机
public class ConsumerHost : HostedService
{
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
using (var consumer = new Consumer<Null, string>(_config, null, new StringDeserializer(Encoding.UTF8)))
{
consumer.Subscribe(new string[] {"business-write-topic"});
consumer.OnMessage += (_, msg) =>
{
Console.WriteLine(
$"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
consumer.CommitAsync(msg);
};
while (!cancellationToken.IsCancellationRequested) // will make sure to stop if the application is being shut down!
{
consumer.Poll(100);
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
}
}
}
}
现在,在ConfigureService方法你的启动级中添加单
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IHostedService, ConsumerHost>();
}
这个服务现在踢时,虚拟主机提供商完成建设,并停止当你关闭服务器。 无需手动触发它,让虚拟主机提供商为你做它。
我想BackgroundService是你所需要的。
public class ListnerBackgroundService : BackgroundService
{
private readonly ListnerService service;
public ListnerBackgroundService(ListnerService service)
{
this.service = service;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
service.Listen();
return Task.CompletedTask;
}
}
并注册它:
public void ConfigureServices(IServiceCollection services)
{
...
services.AddSingleton<IHostedService, ListnerBackgroundService>();
...
}
文章来源: How do i call a method from a singleton service to run throughout the application lifetime