Skip to content

Commit

Permalink
ITS ALIVE!
Browse files Browse the repository at this point in the history
  • Loading branch information
analogrelay committed Nov 26, 2013
1 parent ece7c7f commit a581f16
Show file tree
Hide file tree
Showing 21 changed files with 522 additions and 167 deletions.
6 changes: 3 additions & 3 deletions src/NuGetGallery.Backend/Infrastructure/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public JobDispatcher(BackendConfiguration config, IEnumerable<Job> jobs, Backend
public virtual async Task<JobResponse> Dispatch(JobInvocation invocation)
{
Job job;
if (!_jobMap.TryGetValue(request.Name, out job))
if (!_jobMap.TryGetValue(invocation.Request.Name, out job))
{
throw new UnknownJobException(request.Name);
throw new UnknownJobException(invocation.Request.Name);
}

IAsyncDeferred<JobResult> monitorCompletion = null;
if (_monitor != null)
{
monitorCompletion = _monitor.InvokingJob(invocation, job);
monitorCompletion = await _monitor.InvokingJob(invocation, job);
}

WorkerEventSource.Log.DispatchingRequest(invocation);
Expand Down
10 changes: 5 additions & 5 deletions src/NuGetGallery.Backend/Infrastructure/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,22 @@ private async Task<JobResponse> DispatchOne(CancellationToken cancelToken)
return null;
}
Debug.Assert(request.Message != null); // Since we dequeued, there'd better be a CloudQueueMessage.
WorkerEventSource.Log.RequestReceived(request.Message.Id, request.InsertionTime);
WorkerEventSource.Log.RequestReceived(request.Id, request.InsertionTime);

var invocation = new JobInvocation(Guid.NewGuid(), request, DateTimeOffset.UtcNow);
try
{
JobResponse response = _dispatcher.Dispatch(invocation);
JobResponse response = await _dispatcher.Dispatch(invocation);

if (request.ExpirationTime.HasValue && DateTimeOffset.UtcNow > request.ExpirationTime.Value)
if (request.ExpiresAt.HasValue && DateTimeOffset.UtcNow > request.ExpiresAt.Value)
{
WorkerEventSource.Log.JobRequestExpired(req, request.Id, DateTimeOffset.UtcNow - request.ExpirationTime.Value);
WorkerEventSource.Log.JobRequestExpired(request, request.Id, DateTimeOffset.UtcNow - request.ExpiresAt.Value);
}

// If dispatch throws, we don't delete the message
// NOTE: If the JOB throws, the dispatcher should catch it and return the error in the response
// Thus the request is considered "handled"
await _queue.DeleteMessageAsync(request);
await _queue.Acknowledge(request);

return response;
}
Expand Down
44 changes: 44 additions & 0 deletions src/NuGetGallery.Backend/Jobs/HelloWorldJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NuGetGallery.Backend.Jobs
{
public class HelloWorldJob : Job
{
public string Message { get; set; }

public override EventSource GetEventSource()
{
return HelloWorldEventSource.Log;
}

protected internal override Task Execute()
{
if (Message.Contains("you suck"))
{
HelloWorldEventSource.Log.TellingUserTheySuck();
throw new Exception("NO YOU SUCK!");
}
HelloWorldEventSource.Log.Hello(Message);
return Task.FromResult<object>(null);
}

[EventSource(Name = "NuGet-Jobs-HelloWorld")]
public class HelloWorldEventSource : EventSource
{
public static readonly HelloWorldEventSource Log = new HelloWorldEventSource();

private HelloWorldEventSource() { }

[Event(eventId: 1, Message = "Hello world! Your message: {0}")]
public void Hello(string message) { WriteEvent(1, message); }

[Event(eventId: 2, Message = "The user said we suck! NO THEY SUCK!")]
public void TellingUserTheySuck() { WriteEvent(2); }
}
}
}
116 changes: 62 additions & 54 deletions src/NuGetGallery.Backend/Monitoring/BackendMonitoringHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ namespace NuGetGallery.Backend.Monitoring
public class BackendMonitoringHub : MonitoringHub
{
private const string BackendMonitoringContainerName = "backend-monitoring";
private const string BackendServiceName = "Backend";
private const string BackendTraceTableName = "BackendTrace";

private Dictionary<Job, ObservableEventListener> _eventStreams = new Dictionary<Job, ObservableEventListener>();
private MonitoringTable<BackendInstanceStatusEntry> _instanceStatusTable;
private MonitoringTable<BackendJobInvocationEntry> _invocationsTable;

private MonitoringTable<WorkerInstanceStatusEntry> _instanceStatusTable;
private MonitoringTable<WorkerInstanceHistoryEntry> _instanceHistoryTable;
private MonitoringTable<JobStatusEntry> _jobStatusTable;
private MonitoringTable<JobHistoryEntry> _jobHistoryTable;
private MonitoringTable<InvocationsEntry> _invocationsTable;

public string LogsDirectory { get; private set; }
public string TempDirectory { get; private set; }
Expand All @@ -45,8 +49,11 @@ public BackendMonitoringHub(
TempDirectory = tempDirectory;
InstanceName = instanceName;

_instanceStatusTable = Table<BackendInstanceStatusEntry>();
_invocationsTable = Table<BackendJobInvocationEntry>();
_instanceStatusTable = Table<WorkerInstanceStatusEntry>();
_instanceHistoryTable = Table<WorkerInstanceHistoryEntry>();
_jobStatusTable = Table<JobStatusEntry>();
_jobHistoryTable = Table<JobHistoryEntry>();
_invocationsTable = Table<InvocationsEntry>();
}

/// <summary>
Expand All @@ -66,18 +73,23 @@ public virtual void RegisterJob(Job job)
InstanceName,
StorageConnectionString,
tableAddress: tableName);

// Log an entry for the job in the status table
_jobStatusTable.InsertOrIgnoreDuplicate(new JobStatusEntry(job.Name, DateTimeOffset.UtcNow));
}

