Skip to content

Commit

Permalink
Reworked monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
analogrelay committed Nov 26, 2013
1 parent 8ce9577 commit 4247cde
Show file tree
Hide file tree
Showing 23 changed files with 484 additions and 375 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "ext/Slab"]
path = ext/Slab
url = https://git01.codeplex.com/slab
Empty file added openssl
Empty file.
33 changes: 22 additions & 11 deletions src/NuGetGallery.Backend/Infrastructure/Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,17 @@ public virtual async Task<JobResult> Invoke(JobInvocationContext context)
{
// Bind invocation information
Context = context;
BindProperties(Invocation.Request.Parameters);
try
{
BindProperties(Invocation.Request.Parameters);
}
catch (Exception ex)
{
context.Log.BindingError(ex);
return JobResult.Faulted(ex);
}

// Invoke the job
WorkerEventSource.Log.JobStarted(Invocation.Request.Name, Invocation.Id);
JobResult result;
try
{
Expand All @@ -54,15 +61,6 @@ public virtual async Task<JobResult> Invoke(JobInvocationContext context)
result = JobResult.Faulted(ex);
}

if (result.Status != JobStatus.Faulted)
{
WorkerEventSource.Log.JobCompleted(Invocation.Request.Name, Invocation.Id);
}
else
{
WorkerEventSource.Log.JobFaulted(Invocation.Request.Name, result.Exception, Invocation.Id);
}

// Return the result
return result;
}
Expand Down Expand Up @@ -144,4 +142,17 @@ public override object ConvertFrom(ITypeDescriptorContext context, System.Global
}
}
}

/// <summary>
/// Base class for jobs that log through a strongly-typed event source.
/// One <typeparamref name="TEventSource"/> is constructed per job instance,
/// when the job object itself is created.
/// </summary>
/// <typeparam name="TEventSource">
/// The event source type used by the job. It must derive from
/// <see cref="EventSource"/> and expose a public parameterless constructor.
/// </typeparam>
public abstract class Job<TEventSource> : Job
    where TEventSource : EventSource, new()
{
    // Created eagerly by the field initializer; never reassigned in this class.
    private TEventSource _eventSource = new TEventSource();

    /// <summary>
    /// Gets the typed event source owned by this job instance.
    /// </summary>
    public TEventSource Log
    {
        get { return _eventSource; }
    }

    /// <summary>
    /// Returns the same instance as <see cref="Log"/>, typed as the base
    /// <see cref="EventSource"/>.
    /// </summary>
    public override EventSource GetEventSource()
    {
        return _eventSource;
    }
}
}
21 changes: 7 additions & 14 deletions src/NuGetGallery.Backend/Infrastructure/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ public class JobDispatcher
private Dictionary<string, Job> _jobMap;
private List<Job> _jobs;
private BackendMonitoringHub _monitor;

public IReadOnlyList<Job> Jobs { get { return _jobs.AsReadOnly(); } }
public BackendConfiguration Config { get; private set; }

public JobDispatcher(BackendConfiguration config, IEnumerable<Job> jobs) : this(config, jobs, null) { }
public JobDispatcher(BackendConfiguration config, IEnumerable<Job> jobs, BackendMonitoringHub monitor)
{
_jobs = jobs.ToList();
_jobMap = _jobs.ToDictionary(j => j.Name);
_monitor = monitor;

Config = config;

foreach (var job in _jobs)
Expand All @@ -32,23 +31,22 @@ public JobDispatcher(BackendConfiguration config, IEnumerable<Job> jobs, Backend
}
}

