From 958435a9aa91215c5d8554e1ae8110a0d1e7bcee Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Fri, 26 Jul 2024 10:04:01 -0500 Subject: [PATCH] Various improvements to hosting scheduled jobs (#304) * Various improvements to hosting scheduled jobs * Extract ScheduledJobManager interface * Changes based on feedback * Move Cronos to Foundatio namespace to prevent collisions. * Update error message to include job name --- .../Jobs/EvenMinutesJob.cs | 27 --- samples/Foundatio.HostingSample/Program.cs | 17 +- .../Properties/launchSettings.json | 2 +- .../Startup/MyStartupAction.cs | 12 +- .../Cronos/CalendarHelper.cs | 2 +- .../Cronos/CronExpression.cs | 2 +- .../Cronos/CronExpressionFlag.cs | 2 +- .../Cronos/CronField.cs | 2 +- .../Cronos/CronFormat.cs | 2 +- .../Cronos/CronFormatException.cs | 2 +- .../Cronos/TimeZoneHelper.cs | 2 +- .../Jobs/DynamicJob.cs | 26 +++ .../Jobs/HostedJobOptions.cs | 1 - .../Jobs/HostedJobService.cs | 5 +- .../Jobs/JobHostExtensions.cs | 202 +++++++++++++----- .../Jobs/JobOptionsBuilder.cs | 8 +- .../Jobs/ScheduledJobManager.cs | 178 +++++++++++++++ .../Jobs/ScheduledJobOptions.cs | 14 ++ .../Jobs/ScheduledJobOptionsBuilder.cs | 50 +++++ .../Jobs/ScheduledJobRegistration.cs | 13 +- .../Jobs/ScheduledJobRunner.cs | 141 ++++++++++++ .../Jobs/ScheduledJobService.cs | 107 +--------- .../Caching/InMemoryCacheClientOptions.cs | 2 +- src/Foundatio/Jobs/JobOptions.cs | 10 +- src/Foundatio/Jobs/JobRunner.cs | 37 ++-- src/Foundatio/Utility/FoundatioDiagnostics.cs | 6 +- tests/Foundatio.Tests/Jobs/JobTests.cs | 18 +- 27 files changed, 644 insertions(+), 246 deletions(-) delete mode 100644 samples/Foundatio.HostingSample/Jobs/EvenMinutesJob.cs create mode 100644 src/Foundatio.Extensions.Hosting/Jobs/DynamicJob.cs create mode 100644 src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobManager.cs create mode 100644 src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptions.cs create mode 100644 src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptionsBuilder.cs create mode 100644 src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs diff --git a/samples/Foundatio.HostingSample/Jobs/EvenMinutesJob.cs b/samples/Foundatio.HostingSample/Jobs/EvenMinutesJob.cs deleted file mode 100644 index 3f3958e6..00000000 --- a/samples/Foundatio.HostingSample/Jobs/EvenMinutesJob.cs +++ /dev/null @@ -1,27 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Foundatio.Jobs; -using Microsoft.Extensions.Logging; - -namespace Foundatio.HostingSample; - -public class EvenMinutesJob : IJob -{ - private readonly ILogger _logger; - - public EvenMinutesJob(ILoggerFactory loggerFactory) - { - _logger = loggerFactory.CreateLogger(); - } - - public async Task RunAsync(CancellationToken cancellationToken = default) - { - if (_logger.IsEnabled(LogLevel.Information)) - _logger.LogInformation("EvenMinuteJob Run Thread={ManagedThreadId}", Thread.CurrentThread.ManagedThreadId); - - await Task.Delay(TimeSpan.FromSeconds(5)); - - return JobResult.Success; - } -} diff --git a/samples/Foundatio.HostingSample/Program.cs b/samples/Foundatio.HostingSample/Program.cs index a1e67598..725fddb2 100644 --- a/samples/Foundatio.HostingSample/Program.cs +++ b/samples/Foundatio.HostingSample/Program.cs @@ -1,9 +1,11 @@ using System; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Foundatio.Extensions.Hosting.Jobs; using Foundatio.Extensions.Hosting.Startup; +using Foundatio.Jobs; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; @@ -87,18 +89,25 @@ public static IHostBuilder CreateHostBuilder(string[] args) s.AddHealthChecks().AddCheckForStartupActions("Critical"); if (everyMinute) - s.AddCronJob("* * * * *"); + s.AddDistributedCronJob("* * * * *"); if (evenMinutes) - s.AddCronJob("*/2 * * * *"); + s.AddCronJob("EvenMinutes", "*/2 * * * *", async sp => + { + var logger = sp.GetRequiredService>(); + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EvenMinuteJob Run Thread={ManagedThreadId}", Thread.CurrentThread.ManagedThreadId); + + await Task.Delay(TimeSpan.FromSeconds(5)); + }); if (sample1) - s.AddJob(sp => new Sample1Job(sp.GetRequiredService()), o => o.ApplyDefaults().WaitForStartupActions(true).InitialDelay(TimeSpan.FromSeconds(4))); + s.AddJob("Sample1", sp => new Sample1Job(sp.GetRequiredService()), o => o.ApplyDefaults().WaitForStartupActions(true).InitialDelay(TimeSpan.FromSeconds(4))); if (sample2) { s.AddHealthChecks().AddCheck("Sample2Job"); - s.AddJob(true); + s.AddJob(o => o.WaitForStartupActions(true)); } // if you don't specify priority, actions will automatically be assigned an incrementing priority starting at 0 diff --git a/samples/Foundatio.HostingSample/Properties/launchSettings.json b/samples/Foundatio.HostingSample/Properties/launchSettings.json index 8b955c7e..ae82a820 100644 --- a/samples/Foundatio.HostingSample/Properties/launchSettings.json +++ b/samples/Foundatio.HostingSample/Properties/launchSettings.json @@ -2,7 +2,7 @@ "profiles": { "Foundatio.HostingSample": { "commandName": "Project", - "commandLineArgs": "sample1 sample2" + "commandLineArgs": "all" } } } \ No newline at end of file diff --git a/samples/Foundatio.HostingSample/Startup/MyStartupAction.cs b/samples/Foundatio.HostingSample/Startup/MyStartupAction.cs index 12c89137..f0f55439 100644 --- a/samples/Foundatio.HostingSample/Startup/MyStartupAction.cs +++ b/samples/Foundatio.HostingSample/Startup/MyStartupAction.cs @@ -1,5 +1,6 @@ using System.Threading; using System.Threading.Tasks; +using Foundatio.Extensions.Hosting.Jobs; using Foundatio.Extensions.Hosting.Startup; using Microsoft.Extensions.Logging; @@ -7,10 +8,12 @@ namespace Foundatio.HostingSample; public class MyStartupAction : IStartupAction { + private readonly IScheduledJobManager _scheduledJobManager; private readonly ILogger _logger; - public MyStartupAction(ILogger logger) + public MyStartupAction(IScheduledJobManager scheduledJobManager, ILogger logger) { + _scheduledJobManager = scheduledJobManager; _logger = logger; } @@ -21,5 +24,12 @@ public async Task RunAsync(CancellationToken cancellationToken = default) _logger.LogTrace("MyStartupAction Run Thread={ManagedThreadId}", Thread.CurrentThread.ManagedThreadId); await Task.Delay(500); } + + _scheduledJobManager.AddOrUpdate("MyJob", "* * * * *", async () => + { + _logger.LogInformation("Running MyJob"); + await Task.Delay(1000); + _logger.LogInformation("MyJob Complete"); + }); } } diff --git a/src/Foundatio.Extensions.Hosting/Cronos/CalendarHelper.cs b/src/Foundatio.Extensions.Hosting/Cronos/CalendarHelper.cs index 55e5ec52..c447b746 100644 --- a/src/Foundatio.Extensions.Hosting/Cronos/CalendarHelper.cs +++ b/src/Foundatio.Extensions.Hosting/Cronos/CalendarHelper.cs @@ -5,7 +5,7 @@ using System; using System.Runtime.CompilerServices; -namespace Cronos; +namespace Foundatio.Extensions.Hosting.Cronos; internal static class CalendarHelper { diff --git a/src/Foundatio.Extensions.Hosting/Cronos/CronExpression.cs b/src/Foundatio.Extensions.Hosting/Cronos/CronExpression.cs index 27f131a0..972bd10e 100644 --- a/src/Foundatio.Extensions.Hosting/Cronos/CronExpression.cs +++ b/src/Foundatio.Extensions.Hosting/Cronos/CronExpression.cs @@ -26,7 +26,7 @@ using System.Runtime.CompilerServices; using System.Text; -namespace Cronos; +namespace Foundatio.Extensions.Hosting.Cronos; /// /// Provides a parser and scheduler for cron expressions. diff --git a/src/Foundatio.Extensions.Hosting/Cronos/CronExpressionFlag.cs b/src/Foundatio.Extensions.Hosting/Cronos/CronExpressionFlag.cs index eb39c64d..df7b2fc7 100644 --- a/src/Foundatio.Extensions.Hosting/Cronos/CronExpressionFlag.cs +++ b/src/Foundatio.Extensions.Hosting/Cronos/CronExpressionFlag.cs @@ -22,7 +22,7 @@ using System; -namespace Cronos; +namespace Foundatio.Extensions.Hosting.Cronos; [Flags] internal enum CronExpressionFlag : byte diff --git a/src/Foundatio.Extensions.Hosting/Cronos/CronField.cs b/src/Foundatio.Extensions.Hosting/Cronos/CronField.cs index b919460a..aa5b41c8 100644 --- a/src/Foundatio.Extensions.Hosting/Cronos/CronField.cs +++ b/src/Foundatio.Extensions.Hosting/Cronos/CronField.cs @@ -20,7 +20,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -namespace Cronos; +namespace Foundatio.Extensions.Hosting.Cronos; internal sealed class CronField { diff --git a/src/Foundatio.Extensions.Hosting/Cronos/CronFormat.cs b/src/Foundatio.Extensions.Hosting/Cronos/CronFormat.cs index ed377bda..2e762e5e 100644 --- a/src/Foundatio.Extensions.Hosting/Cronos/CronFormat.cs +++ b/src/Foundatio.Extensions.Hosting/Cronos/CronFormat.cs @@ -22,7 +22,7 @@ using System; -namespace Cronos; +namespace Foundatio.Extensions.Hosting.Cronos; /// /// Defines the cron format options that customize string parsing for . diff --git a/src/Foundatio.Extensions.Hosting/Cronos/CronFormatException.cs b/src/Foundatio.Extensions.Hosting/Cronos/CronFormatException.cs index 3b92c9ab..77b7e0f8 100644 --- a/src/Foundatio.Extensions.Hosting/Cronos/CronFormatException.cs +++ b/src/Foundatio.Extensions.Hosting/Cronos/CronFormatException.cs @@ -22,7 +22,7 @@ using System; -namespace Cronos; +namespace Foundatio.Extensions.Hosting.Cronos; /// /// Represents an exception that's thrown, when invalid Cron expression is given. diff --git a/src/Foundatio.Extensions.Hosting/Cronos/TimeZoneHelper.cs b/src/Foundatio.Extensions.Hosting/Cronos/TimeZoneHelper.cs index 9e51bd7b..587cfa24 100644 --- a/src/Foundatio.Extensions.Hosting/Cronos/TimeZoneHelper.cs +++ b/src/Foundatio.Extensions.Hosting/Cronos/TimeZoneHelper.cs @@ -22,7 +22,7 @@ using System; -namespace Cronos; +namespace Foundatio.Extensions.Hosting.Cronos; internal static class TimeZoneHelper { diff --git a/src/Foundatio.Extensions.Hosting/Jobs/DynamicJob.cs b/src/Foundatio.Extensions.Hosting/Jobs/DynamicJob.cs new file mode 100644 index 00000000..1eda9192 --- /dev/null +++ b/src/Foundatio.Extensions.Hosting/Jobs/DynamicJob.cs @@ -0,0 +1,26 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.Jobs; +using Foundatio.Utility; + +namespace Foundatio.Extensions.Hosting.Jobs; + +internal class DynamicJob : IJob +{ + private readonly IServiceProvider _serviceProvider; + private readonly Func _action; + + public DynamicJob(IServiceProvider serviceProvider, Func action) + { + _serviceProvider = serviceProvider; + _action = action; + } + + public async Task RunAsync(CancellationToken cancellationToken = default) + { + await _action(_serviceProvider, cancellationToken).AnyContext(); + + return JobResult.Success; + } +} diff --git a/src/Foundatio.Extensions.Hosting/Jobs/HostedJobOptions.cs b/src/Foundatio.Extensions.Hosting/Jobs/HostedJobOptions.cs index 64f9d59e..ce9488ed 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/HostedJobOptions.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/HostedJobOptions.cs @@ -5,5 +5,4 @@ namespace Foundatio.Extensions.Hosting.Jobs; public class HostedJobOptions : JobOptions { public bool WaitForStartupActions { get; set; } - public string CronSchedule { get; set; } } diff --git a/src/Foundatio.Extensions.Hosting/Jobs/HostedJobService.cs b/src/Foundatio.Extensions.Hosting/Jobs/HostedJobService.cs index e838e7e5..d9aa026a 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/HostedJobService.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/HostedJobService.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Foundatio.Extensions.Hosting.Startup; @@ -47,10 +48,12 @@ private async Task ExecuteAsync(CancellationToken stoppingToken) } } - var runner = new JobRunner(_jobOptions, _loggerFactory); + var runner = new JobRunner(_jobOptions, _serviceProvider, _loggerFactory); try { + using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job " + _jobOptions.Name, ActivityKind.Server); + await runner.RunAsync(stoppingToken).AnyContext(); #if NET8_0_OR_GREATER await _stoppingCts.CancelAsync().AnyContext(); diff --git a/src/Foundatio.Extensions.Hosting/Jobs/JobHostExtensions.cs b/src/Foundatio.Extensions.Hosting/Jobs/JobHostExtensions.cs index 43cea720..5b7f9b14 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/JobHostExtensions.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/JobHostExtensions.cs @@ -1,5 +1,7 @@ using System; using System.Linq; +using System.Threading; +using System.Threading.Tasks; using Foundatio.Jobs; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -14,90 +16,182 @@ public static IServiceCollection AddJob(this IServiceCollection services, Hosted if (jobOptions.JobFactory == null) throw new ArgumentNullException(nameof(jobOptions), "jobOptions.JobFactory is required"); - if (String.IsNullOrEmpty(jobOptions.CronSchedule)) - { - return services.AddTransient(s => new HostedJobService(s, jobOptions, s.GetService())); - } - else - { - if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) - services.AddTransient(); - - return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.JobFactory, jobOptions.CronSchedule)); - } + return services.AddTransient(s => new HostedJobService(s, jobOptions, s.GetService())); } - public static IServiceCollection AddJob(this IServiceCollection services, Func jobFactory, HostedJobOptions jobOptions) + public static IServiceCollection AddJob(this IServiceCollection services, HostedJobOptions jobOptions = null) where T : class, IJob { - if (String.IsNullOrEmpty(jobOptions.CronSchedule)) + services.AddTransient(); + return services.AddTransient(s => { - return services.AddTransient(s => + if (jobOptions == null) { - jobOptions.JobFactory = () => jobFactory(s); + jobOptions = new HostedJobOptions(); + jobOptions.ApplyDefaults(); + } - return new HostedJobService(s, jobOptions, s.GetService()); - }); - } - else - { - if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) - services.AddTransient(); + jobOptions.Name ??= typeof(T).FullName; + jobOptions.JobFactory ??= sp => sp.GetRequiredService(); + + return new HostedJobService(s, jobOptions, s.GetService()); + }); + } + + public static IServiceCollection AddJob(this IServiceCollection services, Action configureJobOptions) where T : class, IJob + { + var jobOptionsBuilder = new HostedJobOptionsBuilder(); + jobOptionsBuilder.ApplyDefaults(); + jobOptionsBuilder.Name(typeof(T).FullName); + configureJobOptions?.Invoke(jobOptionsBuilder); + return services.AddJob(jobOptionsBuilder.Target); + } + + public static IServiceCollection AddJob(this IServiceCollection services, Action configureJobOptions) + { + var jobOptionsBuilder = new HostedJobOptionsBuilder(); + configureJobOptions?.Invoke(jobOptionsBuilder); + return services.AddJob(jobOptionsBuilder.Target); + } - return services.AddTransient(s => new ScheduledJobRegistration(() => jobFactory(s), jobOptions.CronSchedule)); - } + public static IServiceCollection AddJob(this IServiceCollection services, string name, Func jobFactory, Action configureJobOptions) + { + var jobOptionsBuilder = new HostedJobOptionsBuilder(); + jobOptionsBuilder.Name(name).JobFactory(jobFactory); + configureJobOptions?.Invoke(jobOptionsBuilder); + return services.AddJob(jobOptionsBuilder.Target); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, ScheduledJobOptions jobOptions) + { + if (jobOptions.JobFactory == null) + throw new ArgumentNullException(nameof(jobOptions), "jobOptions.JobFactory is required"); + + services.AddJobScheduler(); + + return services.AddTransient(s => new ScheduledJobRegistration(jobOptions)); } - public static IServiceCollection AddJob(this IServiceCollection services, HostedJobOptions jobOptions) where T : class, IJob + public static IServiceCollection AddCronJob(this IServiceCollection services, Action configureJobOptions) + { + var jobOptionsBuilder = new ScheduledJobOptionsBuilder(); + configureJobOptions?.Invoke(jobOptionsBuilder); + return services.AddCronJob(jobOptionsBuilder.Target); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string name, string cronSchedule, Action configureJobOptions = null) where T : class, IJob { services.AddTransient(); - if (String.IsNullOrEmpty(jobOptions.CronSchedule)) + var jobOptionsBuilder = new ScheduledJobOptionsBuilder(); + jobOptionsBuilder.Name(name).CronSchedule(cronSchedule).JobFactory(sp => sp.GetRequiredService()); + configureJobOptions?.Invoke(jobOptionsBuilder); + return services.AddCronJob(jobOptionsBuilder.Target); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string name, string cronSchedule, Func action) + { + return services.AddCronJob(o => o.Name(name).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, action))); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string name, string cronSchedule, Func action) + { + return services.AddCronJob(o => o.Name(name).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (xp, _) => action(xp)))); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string name, string cronSchedule, Func action) + { + return services.AddCronJob(o => o.Name(name).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, _) => action()))); + } + + public static IServiceCollection AddCronJob(this IServiceCollection services, string name, string cronSchedule, Action action) + { + return services.AddCronJob(o => o.Name(name).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (xp, ct) => { - return services.AddTransient(s => - { - if (jobOptions.JobFactory == null) - jobOptions.JobFactory = s.GetRequiredService; + action(xp, ct); + return Task.CompletedTask; + }))); + } - return new HostedJobService(s, jobOptions, s.GetService()); - }); - } - else + public static IServiceCollection AddCronJob(this IServiceCollection services, string name, string cronSchedule, Action action) + { + return services.AddCronJob(o => o.Name(name).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, ct) => { - if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) - services.AddTransient(); + action(ct); + return Task.CompletedTask; + }))); + } - return services.AddTransient(s => new ScheduledJobRegistration(jobOptions.JobFactory ?? (s.GetRequiredService), jobOptions.CronSchedule)); - } + public static IServiceCollection AddCronJob(this IServiceCollection services, string name, string cronSchedule, Action action) + { + return services.AddCronJob(o => o.Name(name).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, _) => + { + action(); + return Task.CompletedTask; + }))); } - public static IServiceCollection AddJob(this IServiceCollection services, bool waitForStartupActions = false) where T : class, IJob + public static IServiceCollection AddDistributedCronJob(this IServiceCollection services, string cronSchedule, Action configureJobOptions = null) where T : class, IJob { - return services.AddJob(o => o.ApplyDefaults().WaitForStartupActions(waitForStartupActions)); + services.AddTransient(); + var jobOptionsBuilder = new ScheduledJobOptionsBuilder(); + jobOptionsBuilder.Name(typeof(T).FullName).Distributed(true).CronSchedule(cronSchedule).JobFactory(sp => sp.GetRequiredService()); + configureJobOptions?.Invoke(jobOptionsBuilder); + return services.AddCronJob(jobOptionsBuilder.Target); } - public static IServiceCollection AddCronJob(this IServiceCollection services, string cronSchedule) where T : class, IJob + public static IServiceCollection AddDistributedCronJob(this IServiceCollection services, string name, string cronSchedule, Func action) { - return services.AddJob(o => o.CronSchedule(cronSchedule)); + return services.AddCronJob(o => o.Name(name).Distributed(true).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, action))); } - public static IServiceCollection AddJob(this IServiceCollection services, Action configureJobOptions) where T : class, IJob + public static IServiceCollection AddDistributedCronJob(this IServiceCollection services, string name, string cronSchedule, Func action) { - var jobOptionsBuilder = new HostedJobOptionsBuilder(); - configureJobOptions?.Invoke(jobOptionsBuilder); - return services.AddJob(jobOptionsBuilder.Target); + return services.AddCronJob(o => o.Name(name).Distributed(true).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (xp, _) => action(xp)))); } - public static IServiceCollection AddJob(this IServiceCollection services, Action configureJobOptions) + public static IServiceCollection AddDistributedCronJob(this IServiceCollection services, string name, string cronSchedule, Func action) { - var jobOptionsBuilder = new HostedJobOptionsBuilder(); - configureJobOptions?.Invoke(jobOptionsBuilder); - return services.AddJob(jobOptionsBuilder.Target); + return services.AddCronJob(o => o.Name(name).Distributed(true).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, _) => action()))); } - public static IServiceCollection AddJob(this IServiceCollection services, Func jobFactory, Action configureJobOptions) + public static IServiceCollection AddDistributedCronJob(this IServiceCollection services, string name, string cronSchedule, Action action) { - var jobOptionsBuilder = new HostedJobOptionsBuilder(); - configureJobOptions?.Invoke(jobOptionsBuilder); - return services.AddJob(jobFactory, jobOptionsBuilder.Target); + return services.AddCronJob(o => o.Name(name).Distributed(true).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (xp, ct) => + { + action(xp, ct); + return Task.CompletedTask; + }))); + } + + public static IServiceCollection AddDistributedCronJob(this IServiceCollection services, string name, string cronSchedule, Action action) + { + return services.AddCronJob(o => o.Name(name).Distributed(true).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, ct) => + { + action(ct); + return Task.CompletedTask; + }))); + } + + public static IServiceCollection AddDistributedCronJob(this IServiceCollection services, string name, string cronSchedule, Action action) + { + return services.AddCronJob(o => o.Name(name).Distributed(true).CronSchedule(cronSchedule).JobFactory(sp => new DynamicJob(sp, (_, _) => + { + action(); + return Task.CompletedTask; + }))); + } + + public static IServiceCollection AddJobScheduler(this IServiceCollection services) + { + if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(ScheduledJobService))) + services.AddTransient(); + + if (!services.Any(s => s.ServiceType == typeof(IScheduledJobManager) && s.ImplementationType == typeof(ScheduledJobManager))) + services.AddSingleton(); + + if (!services.Any(s => s.ServiceType == typeof(ScheduledJobManager) && s.ImplementationType == typeof(ScheduledJobManager))) + services.AddSingleton(); + + return services; } public static IServiceCollection AddJobLifetimeService(this IServiceCollection services) diff --git a/src/Foundatio.Extensions.Hosting/Jobs/JobOptionsBuilder.cs b/src/Foundatio.Extensions.Hosting/Jobs/JobOptionsBuilder.cs index 05194610..731b0584 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/JobOptionsBuilder.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/JobOptionsBuilder.cs @@ -36,7 +36,7 @@ public HostedJobOptionsBuilder Description(string value) return this; } - public HostedJobOptionsBuilder JobFactory(Func value) + public HostedJobOptionsBuilder JobFactory(Func value) { Target.JobFactory = value; return this; @@ -48,12 +48,6 @@ public HostedJobOptionsBuilder RunContinuous(bool value) return this; } - public HostedJobOptionsBuilder CronSchedule(string value) - { - Target.CronSchedule = value; - return this; - } - public HostedJobOptionsBuilder Interval(TimeSpan? value) { Target.Interval = value; diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobManager.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobManager.cs new file mode 100644 index 00000000..0cb90f7b --- /dev/null +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobManager.cs @@ -0,0 +1,178 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.Caching; +using Foundatio.Jobs; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Foundatio.Extensions.Hosting.Jobs; + + +// TODO: Persist last run time to cache so it's not lost on restart +// TODO: Add telemetry spans around job runs + +public interface IScheduledJobManager +{ + void AddOrUpdate(string cronSchedule, Action configure = null) where TJob : class, IJob; + void AddOrUpdate(string jobName, string cronSchedule, Func action, Action configure = null); + void AddOrUpdate(string jobName, string cronSchedule, Func action, Action configure = null); + void AddOrUpdate(string jobName, string cronSchedule, Func action, Action configure = null); + void AddOrUpdate(string jobName, string cronSchedule, Action action, Action configure = null); + void AddOrUpdate(string jobName, string cronSchedule, Action action, Action configure = null); + void AddOrUpdate(string jobName, string cronSchedule, Action action, Action configure = null); + void Remove() where TJob : class, IJob; + void Remove(string jobName); +} + +public class ScheduledJobManager : IScheduledJobManager +{ + private readonly IServiceProvider _serviceProvider; + private readonly ILoggerFactory _loggerFactory; + private readonly ICacheClient _cacheClient; + private readonly List _jobs = new(); + private ScheduledJobRunner[] _jobsArray; + private readonly object _lock = new(); + + public ScheduledJobManager(IServiceProvider serviceProvider, ILoggerFactory loggerFactory) + { + _serviceProvider = serviceProvider; + _loggerFactory = loggerFactory; + var cacheClient = serviceProvider.GetService(); + _jobs.AddRange(serviceProvider.GetServices().Select(j => new ScheduledJobRunner(j.Options, serviceProvider, _cacheClient, loggerFactory))); + _jobsArray = _jobs.ToArray(); + if (_jobs.Any(j => j.Options.IsDistributed && cacheClient == null)) + throw new ArgumentException("A distributed cache client is required to run distributed jobs."); + _cacheClient = cacheClient ?? new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory)); + } + + public void AddOrUpdate(string cronSchedule, Action configure = null) where TJob : class, IJob + { + string jobName = typeof(TJob).Name; + lock (_lock) + { + var job = Jobs.FirstOrDefault(j => j.Options.Name == jobName); + if (job == null) + { + var options = new ScheduledJobOptions + { + CronSchedule = cronSchedule, + Name = jobName, + JobFactory = sp => sp.GetRequiredService() + }; + var builder = new ScheduledJobOptionsBuilder(options); + configure?.Invoke(builder); + _jobs.Add(new ScheduledJobRunner(options, _serviceProvider, _cacheClient, _loggerFactory)); + _jobsArray = _jobs.ToArray(); + } + else + { + var builder = new ScheduledJobOptionsBuilder(job.Options); + builder.CronSchedule(cronSchedule); + configure?.Invoke(builder); + job.Schedule = job.Options.CronSchedule; + } + } + } + + public void AddOrUpdate(string jobName, string cronSchedule, Action configure = null) + { + lock (_lock) + { + var job = Jobs.FirstOrDefault(j => j.Options.Name == jobName); + if (job == null) + { + var options = new ScheduledJobOptions + { + CronSchedule = cronSchedule, + Name = jobName + }; + var builder = new ScheduledJobOptionsBuilder(options); + configure?.Invoke(builder); + options.JobFactory = options.JobFactory; + _jobs.Add(new ScheduledJobRunner(options, _serviceProvider, _cacheClient, _loggerFactory)); + _jobsArray = _jobs.ToArray(); + } + else + { + var builder = new ScheduledJobOptionsBuilder(job.Options); + builder.CronSchedule(cronSchedule); + configure?.Invoke(builder); + job.Schedule = job.Options.CronSchedule; + } + } + } + + public void AddOrUpdate(string jobName, string cronSchedule, Func action, Action configure = null) + { + AddOrUpdate(jobName, cronSchedule, b => b.JobFactory(sp => new DynamicJob(sp, action))); + } + + public void AddOrUpdate(string jobName, string cronSchedule, Func action, Action configure = null) + { + AddOrUpdate(jobName, cronSchedule, (_, ct) => action(ct), configure); + } + + public void AddOrUpdate(string jobName, string cronSchedule, Func action, Action configure = null) + { + AddOrUpdate(jobName, cronSchedule, (_, _) => action(), configure); + } + + public void AddOrUpdate(string jobName, string cronSchedule, Action action, Action configure = null) + { + AddOrUpdate(jobName, cronSchedule, (sp, ct) => + { + action(sp, ct); + return Task.CompletedTask; + }, configure); + } + + public void AddOrUpdate(string jobName, string cronSchedule, Action action, Action configure = null) + { + AddOrUpdate(jobName, cronSchedule, (_, ct) => + { + action(ct); + return Task.CompletedTask; + }, configure); + } + + public void AddOrUpdate(string jobName, string cronSchedule, Action action, Action configure = null) + { + AddOrUpdate(jobName, cronSchedule, (_, _) => + { + action(); + return Task.CompletedTask; + }, configure); + } + + public void Remove() where TJob : class, IJob + { + string jobName = typeof(TJob).Name; + lock (_lock) + { + var job = Jobs.FirstOrDefault(j => j.Options.Name == jobName); + if (job == null) + return; + + _jobs.Remove(job); + _jobsArray = _jobs.ToArray(); + } + } + + public void Remove(string jobName) + { + lock (_lock) + { + var job = _jobs.FirstOrDefault(j => j.Options.Name == jobName); + if (job == null) + return; + + _jobs.Remove(job); + _jobsArray = _jobs.ToArray(); + } + } + + internal ScheduledJobRunner[] Jobs => _jobsArray; +} diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptions.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptions.cs new file mode 100644 index 00000000..f331c738 --- /dev/null +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptions.cs @@ -0,0 +1,14 @@ +using System; +using Foundatio.Jobs; + +namespace Foundatio.Extensions.Hosting.Jobs; + +public class ScheduledJobOptions +{ + public string Name { get; set; } + public string Description { get; set; } + public Func JobFactory { get; set; } + public bool WaitForStartupActions { get; set; } + public string CronSchedule { get; set; } + public bool IsDistributed { get; set; } +} diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptionsBuilder.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptionsBuilder.cs new file mode 100644 index 00000000..37526e5a --- /dev/null +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobOptionsBuilder.cs @@ -0,0 +1,50 @@ +using System; +using Foundatio.Jobs; + +namespace Foundatio.Extensions.Hosting.Jobs; + +public class ScheduledJobOptionsBuilder +{ + public ScheduledJobOptionsBuilder(ScheduledJobOptions target = null) + { + Target = target ?? new ScheduledJobOptions(); + } + + public ScheduledJobOptions Target { get; } + + public ScheduledJobOptionsBuilder Name(string value) + { + Target.Name = value; + return this; + } + + public ScheduledJobOptionsBuilder Description(string value) + { + Target.Description = value; + return this; + } + + public ScheduledJobOptionsBuilder CronSchedule(string value) + { + Target.CronSchedule = value; + return this; + } + + public ScheduledJobOptionsBuilder JobFactory(Func value) + { + Target.JobFactory = value; + return this; + } + + public ScheduledJobOptionsBuilder WaitForStartupActions(bool value) + { + Target.WaitForStartupActions = value; + return this; + } + + public ScheduledJobOptionsBuilder Distributed(bool value) + { + Target.IsDistributed = value; + return this; + } +} diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRegistration.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRegistration.cs index 89fd57b6..6d7269e7 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRegistration.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRegistration.cs @@ -1,16 +1,11 @@ -using System; -using Foundatio.Jobs; - -namespace Foundatio.Extensions.Hosting.Jobs; +namespace Foundatio.Extensions.Hosting.Jobs; public class ScheduledJobRegistration { - public ScheduledJobRegistration(Func jobFactory, string schedule) + public ScheduledJobRegistration(ScheduledJobOptions options) { - JobFactory = jobFactory; - Schedule = schedule; + Options = options; } - public Func JobFactory { get; } - public string Schedule { get; } + public ScheduledJobOptions Options { get; private set; } } diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs new file mode 100644 index 00000000..15c413e9 --- /dev/null +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs @@ -0,0 +1,141 @@ +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.Caching; +using Foundatio.Extensions.Hosting.Cronos; +using Foundatio.Jobs; +using Foundatio.Lock; +using Foundatio.Utility; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Foundatio.Extensions.Hosting.Jobs; + +internal class ScheduledJobRunner +{ + private readonly ScheduledJobOptions _jobOptions; + private readonly IServiceProvider _serviceProvider; + private readonly ICacheClient _cacheClient; + private CronExpression _cronSchedule; + private readonly ILockProvider _lockProvider; + private readonly ILogger _logger; + private readonly DateTime _baseDate = new(2010, 1, 1); + + public ScheduledJobRunner(ScheduledJobOptions jobOptions, IServiceProvider serviceProvider, ICacheClient cacheClient, ILoggerFactory loggerFactory = null) + { + _jobOptions = jobOptions; + _jobOptions.Name ??= Guid.NewGuid().ToString("N").Substring(0, 10); + _serviceProvider = serviceProvider; + _cacheClient = new ScopedCacheClient(cacheClient, "jobs"); + _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; + + _cronSchedule = CronExpression.Parse(_jobOptions.CronSchedule); + if (_cronSchedule == null) + throw new ArgumentException("Could not parse schedule.", nameof(ScheduledJobOptions.CronSchedule)); + + var interval = TimeSpan.FromDays(1); + + var nextOccurrence = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); + if (nextOccurrence.HasValue) + { + var nextNextOccurrence = _cronSchedule.GetNextOccurrence(nextOccurrence.Value); + if (nextNextOccurrence.HasValue) + interval = nextNextOccurrence.Value.Subtract(nextOccurrence.Value); + } + + _lockProvider = new ThrottlingLockProvider(_cacheClient, 1, interval.Add(interval)); + + NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); + } + + public ScheduledJobOptions Options => _jobOptions; + + private string _schedule; + public string Schedule + { + get { return _schedule;} + set + { + _cronSchedule = CronExpression.Parse(value); + NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); + _schedule = value; + } + } + + public DateTime? LastRun { get; private set; } + public DateTime? NextRun { get; private set; } + public Task RunTask { get; private set; } + + public bool ShouldRun() + { + if (!NextRun.HasValue) + return false; + + // not time yet + if (NextRun > SystemClock.UtcNow) + return false; + + // check if already run + if (LastRun != null && LastRun.Value == NextRun.Value) + return false; + + return true; + } + + public async Task StartAsync(CancellationToken cancellationToken = default) + { + ILock l = new EmptyLock(); + if (Options.IsDistributed) + { + // using lock provider in a cluster with a distributed cache implementation keeps cron jobs from running duplicates + l = await _lockProvider.AcquireAsync(GetLockKey(NextRun.Value), TimeSpan.FromMinutes(60), TimeSpan.Zero).AnyContext(); + if (l == null) + { + // if we didn't get the lock, update the last run time + var lastRun = await _cacheClient.GetAsync("lastrun:" + Options.Name).AnyContext(); + if (lastRun.HasValue) + LastRun = lastRun.Value; + + return; + } + } + + await using (l) + { + // start running the job in a thread + RunTask = Task.Factory.StartNew(async () => + { + using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job " + Options.Name, ActivityKind.Server); + + try + { + var result = await Options.JobFactory(_serviceProvider).TryRunAsync(cancellationToken).AnyContext(); + _logger.LogJobResult(result, Options.Name); + } + catch (TaskCanceledException) + { + } + catch (Exception ex) + { + if (_logger.IsEnabled(LogLevel.Error)) + _logger.LogError(ex, "Error running scheduled job ({JobName}): {Message}", Options.Name, ex.Message); + + throw; + } + }, cancellationToken).Unwrap(); + + LastRun = NextRun; + if (Options.IsDistributed) + await _cacheClient.SetAsync("lastrun:" + Options.Name, LastRun.Value).AnyContext(); + NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value); + } + } + + private string GetLockKey(DateTime date) + { + long minute = (long)date.Subtract(_baseDate).TotalMinutes; + + return Options.Name + ":" + minute; + } +} diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs index e724b7ab..eb7acaf7 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs @@ -1,31 +1,23 @@ using System; -using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; -using Cronos; -using Foundatio.Caching; using Foundatio.Extensions.Hosting.Startup; -using Foundatio.Jobs; -using Foundatio.Lock; using Foundatio.Utility; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; namespace Foundatio.Extensions.Hosting.Jobs; public class ScheduledJobService : BackgroundService, IJobStatus { - private readonly List _jobs; private readonly IServiceProvider _serviceProvider; + private readonly ScheduledJobManager _jobManager; - public ScheduledJobService(IServiceProvider serviceProvider, ILoggerFactory loggerFactory) + public ScheduledJobService(IServiceProvider serviceProvider, ScheduledJobManager jobManager) { _serviceProvider = serviceProvider; - var cacheClient = serviceProvider.GetService() ?? new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory)); - _jobs = new List(serviceProvider.GetServices().Select(j => new ScheduledJobRunner(j.JobFactory, j.Schedule, cacheClient, loggerFactory))); + _jobManager = jobManager; var lifetime = serviceProvider.GetService(); lifetime?.RegisterHostedJobInstance(this); @@ -35,7 +27,6 @@ public ScheduledJobService(IServiceProvider serviceProvider, ILoggerFactory logg protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - // TODO: Add more logging throughout var startupContext = _serviceProvider.GetService(); if (startupContext != null) { @@ -49,7 +40,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) while (!stoppingToken.IsCancellationRequested) { - var jobsToRun = _jobs.Where(j => j.ShouldRun()).ToArray(); + var jobsToRun = _jobManager.Jobs.Where(j => j.ShouldRun()).ToArray(); foreach (var jobToRun in jobsToRun) await jobToRun.StartAsync(stoppingToken).AnyContext(); @@ -61,94 +52,4 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) await Task.Delay(timeUntilNextMinute, stoppingToken).AnyContext(); } } - - private class ScheduledJobRunner - { - private readonly Func _jobFactory; - private readonly CronExpression _cronSchedule; - private readonly ILockProvider _lockProvider; - private readonly ILogger _logger; - private readonly DateTime _baseDate = new(2010, 1, 1); - private string _cacheKeyPrefix; - - public ScheduledJobRunner(Func jobFactory, string schedule, ICacheClient cacheClient, ILoggerFactory loggerFactory = null) - { - _jobFactory = jobFactory; - Schedule = schedule; - _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; - - _cronSchedule = CronExpression.Parse(schedule); - if (_cronSchedule == null) - throw new ArgumentException("Could not parse schedule.", nameof(schedule)); - - var interval = TimeSpan.FromDays(1); - - var nextOccurrence = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); - if (nextOccurrence.HasValue) - { - var nextNextOccurrence = _cronSchedule.GetNextOccurrence(nextOccurrence.Value); - if (nextNextOccurrence.HasValue) - interval = nextNextOccurrence.Value.Subtract(nextOccurrence.Value); - } - - _lockProvider = new ThrottlingLockProvider(cacheClient, 1, interval.Add(interval)); - - NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); - } - - public string Schedule { get; private set; } - public DateTime? LastRun { get; private set; } - public DateTime? NextRun { get; private set; } - public Task RunTask { get; private set; } - - public bool ShouldRun() - { - if (!NextRun.HasValue) - return false; - - // not time yet - if (NextRun > SystemClock.UtcNow) - return false; - - // check if already run - if (LastRun != null && LastRun.Value == NextRun.Value) - return false; - - return true; - } - - public Task StartAsync(CancellationToken cancellationToken = default) - { - // using lock provider in a cluster with a distributed cache implementation keeps cron jobs from running duplicates - // TODO: provide ability to run cron jobs on a per host isolated schedule - return _lockProvider.TryUsingAsync(GetLockKey(NextRun.Value), t => - { - // start running the job in a thread - RunTask = Task.Factory.StartNew(async () => - { - var job = _jobFactory(); - // TODO: Don't calculate job name every time - string jobName = job.GetType().Name; - var result = await _jobFactory().TryRunAsync(cancellationToken).AnyContext(); - // TODO: Should we only set last run on success? Seems like that could be bad. - _logger.LogJobResult(result, jobName); - }, cancellationToken).Unwrap(); - - LastRun = NextRun; - NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); - - return Task.CompletedTask; - }, TimeSpan.Zero, TimeSpan.Zero); - } - - private string GetLockKey(DateTime date) - { - if (_cacheKeyPrefix == null) - _cacheKeyPrefix = TypeHelper.GetTypeDisplayName(_jobFactory().GetType()); - - long minute = (long)date.Subtract(_baseDate).TotalMinutes; - - return _cacheKeyPrefix + minute; - } - } } diff --git a/src/Foundatio/Caching/InMemoryCacheClientOptions.cs b/src/Foundatio/Caching/InMemoryCacheClientOptions.cs index c608ad29..8681008b 100644 --- a/src/Foundatio/Caching/InMemoryCacheClientOptions.cs +++ b/src/Foundatio/Caching/InMemoryCacheClientOptions.cs @@ -5,7 +5,7 @@ public class InMemoryCacheClientOptions : SharedOptions /// /// The maximum number of items to store in the cache /// - public int? MaxItems { get; set; } = 1000; + public int? MaxItems { get; set; } = 10000; /// /// Whether or not values should be cloned during get and set to make sure that any cache entry changes are isolated diff --git a/src/Foundatio/Jobs/JobOptions.cs b/src/Foundatio/Jobs/JobOptions.cs index 1b69bbb1..36c745bb 100644 --- a/src/Foundatio/Jobs/JobOptions.cs +++ b/src/Foundatio/Jobs/JobOptions.cs @@ -8,7 +8,7 @@ public class JobOptions { public string Name { get; set; } public string Description { get; set; } - public Func JobFactory { get; set; } + public Func JobFactory { get; set; } public bool RunContinuous { get; set; } = true; public TimeSpan? Interval { get; set; } public TimeSpan? InitialDelay { get; set; } @@ -65,25 +65,25 @@ public static JobOptions GetDefaults() where T : IJob public static JobOptions GetDefaults(IJob instance) { var jobOptions = GetDefaults(instance.GetType()); - jobOptions.JobFactory = () => instance; + jobOptions.JobFactory = _ => instance; return jobOptions; } public static JobOptions GetDefaults(IJob instance) where T : IJob { var jobOptions = GetDefaults(); - jobOptions.JobFactory = () => instance; + jobOptions.JobFactory = _ => instance; return jobOptions; } - public static JobOptions GetDefaults(Type jobType, Func jobFactory) + public static JobOptions GetDefaults(Type jobType, Func jobFactory) { var jobOptions = GetDefaults(jobType); jobOptions.JobFactory = jobFactory; return jobOptions; } - public static JobOptions GetDefaults(Func jobFactory) where T : IJob + public static JobOptions GetDefaults(Func jobFactory) where T : IJob { var jobOptions = GetDefaults(); jobOptions.JobFactory = jobFactory; diff --git a/src/Foundatio/Jobs/JobRunner.cs b/src/Foundatio/Jobs/JobRunner.cs index d40e79b3..51510059 100644 --- a/src/Foundatio/Jobs/JobRunner.cs +++ b/src/Foundatio/Jobs/JobRunner.cs @@ -15,37 +15,42 @@ public class JobRunner private readonly ILogger _logger; private string _jobName; private readonly JobOptions _options; + private readonly IServiceProvider _serviceProvider; - public JobRunner(JobOptions options, ILoggerFactory loggerFactory = null) + public JobRunner(JobOptions options, IServiceProvider serviceProvider, ILoggerFactory loggerFactory = null) { _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; _options = options; + _serviceProvider = serviceProvider; } - public JobRunner(IJob instance, ILoggerFactory loggerFactory = null, TimeSpan? initialDelay = null, int instanceCount = 1, bool runContinuous = true, int iterationLimit = -1, TimeSpan? interval = null) + public JobRunner(IJob instance, IServiceProvider serviceProvider, ILoggerFactory loggerFactory = null, TimeSpan? initialDelay = null, int instanceCount = 1, bool runContinuous = true, int iterationLimit = -1, TimeSpan? interval = null) : this(new JobOptions { - JobFactory = () => instance, + JobFactory = _ => instance, InitialDelay = initialDelay, InstanceCount = instanceCount, IterationLimit = iterationLimit, RunContinuous = runContinuous, Interval = interval - }, loggerFactory) + }, serviceProvider, loggerFactory) { } - public JobRunner(Func jobFactory, ILoggerFactory loggerFactory = null, TimeSpan? initialDelay = null, int instanceCount = 1, bool runContinuous = true, int iterationLimit = -1, TimeSpan? interval = null) + public JobRunner(Func jobFactory, IServiceProvider serviceProvider, + ILoggerFactory loggerFactory = null, TimeSpan? initialDelay = null, int instanceCount = 1, + bool runContinuous = true, int iterationLimit = -1, TimeSpan? interval = null) : this(new JobOptions - { - JobFactory = jobFactory, - InitialDelay = initialDelay, - InstanceCount = instanceCount, - IterationLimit = iterationLimit, - RunContinuous = runContinuous, - Interval = interval - }, loggerFactory) - { } + { + JobFactory = jobFactory, + InitialDelay = initialDelay, + InstanceCount = instanceCount, + IterationLimit = iterationLimit, + RunContinuous = runContinuous, + Interval = interval + }, serviceProvider, loggerFactory) + { + } public CancellationTokenSource CancellationTokenSource { get; private set; } @@ -127,7 +132,7 @@ public async Task RunAsync(CancellationToken cancellationToken = default) IJob job = null; try { - job = _options.JobFactory(); + job = _options.JobFactory(_serviceProvider); } catch (Exception ex) { @@ -171,7 +176,7 @@ public async Task RunAsync(CancellationToken cancellationToken = default) { try { - var jobInstance = _options.JobFactory(); + var jobInstance = _options.JobFactory(_serviceProvider); await jobInstance.RunContinuousAsync(_options.Interval, _options.IterationLimit, cancellationToken).AnyContext(); } catch (TaskCanceledException) diff --git a/src/Foundatio/Utility/FoundatioDiagnostics.cs b/src/Foundatio/Utility/FoundatioDiagnostics.cs index 78d3b47d..aca3faff 100644 --- a/src/Foundatio/Utility/FoundatioDiagnostics.cs +++ b/src/Foundatio/Utility/FoundatioDiagnostics.cs @@ -4,10 +4,10 @@ namespace Foundatio; -internal static class FoundatioDiagnostics +public static class FoundatioDiagnostics { internal static readonly AssemblyName AssemblyName = typeof(FoundatioDiagnostics).Assembly.GetName(); internal static readonly string AssemblyVersion = typeof(FoundatioDiagnostics).Assembly.GetCustomAttribute()?.InformationalVersion ?? AssemblyName.Version.ToString(); - internal static readonly ActivitySource ActivitySource = new(AssemblyName.Name, AssemblyVersion); - internal static readonly Meter Meter = new("Foundatio", AssemblyVersion); + public static readonly ActivitySource ActivitySource = new(AssemblyName.Name, AssemblyVersion); + public static readonly Meter Meter = new("Foundatio", AssemblyVersion); } diff --git a/tests/Foundatio.Tests/Jobs/JobTests.cs b/tests/Foundatio.Tests/Jobs/JobTests.cs index 48777b69..55e35379 100644 --- a/tests/Foundatio.Tests/Jobs/JobTests.cs +++ b/tests/Foundatio.Tests/Jobs/JobTests.cs @@ -9,6 +9,7 @@ using Foundatio.Metrics; using Foundatio.Utility; using Foundatio.Xunit; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Xunit; using Xunit.Abstractions; @@ -23,8 +24,9 @@ public JobTests(ITestOutputHelper output) : base(output) { } public async Task CanCancelJob() { var job = new HelloWorldJob(Log); + var sp = new ServiceCollection().BuildServiceProvider(); var timeoutCancellationTokenSource = new CancellationTokenSource(1000); - var resultTask = new JobRunner(job, Log).RunAsync(timeoutCancellationTokenSource.Token); + var resultTask = new JobRunner(job, sp, Log).RunAsync(timeoutCancellationTokenSource.Token); await SystemClock.SleepAsync(TimeSpan.FromSeconds(2)); Assert.True(await resultTask); } @@ -33,7 +35,8 @@ public async Task CanCancelJob() public async Task CanStopLongRunningJob() { var job = new LongRunningJob(Log); - var runner = new JobRunner(job, Log); + var sp = new ServiceCollection().BuildServiceProvider(); + var runner = new JobRunner(job, sp, Log); var cts = new CancellationTokenSource(1000); bool result = await runner.RunAsync(cts.Token); @@ -44,7 +47,8 @@ public async Task CanStopLongRunningJob() public async Task CanStopLongRunningCronJob() { var job = new LongRunningJob(Log); - var runner = new JobRunner(job, Log); + var sp = new ServiceCollection().BuildServiceProvider(); + var runner = new JobRunner(job, sp, Log); var cts = new CancellationTokenSource(1000); bool result = await runner.RunAsync(cts.Token); @@ -81,11 +85,12 @@ public async Task CanRunJobs() public async Task CanRunMultipleInstances() { var job = new HelloWorldJob(Log); + var sp = new ServiceCollection().BuildServiceProvider(); HelloWorldJob.GlobalRunCount = 0; using (var timeoutCancellationTokenSource = new CancellationTokenSource(1000)) { - await new JobRunner(job, Log, instanceCount: 5, iterationLimit: 1).RunAsync(timeoutCancellationTokenSource.Token); + await new JobRunner(job, sp, Log, instanceCount: 5, iterationLimit: 1).RunAsync(timeoutCancellationTokenSource.Token); } Assert.Equal(5, HelloWorldJob.GlobalRunCount); @@ -93,7 +98,7 @@ public async Task CanRunMultipleInstances() HelloWorldJob.GlobalRunCount = 0; using (var timeoutCancellationTokenSource = new CancellationTokenSource(50000)) { - await new JobRunner(job, Log, instanceCount: 5, iterationLimit: 100).RunAsync(timeoutCancellationTokenSource.Token); + await new JobRunner(job, sp, Log, instanceCount: 5, iterationLimit: 100).RunAsync(timeoutCancellationTokenSource.Token); } Assert.Equal(500, HelloWorldJob.GlobalRunCount); @@ -105,13 +110,14 @@ public async Task CanCancelContinuousJobs() using (TestSystemClock.Install()) { var job = new HelloWorldJob(Log); + var sp = new ServiceCollection().BuildServiceProvider(); var timeoutCancellationTokenSource = new CancellationTokenSource(100); await job.RunContinuousAsync(TimeSpan.FromSeconds(1), 5, timeoutCancellationTokenSource.Token); Assert.Equal(1, job.RunCount); timeoutCancellationTokenSource = new CancellationTokenSource(500); - var runnerTask = new JobRunner(job, Log, instanceCount: 5, iterationLimit: 10000, interval: TimeSpan.FromMilliseconds(1)).RunAsync(timeoutCancellationTokenSource.Token); + var runnerTask = new JobRunner(job, sp, Log, instanceCount: 5, iterationLimit: 10000, interval: TimeSpan.FromMilliseconds(1)).RunAsync(timeoutCancellationTokenSource.Token); await SystemClock.SleepAsync(TimeSpan.FromSeconds(1)); await runnerTask; }