public override async Task Start()
{
// Log Status
await _instanceStatusTable.Upsert(new BackendInstanceStatusEntry(BackendServiceName, InstanceName)
{
Status = BackendInstanceStatus.Started,
LastInvocation = Guid.Empty,
LastJob = String.Empty,
LastUpdatedAt = DateTimeOffset.UtcNow
});
// Set up worker logging
var listener = WindowsAzureTableLog.CreateListener(
InstanceName,
StorageConnectionString,
tableAddress: GetTableFullName(BackendTraceTableName));
listener.EnableEvents(WorkerEventSource.Log, EventLevel.Informational);
listener.EnableEvents(SemanticLoggingEventSource.Log, EventLevel.Informational);

// Log Instance Status
await _instanceStatusTable.Upsert(new WorkerInstanceStatusEntry(InstanceName, DateTimeOffset.UtcNow, BackendInstanceStatus.Started));
}

/// <summary>
Expand All @@ -86,39 +98,23 @@ await _instanceStatusTable.Upsert(new BackendInstanceStatusEntry(BackendServiceN
/// </summary>
public async Task<IAsyncDeferred<JobResult>> InvokingJob(JobInvocation invocation, Job job)
{
// Log Invocation Status
await _invocationsTable.Upsert(new BackendJobInvocationEntry(job.Name, invocation.Id)
{
Status = JobStatus.Executing,
RecievedAt = invocation.RecievedAt,
Source = invocation.Request.Source,
Payload = invocation.Request.Message != null ? invocation.Request.Message.AsString : String.Empty,
LogUrl = String.Empty
});

// Log Instance Status
await _instanceStatusTable.Upsert(new BackendInstanceStatusEntry(BackendServiceName, InstanceName)
{
Status = BackendInstanceStatus.Executing,
LastInvocation = invocation.Id,
LastJob = job.Name,
LastUpdatedAt = DateTimeOffset.UtcNow
});
// Record start of job
DateTimeOffset startTime = DateTimeOffset.UtcNow;
await ReportStartJob(invocation, job, startTime);

// Get the event stream for this job
ObservableEventListener eventStream;
if (!_eventStreams.TryGetValue(job, out eventStream))
{
return;
return null;
}

// Capture the events into a flat file
var fileName = invocation.Id.ToString("N") + ".json";
var path = Path.Combine(TempDirectory, "Invocations", fileName);
var token = eventStream.LogToFlatFile(
path,
new JsonEventTextFormatter(EventTextFormatting),
isAsync: true);
new JsonEventTextFormatter(EventTextFormatting.None));
return new AsyncDeferred<JobResult>(async result =>
{
// Disconnect the listener
Expand All @@ -130,27 +126,39 @@ await _instanceStatusTable.Upsert(new BackendInstanceStatusEntry(BackendServiceN
// Delete the temp file
File.Delete(path);

// Log Invocation status
await _invocationsTable.Upsert(new BackendJobInvocationEntry(job.Name, invocation.Id)
{
Status = result.Status,
RecievedAt = invocation.RecievedAt,
Source = invocation.Request.Source,
Payload = invocation.Request.Message != null ? invocation.Request.Message.AsString : String.Empty,
LogUrl = blob.Uri.AbsoluteUri
});

// Log Instance Status
await _instanceStatusTable.Upsert(new BackendInstanceStatusEntry(BackendServiceName, InstanceName)
{
PartitionKey = BackendServiceName,
RowKey = InstanceName,
Status = BackendInstanceStatus.Idle,
LastInvocation = invocation.Id,
LastJob = job.Name,
LastUpdatedAt = DateTimeOffset.UtcNow
});
// Record end of job
await ReportEndJob(invocation, result, job, blob.Uri.AbsoluteUri, startTime, DateTimeOffset.UtcNow);
});
}

private Task ReportStartJob(JobInvocation invocation, Job job, DateTimeOffset startTime)
{
return Task.WhenAll(
// Add History Rows
_jobHistoryTable.Upsert(new JobHistoryEntry(job.Name, startTime, invocation.Id, InstanceName)),
_instanceHistoryTable.Upsert(new WorkerInstanceHistoryEntry(InstanceName, startTime, invocation.Id, job.Name)),

// Upsert Status Rows
_jobStatusTable.Upsert(new JobStatusEntry(job.Name, startTime, invocation.Id, InstanceName)),
_instanceStatusTable.Upsert(new WorkerInstanceStatusEntry(InstanceName, startTime, BackendInstanceStatus.Executing, invocation.Id, job.Name)),

// Add invocation row
_invocationsTable.Upsert(new InvocationsEntry(invocation)));
}

private Task ReportEndJob(JobInvocation invocation, JobResult result, Job job, string logUrl, DateTimeOffset startTime, DateTimeOffset completionTime)
{
return Task.WhenAll(
// Add History Rows
_jobHistoryTable.Upsert(new JobHistoryEntry(job.Name, completionTime, invocation.Id, InstanceName, result, completionTime)),
_instanceHistoryTable.Upsert(new WorkerInstanceHistoryEntry(InstanceName, completionTime, invocation.Id, job.Name, result, completionTime)),

// Add Status rows
_jobStatusTable.Upsert(new JobStatusEntry(job.Name, startTime, invocation.Id, result, InstanceName, completionTime)),
_instanceStatusTable.Upsert(new WorkerInstanceStatusEntry(InstanceName, startTime, BackendInstanceStatus.Idle, invocation.Id, job.Name, result, completionTime)),

// Update invocation row
_invocationsTable.Upsert(new InvocationsEntry(invocation, result, logUrl)));
}
}
}
5 changes: 2 additions & 3 deletions src/NuGetGallery.Backend/NuGetGallery.Backend.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
<Compile Include="Infrastructure\JobDispatcher.cs" />
<Compile Include="Infrastructure\JobRunner.cs" />
<Compile Include="InvalidJobRequestException.cs" />
<Compile Include="Jobs\HelloWorldJob.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Strings.Designer.cs">
<AutoGen>True</AutoGen>
Expand All @@ -112,9 +113,7 @@
<LastGenOutput>Strings.Designer.cs</LastGenOutput>
</EmbeddedResource>
</ItemGroup>
<ItemGroup>
<Folder Include="Jobs\" />
</ItemGroup>
<ItemGroup />
<ItemGroup>
<ProjectReference Include="..\NuGetGallery.Core\NuGetGallery.Core.csproj">
<Project>{097b2cdd-9623-4c34-93c2-d373d51f5b4e}</Project>
Expand Down
10 changes: 8 additions & 2 deletions src/NuGetGallery.Backend/WorkerRole.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
Expand Down Expand Up @@ -64,14 +65,19 @@ private JobDispatcher DiscoverJobs(BackendConfiguration config, BackendMonitorin
.Where(t => !t.IsAbstract && typeof(Job).IsAssignableFrom(t))
.Select(t => Activator.CreateInstance(t))
.Cast<Job>();
return new JobDispatcher(config, jobs);
return new JobDispatcher(config, jobs, monitoring);
}

private BackendMonitoringHub ConfigureMonitoring(BackendConfiguration config)
{
var connectionString = config.Get("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString");
var logDirectory = RoleEnvironment.GetLocalResource("Logs").RootPath;
var monitoring = new BackendMonitoringHub(connectionString, logDirectory);
var tempDirectory = Path.Combine(Path.GetTempPath(), "NuGetWorkerTemp");
var monitoring = new BackendMonitoringHub(
connectionString,
logDirectory,
tempDirectory,
RoleEnvironment.CurrentRoleInstance.Id);
monitoring.Start();
return monitoring;
}
Expand Down
16 changes: 15 additions & 1 deletion src/NuGetGallery.Core/Jobs/JobRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ public class JobRequest
public string Source { get; private set; }
public Dictionary<string, string> Parameters { get; private set; }
public CloudQueueMessage Message { get; private set; }
public DateTimeOffset? ExpiresAt { get { return Message == null ? null : Message.ExpirationTime; } }

public DateTimeOffset InsertionTime { get; private set; }
public string Id { get; private set; }

public JobRequest(string name, string source, Dictionary<string, string> parameters)
: this(name, source, parameters, null)
{

}

public JobRequest(string name, string source, Dictionary<string, string> parameters, CloudQueueMessage message)
Expand All @@ -28,6 +31,17 @@ public JobRequest(string name, string source, Dictionary<string, string> paramet
Source = source;
Parameters = parameters;
Message = message;

if (message == null)
{
Id = Guid.NewGuid().ToString();
InsertionTime = DateTimeOffset.UtcNow;
}
else
{
Id = message.Id;
InsertionTime = message.InsertionTime ?? DateTimeOffset.UtcNow;
}
}

public static JobRequest Parse(string requestString)
Expand Down
8 changes: 8 additions & 0 deletions src/NuGetGallery.Core/Jobs/JobRequestQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,13 @@ public async Task<JobRequest> Dequeue(TimeSpan invisibleFor, CancellationToken t
return JobRequest.Parse(message.AsString, message);
}
}

public async Task Acknowledge(JobRequest request)
{
if (request.Message != null)
{
await _queue.SafeExecute(q => q.DeleteMessageAsync(request.Message));
}
}
}
}
1 change: 1 addition & 0 deletions src/NuGetGallery.Core/Jobs/JobResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public override int GetHashCode()

public enum JobStatus
{
Unspecified = 0,
Executing,
Completed,
Faulted
Expand Down
Loading

0 comments on commit a581f16

Please sign in to comment.