public virtual async Task<JobResponse> Dispatch(JobInvocation invocation)
public virtual async Task<JobResponse> Dispatch(JobInvocation invocation, InvocationEventSource log, InvocationMonitoringContext monitoring)
{
Job job;
if (!_jobMap.TryGetValue(invocation.Request.Name, out job))
{
throw new UnknownJobException(invocation.Request.Name);
}

IAsyncDeferred<JobResult> monitorCompletion = null;
if (_monitor != null)
if (monitoring != null)
{
monitorCompletion = await _monitor.InvokingJob(invocation, job);
await monitoring.SetJob(job);
}

WorkerEventSource.Log.DispatchingRequest(invocation);
log.Invoking(job);
JobResult result = null;
var context = new JobInvocationContext(invocation, Config, _monitor);
var context = new JobInvocationContext(invocation, Config, _monitor, log);
try
{
result = await job.Invoke(context);
Expand All @@ -57,11 +55,6 @@ public virtual async Task<JobResponse> Dispatch(JobInvocation invocation)
{
result = JobResult.Faulted(ex);
}

if (monitorCompletion != null)
{
await monitorCompletion.Complete(result);
}

return new JobResponse(invocation, result, DateTimeOffset.UtcNow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ public class JobInvocationContext
public JobInvocation Invocation { get; private set; }
public BackendConfiguration Config { get; private set; }
public BackendMonitoringHub Monitor { get; private set; }
public InvocationEventSource Log { get; private set; }

public JobInvocationContext(JobInvocation invocation, BackendConfiguration config, BackendMonitoringHub monitor)
public JobInvocationContext(JobInvocation invocation, BackendConfiguration config, BackendMonitoringHub monitor, InvocationEventSource log)
{
Invocation = invocation;
Config = config;
Monitor = monitor;
Log = log;
}
}
}
74 changes: 48 additions & 26 deletions src/NuGetGallery.Backend/Infrastructure/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,54 +38,76 @@ public JobRunner(JobDispatcher dispatcher, BackendConfiguration config, BackendM

public async Task Run(CancellationToken cancelToken)
{
WorkerEventSource.Log.WorkerDispatching();
while (!cancelToken.IsCancellationRequested)
WorkerEventSource.Log.DispatchLoopStarted();
try
{
var response = await DispatchOne(cancelToken);
if (response != null)
while (!cancelToken.IsCancellationRequested)
{
WorkerEventSource.Log.JobExecuted(response);
}
else
{
WorkerEventSource.Log.QueueEmpty(_config.QueuePollInterval);
await Task.Delay(_config.QueuePollInterval);
JobDequeueResult dequeued = await _queue.Dequeue(DefaultInvisibilityPeriod, cancelToken);
if (dequeued == null)
{
WorkerEventSource.Log.DispatchLoopWaiting(_config.QueuePollInterval);
await Task.Delay(_config.QueuePollInterval);
WorkerEventSource.Log.DispatchLoopResumed();
}
else if (!dequeued.Success)
{
WorkerEventSource.Log.InvalidQueueMessage(dequeued.MessageBody, dequeued.ParseException);
}
else
{
Debug.Assert(dequeued.Request.Message != null); // Since we dequeued, there'd better be a CloudQueueMessage.
await Dispatch(dequeued.Request);
}
}
}
}

private async Task<JobResponse> DispatchOne(CancellationToken cancelToken)
{
var request = await _queue.Dequeue(DefaultInvisibilityPeriod, cancelToken);
if (request == null)
catch (Exception ex)
{
return null;
WorkerEventSource.Log.DispatchLoopError(ex);
}
Debug.Assert(request.Message != null); // Since we dequeued, there'd better be a CloudQueueMessage.
WorkerEventSource.Log.RequestReceived(request.Id, request.InsertionTime);
WorkerEventSource.Log.DispatchLoopEnded();
}

private async Task Dispatch(JobRequest request)
{
// Construct an invocation. From here on, we're in the context of this invocation.
var invocation = new JobInvocation(Guid.NewGuid(), request, DateTimeOffset.UtcNow);
var log = new InvocationEventSource(invocation.Id);

var context = _monitoring.BeginInvocation(invocation, log);
log.Started();

JobResponse response = null;
try
{
JobResponse response = await _dispatcher.Dispatch(invocation);
response = await _dispatcher.Dispatch(invocation, log, context);

if (response.Result.Status != JobStatus.Faulted)
{
log.Succeeded(response);
}
else
{
log.Faulted(response);
}

if (request.ExpiresAt.HasValue && DateTimeOffset.UtcNow > request.ExpiresAt.Value)
{
WorkerEventSource.Log.JobRequestExpired(request, request.Id, DateTimeOffset.UtcNow - request.ExpiresAt.Value);
log.RequestExpired(request);
}

// If dispatch throws, we don't delete the message
// NOTE: If the JOB throws, the dispatcher should catch it and return the error in the response
// Thus the request is considered "handled"
await _queue.Acknowledge(request);

return response;
}
catch(Exception ex)
catch (Exception ex)
{
WorkerEventSource.Log.DispatchError(invocation, ex);
return null;
log.DispatchError(ex);
}

log.Ended();
await context.End(response == null ? null : response.Result);
}
}
}
102 changes: 42 additions & 60 deletions src/NuGetGallery.Backend/Jobs/CreateOnlineDatabaseBackupJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace NuGetGallery.Backend.Jobs
{
public class CreateOnlineDatabaseBackupJob : Job
public class CreateOnlineDatabaseBackupJob : Job<CreateOnlineDatabaseBackupEventSource>
{
/// <summary>
/// The target server, in the form of a known SQL Server (primary, warehouse, etc.)
Expand Down Expand Up @@ -46,11 +46,6 @@ public class CreateOnlineDatabaseBackupJob : Job
/// </summary>
public bool Force { get; set; }

public override EventSource GetEventSource()
{
return JobEventSource.Log;
}

protected internal override async Task Execute()
{
// Resolve the connection if not specified explicitly
Expand All @@ -60,7 +55,7 @@ protected internal override async Task Execute()
.GetSqlServer(TargetServer)
.ChangeDatabase(TargetDatabaseName);
}
JobEventSource.Log.PreparingToBackup(
Log.PreparingToBackup(
TargetDatabaseConnection.InitialCatalog,
TargetDatabaseConnection.DataSource);
// Connect to the master database
Expand All @@ -69,7 +64,7 @@ protected internal override async Task Execute()
if (!Force && MaxAge != null)
{
// Get databases
JobEventSource.Log.GettingDatabaseList(TargetDatabaseConnection.DataSource);
Log.GettingDatabaseList(TargetDatabaseConnection.DataSource);
var databases = await GetDatabases(connection);

// Gather backups with matching prefix and order descending
Expand All @@ -88,7 +83,7 @@ orderby backupMeta.Timestamp descending
if (mostRecent != null && mostRecent.Timestamp.IsYoungerThan(MaxAge.Value))
{
// Skip the backup
JobEventSource.Log.BackupWithinMaxAge(mostRecent, MaxAge.Value);
Log.BackupWithinMaxAge(mostRecent, MaxAge.Value);
return;
}
}
Expand All @@ -98,19 +93,13 @@ orderby backupMeta.Timestamp descending

// Start a copy
// (have to build the SQL string manually because you can't parameterize CREATE DATABASE)
JobEventSource.Log.StartingCopy(TargetDatabaseConnection.InitialCatalog, backupName);
Log.StartingCopy(TargetDatabaseConnection.InitialCatalog, backupName);
await connection.ExecuteAsync(String.Format(
CultureInfo.InvariantCulture,
"CREATE DATABASE {0} AS COPY OF {1}",
backupName,
TargetDatabaseConnection.InitialCatalog));
JobEventSource.Log.StartedCopy(TargetDatabaseConnection.InitialCatalog, backupName);

// Return a result to queue an async completion check in 5 minutes.
return JobResult.AsyncCompletion(new
{
DatabaseName = backupName
}, TimeSpan.FromMinutes(5));
Log.StartedCopy(TargetDatabaseConnection.InitialCatalog, backupName);
}
}

Expand All @@ -121,49 +110,42 @@ protected internal virtual Task<IEnumerable<Database>> GetDatabases(SqlConnectio
FROM sys.databases
");
}
}

