Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(Summary): Spread department messages over batch time #690

Merged
merged 15 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Fusion.Resources.Functions.Common.ApiClients;
using System.Diagnostics;

namespace Fusion.Resources.Functions.Common.ApiClients;

public interface ISummaryApiClient
{
Expand All @@ -24,6 +26,7 @@ public Task PutWeeklySummaryReportAsync(string departmentSapId, ApiWeeklySummary
// TODO: Move to shared project
// Fusion.Resources.Integration.Models ?

[DebuggerDisplay("{DepartmentSapId} - {FullDepartmentName}")]
public class ApiResourceOwnerDepartment
{
public ApiResourceOwnerDepartment()
Expand Down
110 changes: 84 additions & 26 deletions src/Fusion.Summary.Functions/Functions/DepartmentResourceOwnerSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Fusion.Resources.Functions.Common.ApiClients;
using Fusion.Resources.Functions.Common.ApiClients.ApiModels;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Fusion.Summary.Functions.Functions;
Expand All @@ -17,23 +17,43 @@ public class DepartmentResourceOwnerSync
{
private readonly ILineOrgApiClient lineOrgApiClient;
private readonly ISummaryApiClient summaryApiClient;
private readonly IConfiguration configuration;
private readonly IResourcesApiClient resourcesApiClient;
private readonly ILogger<DepartmentResourceOwnerSync> logger;

private string _serviceBusConnectionString;
private string _weeklySummaryQueueName;
private string[] _departmentFilter;
private TimeSpan _totalBatchTime;

public DepartmentResourceOwnerSync(
ILineOrgApiClient lineOrgApiClient,
ILineOrgApiClient lineOrgApiClient,
ISummaryApiClient summaryApiClient,
IConfiguration configuration,
IResourcesApiClient resourcesApiClient)
IResourcesApiClient resourcesApiClient,
ILogger<DepartmentResourceOwnerSync> logger)
{
this.lineOrgApiClient = lineOrgApiClient;
this.summaryApiClient = summaryApiClient;
this.configuration = configuration;
this.resourcesApiClient = resourcesApiClient;
this.logger = logger;

_serviceBusConnectionString = configuration["AzureWebJobsServiceBus"];
_weeklySummaryQueueName = configuration["department_summary_weekly_queue"];
_departmentFilter = configuration["departmentFilter"]?.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) ?? ["PRD"];

var totalBatchTimeInMinutesStr = configuration["total_batch_time_in_minutes"];

if (!string.IsNullOrWhiteSpace(totalBatchTimeInMinutesStr))
{
_totalBatchTime = TimeSpan.FromMinutes(double.Parse(totalBatchTimeInMinutesStr));
logger.LogInformation("Batching messages over {BatchTime}", _totalBatchTime);
}
else
{
_totalBatchTime = TimeSpan.FromHours(4.5);

logger.LogWarning("Configuration variable 'total_batch_time_in_minutes' not found, batching messages over {BatchTime}", _totalBatchTime);
}
}

/// <summary>
Expand All @@ -43,18 +63,12 @@ public DepartmentResourceOwnerSync(
/// </summary>
/// <param name="timerInfo">The running date & time</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
[FunctionName("weekly-department-recipients-sync")]
public async Task RunAsync(
[TimerTrigger("0 05 00 * * MON", RunOnStartup = false)]
[TimerTrigger("0 5 0 * * MON", RunOnStartup = false)]
TimerInfo timerInfo, CancellationToken cancellationToken
)
{
_serviceBusConnectionString = configuration["AzureWebJobsServiceBus"];
_weeklySummaryQueueName = configuration["department_summary_weekly_queue"];
_departmentFilter = configuration["departmentFilter"]?.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) ?? ["PRD"];

var client = new ServiceBusClient(_serviceBusConnectionString);
var sender = client.CreateSender(_weeklySummaryQueueName);

Expand All @@ -70,6 +84,11 @@ public async Task RunAsync(

foreach (var orgUnit in departments)
{
var resourceOwners = orgUnit.Management.Persons
.Select(p => Guid.Parse(p.AzureUniqueId))
.Distinct()
.ToArray();

var delegatedResponsibles = (await resourcesApiClient
.GetDelegatedResponsibleForDepartment(orgUnit.SapId!))
.Select(d => Guid.Parse(d.DelegatedResponsible.AzureUniquePersonId))
Expand All @@ -80,38 +99,77 @@ public async Task RunAsync(
{
DepartmentSapId = orgUnit.SapId!,
FullDepartmentName = orgUnit.FullDepartment!,
ResourceOwnersAzureUniqueId = orgUnit.Management.Persons.Select(p => Guid.Parse(p.AzureUniqueId)).Distinct().ToArray(),
ResourceOwnersAzureUniqueId = resourceOwners,
DelegateResourceOwnersAzureUniqueId = delegatedResponsibles
});
}

var parallelOptions = new ParallelOptions()
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = 10,
};
var enqueueTimeForDepartmentMapping = CalculateDepartmentEnqueueTime(apiDepartments);

// Use Parallel.ForEachAsync to easily limit the number of parallel requests
await Parallel.ForEachAsync(apiDepartments, parallelOptions,
async (ownerDepartment, token) =>
logger.LogInformation("Syncing departments {Departments}", JsonConvert.SerializeObject(enqueueTimeForDepartmentMapping, Formatting.Indented));


foreach (var department in apiDepartments)
{
try
{
// Update the database
await summaryApiClient.PutDepartmentAsync(ownerDepartment, token);
await summaryApiClient.PutDepartmentAsync(department, cancellationToken);
}
catch (Exception e)
{
logger.LogError(e, "Failed to PUT department {Department}", JsonConvert.SerializeObject(department, Formatting.Indented));
continue;
}

try
{
// Send queue message
await SendDepartmentToQueue(sender, ownerDepartment);
});
await SendDepartmentToQueue(sender, department, enqueueTimeForDepartmentMapping[department]);
}
catch (Exception e)
{
logger.LogError(e, "Failed to send department to queue {Department}", JsonConvert.SerializeObject(department, Formatting.Indented));
}
}
}

private async Task SendDepartmentToQueue(ServiceBusSender sender, ApiResourceOwnerDepartment department, double delayInMinutes = 0)

private async Task SendDepartmentToQueue(ServiceBusSender sender, ApiResourceOwnerDepartment department, DateTimeOffset enqueueTime)
{
var serializedDto = JsonConvert.SerializeObject(department);

var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(serializedDto))
{
ScheduledEnqueueTime = DateTime.UtcNow.AddMinutes(delayInMinutes)
ScheduledEnqueueTime = enqueueTime
};

await sender.SendMessageAsync(message);
}

