From fc7a27b8bb6c6ac36abe023bec0700302d76fbbe Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Wed, 6 Nov 2024 17:34:42 -0600 Subject: [PATCH] Harden scheduled job runner --- samples/Foundatio.HostingSample/Program.cs | 2 +- .../Jobs/ScheduledJobRunner.cs | 111 +++++++++++++----- .../Jobs/ScheduledJobService.cs | 6 +- .../Startup/StartupActionsContext.cs | 5 + src/Foundatio/Jobs/JobResult.cs | 4 +- src/Foundatio/Jobs/JobRunner.cs | 3 +- src/Foundatio/Lock/CacheLockProvider.cs | 2 +- src/Foundatio/Lock/ThrottlingLockProvider.cs | 6 +- src/Foundatio/Queues/QueueBase.cs | 1 + 9 files changed, 99 insertions(+), 41 deletions(-) diff --git a/samples/Foundatio.HostingSample/Program.cs b/samples/Foundatio.HostingSample/Program.cs index 1cda5105..a38b4196 100644 --- a/samples/Foundatio.HostingSample/Program.cs +++ b/samples/Foundatio.HostingSample/Program.cs @@ -36,7 +36,7 @@ // shutdown the host if no jobs are running builder.Services.AddJobLifetimeService(); - builder.Services.AddSingleton(_ => new InMemoryCacheClient()); + builder.Services.AddSingleton(sp => new InMemoryCacheClient(o => o.LoggerFactory(sp.GetService()))); // inserts a startup action that does not complete until the critical health checks are healthy // gets inserted as 1st startup action so that any other startup actions don't run until the critical resources are available diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs index 431cf93e..4d78af6d 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobRunner.cs @@ -23,11 +23,13 @@ internal class ScheduledJobRunner private readonly ILogger _logger; private readonly DateTime _baseDate = new(2010, 1, 1); private DateTime _lastStatusUpdate = DateTime.MinValue; + private string _cacheKey; public ScheduledJobRunner(ScheduledJobOptions jobOptions, IServiceProvider serviceProvider, ICacheClient cacheClient, ILoggerFactory loggerFactory = null) { _jobOptions = jobOptions; _jobOptions.Name ??= Guid.NewGuid().ToString("N").Substring(0, 10); + _cacheKey = _jobOptions.Name.ToLower().Replace(' ', '_'); _serviceProvider = serviceProvider; _timeProvider = serviceProvider.GetService() ?? TimeProvider.System; _cacheClient = new ScopedCacheClient(cacheClient, "jobs"); @@ -76,22 +78,29 @@ public async ValueTask ShouldRunAsync() { if (_timeProvider.GetUtcNow().UtcDateTime.Subtract(_lastStatusUpdate).TotalSeconds > 15) { - var lastRun = await _cacheClient.GetAsync("lastrun:" + Options.Name).AnyContext(); - if (lastRun.HasValue) + try { - LastRun = lastRun.Value; - NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value); - } + var lastRun = await _cacheClient.GetAsync("lastrun:" + Options.Name).AnyContext(); + if (lastRun.HasValue) + { + LastRun = lastRun.Value; + NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value); + } - var lastSuccess = await _cacheClient.GetAsync("lastsuccess:" + Options.Name).AnyContext(); - if (lastSuccess.HasValue) - LastSuccess = lastSuccess.Value; + var lastSuccess = await _cacheClient.GetAsync("lastsuccess:" + Options.Name).AnyContext(); + if (lastSuccess.HasValue) + LastSuccess = lastSuccess.Value; - var lastError = await _cacheClient.GetAsync("lasterror:" + Options.Name).AnyContext(); - if (lastError.HasValue) - LastErrorMessage = lastError.Value; + var lastError = await _cacheClient.GetAsync("lasterror:" + Options.Name).AnyContext(); + if (lastError.HasValue) + LastErrorMessage = lastError.Value; - _lastStatusUpdate = _timeProvider.GetUtcNow().UtcDateTime; + _lastStatusUpdate = _timeProvider.GetUtcNow().UtcDateTime; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error getting job ({JobName}) status", Options.Name); + } } if (!NextRun.HasValue) @@ -114,21 +123,35 @@ public async Task StartAsync(CancellationToken cancellationToken = default) if (Options.IsDistributed) { // using lock provider in a cluster with a distributed cache implementation keeps cron jobs from running duplicates - l = await _lockProvider.AcquireAsync(GetLockKey(NextRun.Value), TimeSpan.FromMinutes(60), TimeSpan.Zero).AnyContext(); - if (l == null) + try { - // if we didn't get the lock, update the last run time - var lastRun = await _cacheClient.GetAsync("lastrun:" + Options.Name).AnyContext(); - if (lastRun.HasValue) - LastRun = lastRun.Value; - - var lastSuccess = await _cacheClient.GetAsync("lastsuccess:" + Options.Name).AnyContext(); - if (lastSuccess.HasValue) - LastSuccess = lastSuccess.Value; + l = await _lockProvider.AcquireAsync(GetLockKey(NextRun.Value), TimeSpan.FromMinutes(60), TimeSpan.Zero).AnyContext(); + } catch (Exception ex) + { + _logger.LogError(ex, "Error acquiring lock for job ({JobName})", Options.Name); + } - var lastError = await _cacheClient.GetAsync("lasterror:" + Options.Name).AnyContext(); - if (lastError.HasValue) - LastErrorMessage = lastError.Value; + if (l == null) + { + try + { + // if we didn't get the lock, update the last run time + var lastRun = await _cacheClient.GetAsync("lastrun:" + Options.Name).AnyContext(); + if (lastRun.HasValue) + LastRun = lastRun.Value; + + var lastSuccess = await _cacheClient.GetAsync("lastsuccess:" + Options.Name).AnyContext(); + if (lastSuccess.HasValue) + LastSuccess = lastSuccess.Value; + + var lastError = await _cacheClient.GetAsync("lasterror:" + Options.Name).AnyContext(); + if (lastError.HasValue) + LastErrorMessage = lastError.Value; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error getting job ({JobName}) status", Options.Name); + } return; } @@ -150,12 +173,26 @@ public async Task StartAsync(CancellationToken cancellationToken = default) if (result.IsSuccess) { LastSuccess = _timeProvider.GetUtcNow().UtcDateTime; - await _cacheClient.SetAsync("lastsuccess:" + Options.Name, LastSuccess.Value).AnyContext(); + try + { + await _cacheClient.SetAsync("lastsuccess:" + Options.Name, LastSuccess.Value).AnyContext(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error updating last success time for job ({JobName})", Options.Name); + } } else { LastErrorMessage = result.Message; - await _cacheClient.SetAsync("lasterror:" + Options.Name, LastErrorMessage).AnyContext(); + try + { + await _cacheClient.SetAsync("lasterror:" + Options.Name, LastErrorMessage).AnyContext(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error updating last error message for job ({JobName})", Options.Name); + } } } catch (TaskCanceledException) @@ -164,12 +201,26 @@ public async Task StartAsync(CancellationToken cancellationToken = default) catch (Exception ex) { LastErrorMessage = ex.Message; - await _cacheClient.SetAsync("lasterror:" + Options.Name, LastErrorMessage).AnyContext(); + try + { + await _cacheClient.SetAsync("lasterror:" + Options.Name, LastErrorMessage).AnyContext(); + } + catch + { + // ignored + } } }, cancellationToken).Unwrap(); LastRun = _timeProvider.GetUtcNow().UtcDateTime; - await _cacheClient.SetAsync("lastrun:" + Options.Name, LastRun.Value).AnyContext(); + try + { + await _cacheClient.SetAsync("lastrun:" + Options.Name, LastRun.Value).AnyContext(); + } + catch + { + // ignored + } NextRun = _cronSchedule.GetNextOccurrence(LastRun.Value); } } @@ -178,6 +229,6 @@ private string GetLockKey(DateTime date) { long minute = (long)date.Subtract(_baseDate).TotalMinutes; - return Options.Name + ":" + minute; + return _cacheKey + ":" + minute; } } diff --git a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs index 3e8c3738..ea01ac0f 100644 --- a/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs +++ b/src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs @@ -6,6 +6,8 @@ using Foundatio.Utility; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; namespace Foundatio.Extensions.Hosting.Jobs; @@ -14,12 +16,14 @@ public class ScheduledJobService : BackgroundService private readonly IServiceProvider _serviceProvider; private readonly JobManager _jobManager; private readonly TimeProvider _timeProvider; + private readonly ILogger _logger; public ScheduledJobService(IServiceProvider serviceProvider, JobManager jobManager) { _serviceProvider = serviceProvider; _jobManager = jobManager; _timeProvider = _timeProvider = serviceProvider.GetService() ?? TimeProvider.System; + _logger = serviceProvider.GetService>() ?? NullLogger.Instance; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -30,7 +34,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) var result = await startupContext.WaitForStartupAsync(stoppingToken).AnyContext(); if (!result.Success) { - throw new ApplicationException("Failed to wait for startup actions to complete"); + throw new StartupActionsException("Failed to wait for startup actions to complete"); } } diff --git a/src/Foundatio.Extensions.Hosting/Startup/StartupActionsContext.cs b/src/Foundatio.Extensions.Hosting/Startup/StartupActionsContext.cs index 0004edac..34a06218 100644 --- a/src/Foundatio.Extensions.Hosting/Startup/StartupActionsContext.cs +++ b/src/Foundatio.Extensions.Hosting/Startup/StartupActionsContext.cs @@ -52,3 +52,8 @@ public async Task WaitForStartupAsync(CancellationToken return new RunStartupActionsResult { Success = false, ErrorMessage = $"Timed out waiting for startup actions to be completed after {DateTime.UtcNow.Subtract(startTime):mm\\:ss}" }; } } + +public class StartupActionsException : Exception +{ + public StartupActionsException(string message) : base(message) { } +} diff --git a/src/Foundatio/Jobs/JobResult.cs b/src/Foundatio/Jobs/JobResult.cs index db39203c..fbdc65de 100644 --- a/src/Foundatio/Jobs/JobResult.cs +++ b/src/Foundatio/Jobs/JobResult.cs @@ -70,9 +70,7 @@ public static void LogJobResult(this ILogger logger, JobResult result, string jo { if (result == null) { - if (logger.IsEnabled(LogLevel.Error)) - logger.LogError("Null job run result for {JobName}", jobName); - + logger.LogError("Null job run result for {JobName}", jobName); return; } diff --git a/src/Foundatio/Jobs/JobRunner.cs b/src/Foundatio/Jobs/JobRunner.cs index 1b8126b6..f7e58e23 100644 --- a/src/Foundatio/Jobs/JobRunner.cs +++ b/src/Foundatio/Jobs/JobRunner.cs @@ -193,8 +193,7 @@ public async Task RunAsync(CancellationToken cancellationToken = default) } catch (Exception ex) { - if (_logger.IsEnabled(LogLevel.Error)) - _logger.LogError(ex, "Error running job instance: {Message}", ex.Message); + _logger.LogError(ex, "Error running job instance: {Message}", ex.Message); throw; } }, cancellationToken)); diff --git a/src/Foundatio/Lock/CacheLockProvider.cs b/src/Foundatio/Lock/CacheLockProvider.cs index 3bfb7ac3..046d4ea9 100644 --- a/src/Foundatio/Lock/CacheLockProvider.cs +++ b/src/Foundatio/Lock/CacheLockProvider.cs @@ -30,7 +30,7 @@ public CacheLockProvider(ICacheClient cacheClient, IMessageBus messageBus, ILogg public CacheLockProvider(ICacheClient cacheClient, IMessageBus messageBus, TimeProvider timeProvider, ILoggerFactory loggerFactory = null) { _timeProvider = timeProvider ?? cacheClient.GetTimeProvider(); - _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; + _logger = loggerFactory?.CreateLogger() ?? cacheClient.GetLogger() ?? NullLogger.Instance; _cacheClient = new ScopedCacheClient(cacheClient, "lock"); _messageBus = messageBus; diff --git a/src/Foundatio/Lock/ThrottlingLockProvider.cs b/src/Foundatio/Lock/ThrottlingLockProvider.cs index 72f064d7..2eb29631 100644 --- a/src/Foundatio/Lock/ThrottlingLockProvider.cs +++ b/src/Foundatio/Lock/ThrottlingLockProvider.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -20,7 +20,7 @@ public class ThrottlingLockProvider : ILockProvider, IHaveLogger, IHaveTimeProvi public ThrottlingLockProvider(ICacheClient cacheClient, int maxHitsPerPeriod = 100, TimeSpan? throttlingPeriod = null, TimeProvider timeProvider = null, ILoggerFactory loggerFactory = null) { _timeProvider = timeProvider ?? cacheClient.GetTimeProvider(); - _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; + _logger = loggerFactory?.CreateLogger() ?? cacheClient.GetLogger() ?? NullLogger.Instance; _cacheClient = new ScopedCacheClient(cacheClient, "lock:throttled"); _maxHitsPerPeriod = maxHitsPerPeriod; @@ -95,7 +95,7 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire } catch (Exception ex) { - _logger.LogError(ex, "Error acquiring throttled lock: name={Resource} message={Message}", resource, ex.Message); + _logger.LogError(ex, "Error acquiring throttled lock ({Resource}): {Message}", resource, ex.Message); errors++; if (errors >= 3) break; diff --git a/src/Foundatio/Queues/QueueBase.cs b/src/Foundatio/Queues/QueueBase.cs index 185057ae..0beb3c21 100644 --- a/src/Foundatio/Queues/QueueBase.cs +++ b/src/Foundatio/Queues/QueueBase.cs @@ -64,6 +64,7 @@ protected QueueBase(TOptions options) : base(options?.TimeProvider, options?.Log { try { + using var _ = FoundatioDiagnostics.ActivitySource.StartActivity("Queue Stats: " + _options.Name); var stats = GetMetricsQueueStats(); return (stats.Queued, stats.Working, stats.Deadletter); }