/// <summary>
/// Singleton event source for the online database backup job, registered
/// under the name "NuGet-Jobs-CreateOnlineDatabaseBackup". Every event it
/// defines is written at the Informational level.
/// </summary>
[EventSource(Name = "NuGet-Jobs-CreateOnlineDatabaseBackup")]
public class JobEventSource : EventSource
{
    /// <summary>The single shared instance; the constructor is private.</summary>
    public static readonly JobEventSource Log = new JobEventSource();

    private JobEventSource()
    {
    }

// 0618 is the "member is obsolete" warning; it is suppressed because the typed
// overload below intentionally calls the [Obsolete] string-based event method.
#pragma warning disable 0618
    /// <summary>
    /// Event 1: raw string form of the "backup skipped" event. Marked obsolete
    /// to steer callers towards the typed overload.
    /// </summary>
    [Event(1, Level = EventLevel.Informational, Message = "Skipping backup. {0} is within maximum age of {1}.")]
    [Obsolete("This method supports ETL infrastructure. Use other overloads instead")]
    public void BackupWithinMaxAge(string name, string age)
    {
        WriteEvent(1, name, age);
    }

    /// <summary>
    /// Typed convenience overload: forwards the backup's database name and the
    /// string form of <paramref name="timeSpan"/> to event 1.
    /// </summary>
    [NonEvent]
    public void BackupWithinMaxAge(DatabaseBackup mostRecent, TimeSpan timeSpan)
    {
        BackupWithinMaxAge(mostRecent.Db.name, timeSpan.ToString());
    }