/// <summary>
/// Calculate the enqueue time for each department based on the total batch time and amount of departments. This should spread
/// the work over the total batch time.
/// </summary>
private Dictionary<ApiResourceOwnerDepartment, DateTimeOffset> CalculateDepartmentEnqueueTime(List<ApiResourceOwnerDepartment> apiDepartments)
{
var currentTime = DateTimeOffset.UtcNow;
var minutesPerReportSlice = _totalBatchTime.TotalMinutes / apiDepartments.Count;

var departmentDelayMapping = new Dictionary<ApiResourceOwnerDepartment, DateTimeOffset>();
foreach (var department in apiDepartments)
{
// First department has no delay
if (departmentDelayMapping.Count == 0)
{
departmentDelayMapping.Add(department, currentTime);
continue;
}

var enqueueTime = departmentDelayMapping.Last().Value.AddMinutes(minutesPerReportSlice);
departmentDelayMapping.Add(department, enqueueTime);
}

return departmentDelayMapping;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,33 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using static Fusion.Summary.Functions.CardBuilder.AdaptiveCardBuilder;

namespace Fusion.Summary.Functions.Functions;

public class WeeklyDepartmentSummarySender
{
private readonly ISummaryApiClient summaryApiClient;
private readonly IResourcesApiClient resourcesApiClient;
private readonly INotificationApiClient notificationApiClient;
private readonly ILogger<WeeklyDepartmentSummarySender> logger;
private readonly IConfiguration configuration;

private int _maxDegreeOfParallelism;

public WeeklyDepartmentSummarySender(ISummaryApiClient summaryApiClient, INotificationApiClient notificationApiClient,
ILogger<WeeklyDepartmentSummarySender> logger, IConfiguration configuration, IResourcesApiClient resourcesApiClient)
ILogger<WeeklyDepartmentSummarySender> logger, IConfiguration configuration)
{
this.summaryApiClient = summaryApiClient;
this.notificationApiClient = notificationApiClient;
this.logger = logger;
this.configuration = configuration;
this.resourcesApiClient = resourcesApiClient;

_maxDegreeOfParallelism = int.TryParse(configuration["weekly-department-summary-sender-parallelism"], out var result) ? result : 2;
}

[FunctionName("weekly-department-summary-sender")]
public async Task RunAsync([TimerTrigger("0 0 8 * * MON", RunOnStartup = false)] TimerInfo timerInfo)
public async Task RunAsync([TimerTrigger("0 0 5 * * MON", RunOnStartup = false)] TimerInfo timerInfo)
{
var departments = await summaryApiClient.GetDepartmentsAsync();

Expand All @@ -45,42 +47,61 @@ public async Task RunAsync([TimerTrigger("0 0 8 * * MON", RunOnStartup = false)]

var options = new ParallelOptions()
{
MaxDegreeOfParallelism = 10
MaxDegreeOfParallelism = _maxDegreeOfParallelism
};

// Use Parallel.ForEachAsync to easily limit the number of parallel requests
await Parallel.ForEachAsync(departments, options, async (department, ct) =>
await Parallel.ForEachAsync(departments, options, async (department, _) => await CreateAndSendNotificationsAsync(department));
}

private async Task CreateAndSendNotificationsAsync(ApiResourceOwnerDepartment department)
{
ApiWeeklySummaryReport summaryReport;

try
{
var summaryReport = await summaryApiClient.GetLatestWeeklyReportAsync(department.DepartmentSapId, ct);
summaryReport = await summaryApiClient.GetLatestWeeklyReportAsync(department.DepartmentSapId);

if (summaryReport is null)
{
logger.LogCritical(
"No summary report found for department {@Department}. Unable to send report notification",
department);
"No summary report found for department {Department}. Unable to send report notification",
JsonConvert.SerializeObject(department, Formatting.Indented));
return;
}
}
catch (Exception e)
{
logger.LogError(e, "Failed to get summary report for department {Department}", JsonConvert.SerializeObject(department, Formatting.Indented));
return;
}

SendNotificationsRequest notification;
try
{
notification = CreateNotification(summaryReport, department);
}
catch (Exception e)
{
logger.LogError(e, "Failed to create notification for department {@Department}", department);
throw;
}
SendNotificationsRequest notification;
try
{
notification = CreateNotification(summaryReport, department);
}
catch (Exception e)
{
logger.LogError(e, "Failed to create notification for department {DepartmentSapId} | Report {Report}", department.DepartmentSapId, JsonConvert.SerializeObject(summaryReport, Formatting.Indented));
return;
}

var reportReceivers = department.ResourceOwnersAzureUniqueId.Concat(department.DelegateResourceOwnersAzureUniqueId);
var reportReceivers = department.ResourceOwnersAzureUniqueId.Concat(department.DelegateResourceOwnersAzureUniqueId).Distinct();

foreach (var azureId in reportReceivers)
foreach (var azureId in reportReceivers)
{
try
{
var result = await notificationApiClient.SendNotification(notification, azureId);
if (!result)
logger.LogError("Failed to send notification to user with AzureId {AzureId} | Report {@ReportId}", azureId, summaryReport);
logger.LogError("Failed to send notification to user with AzureId {AzureId} | Report {Report}", azureId, JsonConvert.SerializeObject(summaryReport, Formatting.Indented));
}
catch (Exception e)
{
logger.LogError(e, "Failed to send notification to user with AzureId {AzureId} | Report {Report}", azureId, JsonConvert.SerializeObject(summaryReport, Formatting.Indented));
}
});
}
}


Expand Down Expand Up @@ -135,8 +156,8 @@ private SendNotificationsRequest CreateNotification(ApiWeeklySummaryReport repor
"Capacity in use",
"%")
.AddTextRow(
report.NumberOfRequestsLastPeriod,
"New requests last week")
report.NumberOfRequestsLastPeriod,
"New requests last week")
.AddTextRow(
report.NumberOfOpenRequests,
"Open requests")
Expand Down