    /// <summary>Event 2: the job is listing the databases on <paramref name="server"/>.</summary>
    [Event(2, Level = EventLevel.Informational, Message = "Getting list of databases on {0}")]
    public void GettingDatabaseList(string server)
    {
        WriteEvent(2, server);
    }

    /// <summary>
    /// Event 3: the job is about to back up a database. The message prints the
    /// database ({1}) before the server ({0}); the parameters are server first.
    /// </summary>
    // NOTE(review): the visible call site passes InitialCatalog, then DataSource,
    // which looks like database-then-server. Confirm the intended argument order.
    [Event(3, Level = EventLevel.Informational, Message = "Preparing to backup {1} on server {0}")]
    public void PreparingToBackup(string server, string database)
    {
        WriteEvent(3, server, database);
    }

    /// <summary>Event 4: a copy from <paramref name="source"/> to <paramref name="destination"/> is about to be issued.</summary>
    [Event(4, Level = EventLevel.Informational, Message = "Starting copy of {0} to {1}")]
    public void StartingCopy(string source, string destination)
    {
        WriteEvent(4, source, destination);
    }

    /// <summary>Event 5: the copy from <paramref name="source"/> to <paramref name="destination"/> has been issued.</summary>
    [Event(5, Level = EventLevel.Informational, Message = "Started copy of {0} to {1}")]
    public void StartedCopy(string source, string destination)
    {
        WriteEvent(5, source, destination);
    }
#pragma warning restore 0618
}
/// <summary>
/// Event source for the online database backup job, registered under the name
/// "NuGet-Jobs-CreateOnlineDatabaseBackup". Every event it defines is written
/// at the Informational level.
/// </summary>
[EventSource(Name = "NuGet-Jobs-CreateOnlineDatabaseBackup")]
public class CreateOnlineDatabaseBackupEventSource : EventSource
{
    // Event 1: raw string form of the "backup skipped" event. It is private, so
    // callers outside this class use the typed overload below.
    [Event(1, Level = EventLevel.Informational, Message = "Skipping backup. {0} is within maximum age of {1}.")]
    private void BackupWithinMaxAge(string name, string age)
    {
        WriteEvent(1, name, age);
    }

    /// <summary>
    /// Reports that a backup was skipped. Forwards the backup's database name
    /// and the string form of <paramref name="timeSpan"/> to event 1.
    /// </summary>
    [NonEvent]
    public void BackupWithinMaxAge(DatabaseBackup mostRecent, TimeSpan timeSpan)
    {
        BackupWithinMaxAge(mostRecent.Db.name, timeSpan.ToString());
    }

    /// <summary>Event 2: the job is listing the databases on <paramref name="server"/>.</summary>
    [Event(2, Level = EventLevel.Informational, Message = "Getting list of databases on {0}")]
    public void GettingDatabaseList(string server)
    {
        WriteEvent(2, server);
    }

    /// <summary>
    /// Event 3: the job is about to back up a database. The message prints the
    /// database ({1}) before the server ({0}); the parameters are server first.
    /// </summary>
    // NOTE(review): the visible call site passes InitialCatalog, then DataSource,
    // which looks like database-then-server. Confirm the intended argument order.
    [Event(3, Level = EventLevel.Informational, Message = "Preparing to backup {1} on server {0}")]
    public void PreparingToBackup(string server, string database)
    {
        WriteEvent(3, server, database);
    }

    /// <summary>Event 4: a copy from <paramref name="source"/> to <paramref name="destination"/> is about to be issued.</summary>
    [Event(4, Level = EventLevel.Informational, Message = "Starting copy of {0} to {1}")]
    public void StartingCopy(string source, string destination)
    {
        WriteEvent(4, source, destination);
    }

    /// <summary>Event 5: the copy from <paramref name="source"/> to <paramref name="destination"/> has been issued.</summary>
    [Event(5, Level = EventLevel.Informational, Message = "Started copy of {0} to {1}")]
    public void StartedCopy(string source, string destination)
    {
        WriteEvent(5, source, destination);
    }
}
}
Loading

0 comments on commit 4247cde

Please sign in to comment.