Skip to content

Commit

Permalink
chore(Summary): Spread department messages over time + more error han…
Browse files Browse the repository at this point in the history
…dling (#690)

- [ ] New feature
- [ ] Bug fix
- [ ] High impact

**Description of work:**

AB#55978

- Makes it so that the messages are batched over a time period. This can be configured in az app settings

- Possible to change the amount of concurrency the sync and sender function use in az app settings. Reduced to 2 by defualt

- Refactored some code

- Fixed issue where complex objects weren't logged properly

- Updated timer trigger times

- Added more error handling

**Testing:**
- [x] Can be tested
- [ ] Automatic tests created / updated
- [x] Local tests are passing

I've tested running the sync function in the PR environment

**Checklist:**
- [x] Considered automated tests
- [x] Considered updating specification / documentation
- [x] Considered work items 
- [x] Considered security
- [x] Performed developer testing
- [x] Checklist finalized / ready for review
  • Loading branch information
Jonathanio123 authored Sep 5, 2024
1 parent 5881a81 commit 39a9287
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Fusion.Resources.Functions.Common.ApiClients;
using System.Diagnostics;

namespace Fusion.Resources.Functions.Common.ApiClients;

public interface ISummaryApiClient
{
Expand All @@ -24,6 +26,7 @@ public Task PutWeeklySummaryReportAsync(string departmentSapId, ApiWeeklySummary
// TODO: Move to shared project
// Fusion.Resources.Integration.Models ?

[DebuggerDisplay("{DepartmentSapId} - {FullDepartmentName}")]
public class ApiResourceOwnerDepartment
{
public ApiResourceOwnerDepartment()
Expand Down
110 changes: 84 additions & 26 deletions src/Fusion.Summary.Functions/Functions/DepartmentResourceOwnerSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Fusion.Resources.Functions.Common.ApiClients;
using Fusion.Resources.Functions.Common.ApiClients.ApiModels;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Fusion.Summary.Functions.Functions;
Expand All @@ -17,23 +17,43 @@ public class DepartmentResourceOwnerSync
{
private readonly ILineOrgApiClient lineOrgApiClient;
private readonly ISummaryApiClient summaryApiClient;
private readonly IConfiguration configuration;
private readonly IResourcesApiClient resourcesApiClient;
private readonly ILogger<DepartmentResourceOwnerSync> logger;

private string _serviceBusConnectionString;
private string _weeklySummaryQueueName;
private string[] _departmentFilter;
private TimeSpan _totalBatchTime;

public DepartmentResourceOwnerSync(
ILineOrgApiClient lineOrgApiClient,
ILineOrgApiClient lineOrgApiClient,
ISummaryApiClient summaryApiClient,
IConfiguration configuration,
IResourcesApiClient resourcesApiClient)
IResourcesApiClient resourcesApiClient,
ILogger<DepartmentResourceOwnerSync> logger)
{
this.lineOrgApiClient = lineOrgApiClient;
this.summaryApiClient = summaryApiClient;
this.configuration = configuration;
this.resourcesApiClient = resourcesApiClient;
this.logger = logger;

_serviceBusConnectionString = configuration["AzureWebJobsServiceBus"];
_weeklySummaryQueueName = configuration["department_summary_weekly_queue"];
_departmentFilter = configuration["departmentFilter"]?.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) ?? ["PRD"];

var totalBatchTimeInMinutesStr = configuration["total_batch_time_in_minutes"];

if (!string.IsNullOrWhiteSpace(totalBatchTimeInMinutesStr))
{
_totalBatchTime = TimeSpan.FromMinutes(double.Parse(totalBatchTimeInMinutesStr));
logger.LogInformation("Batching messages over {BatchTime}", _totalBatchTime);
}
else
{
_totalBatchTime = TimeSpan.FromHours(4.5);

logger.LogWarning("Configuration variable 'total_batch_time_in_minutes' not found, batching messages over {BatchTime}", _totalBatchTime);
}
}

/// <summary>
Expand All @@ -43,18 +63,12 @@ public DepartmentResourceOwnerSync(
/// </summary>
/// <param name="timerInfo">The running date & time</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
[FunctionName("weekly-department-recipients-sync")]
public async Task RunAsync(
[TimerTrigger("0 05 00 * * MON", RunOnStartup = false)]
[TimerTrigger("0 5 0 * * MON", RunOnStartup = false)]
TimerInfo timerInfo, CancellationToken cancellationToken
)
{
_serviceBusConnectionString = configuration["AzureWebJobsServiceBus"];
_weeklySummaryQueueName = configuration["department_summary_weekly_queue"];
_departmentFilter = configuration["departmentFilter"]?.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) ?? ["PRD"];

var client = new ServiceBusClient(_serviceBusConnectionString);
var sender = client.CreateSender(_weeklySummaryQueueName);

Expand All @@ -70,6 +84,11 @@ public async Task RunAsync(

foreach (var orgUnit in departments)
{
var resourceOwners = orgUnit.Management.Persons
.Select(p => Guid.Parse(p.AzureUniqueId))
.Distinct()
.ToArray();

var delegatedResponsibles = (await resourcesApiClient
.GetDelegatedResponsibleForDepartment(orgUnit.SapId!))
.Select(d => Guid.Parse(d.DelegatedResponsible.AzureUniquePersonId))
Expand All @@ -80,38 +99,77 @@ public async Task RunAsync(
{
DepartmentSapId = orgUnit.SapId!,
FullDepartmentName = orgUnit.FullDepartment!,
ResourceOwnersAzureUniqueId = orgUnit.Management.Persons.Select(p => Guid.Parse(p.AzureUniqueId)).Distinct().ToArray(),
ResourceOwnersAzureUniqueId = resourceOwners,
DelegateResourceOwnersAzureUniqueId = delegatedResponsibles
});
}

var parallelOptions = new ParallelOptions()
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = 10,
};
var enqueueTimeForDepartmentMapping = CalculateDepartmentEnqueueTime(apiDepartments);

// Use Parallel.ForEachAsync to easily limit the number of parallel requests
await Parallel.ForEachAsync(apiDepartments, parallelOptions,
async (ownerDepartment, token) =>
logger.LogInformation("Syncing departments {Departments}", JsonConvert.SerializeObject(enqueueTimeForDepartmentMapping, Formatting.Indented));


foreach (var department in apiDepartments)
{
try
{
// Update the database
await summaryApiClient.PutDepartmentAsync(ownerDepartment, token);
await summaryApiClient.PutDepartmentAsync(department, cancellationToken);
}
catch (Exception e)
{
logger.LogError(e, "Failed to PUT department {Department}", JsonConvert.SerializeObject(department, Formatting.Indented));
continue;
}

try
{
// Send queue message
await SendDepartmentToQueue(sender, ownerDepartment);
});
await SendDepartmentToQueue(sender, department, enqueueTimeForDepartmentMapping[department]);
}
catch (Exception e)
{
logger.LogError(e, "Failed to send department to queue {Department}", JsonConvert.SerializeObject(department, Formatting.Indented));
}
}
}

private async Task SendDepartmentToQueue(ServiceBusSender sender, ApiResourceOwnerDepartment department, double delayInMinutes = 0)

private async Task SendDepartmentToQueue(ServiceBusSender sender, ApiResourceOwnerDepartment department, DateTimeOffset enqueueTime)
{
var serializedDto = JsonConvert.SerializeObject(department);

var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(serializedDto))
{
ScheduledEnqueueTime = DateTime.UtcNow.AddMinutes(delayInMinutes)
ScheduledEnqueueTime = enqueueTime
};

await sender.SendMessageAsync(message);
}

/// <summary>
/// Calculate the enqueue time for each department based on the total batch time and amount of departments. This should spread
/// the work over the total batch time.
/// </summary>
private Dictionary<ApiResourceOwnerDepartment, DateTimeOffset> CalculateDepartmentEnqueueTime(List<ApiResourceOwnerDepartment> apiDepartments)
{
var currentTime = DateTimeOffset.UtcNow;
var minutesPerReportSlice = _totalBatchTime.TotalMinutes / apiDepartments.Count;

var departmentDelayMapping = new Dictionary<ApiResourceOwnerDepartment, DateTimeOffset>();
foreach (var department in apiDepartments)
{
// First department has no delay
if (departmentDelayMapping.Count == 0)
{
departmentDelayMapping.Add(department, currentTime);
continue;
}

var enqueueTime = departmentDelayMapping.Last().Value.AddMinutes(minutesPerReportSlice);
departmentDelayMapping.Add(department, enqueueTime);
}

return departmentDelayMapping;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,33 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using static Fusion.Summary.Functions.CardBuilder.AdaptiveCardBuilder;

namespace Fusion.Summary.Functions.Functions;

public class WeeklyDepartmentSummarySender
{
private readonly ISummaryApiClient summaryApiClient;
private readonly IResourcesApiClient resourcesApiClient;
private readonly INotificationApiClient notificationApiClient;
private readonly ILogger<WeeklyDepartmentSummarySender> logger;
private readonly IConfiguration configuration;

private int _maxDegreeOfParallelism;

public WeeklyDepartmentSummarySender(ISummaryApiClient summaryApiClient, INotificationApiClient notificationApiClient,
ILogger<WeeklyDepartmentSummarySender> logger, IConfiguration configuration, IResourcesApiClient resourcesApiClient)
ILogger<WeeklyDepartmentSummarySender> logger, IConfiguration configuration)
{
this.summaryApiClient = summaryApiClient;
this.notificationApiClient = notificationApiClient;
this.logger = logger;
this.configuration = configuration;
this.resourcesApiClient = resourcesApiClient;

_maxDegreeOfParallelism = int.TryParse(configuration["weekly-department-summary-sender-parallelism"], out var result) ? result : 2;
}

[FunctionName("weekly-department-summary-sender")]
public async Task RunAsync([TimerTrigger("0 0 8 * * MON", RunOnStartup = false)] TimerInfo timerInfo)
public async Task RunAsync([TimerTrigger("0 0 5 * * MON", RunOnStartup = false)] TimerInfo timerInfo)
{
var departments = await summaryApiClient.GetDepartmentsAsync();

Expand All @@ -45,42 +47,61 @@ public async Task RunAsync([TimerTrigger("0 0 8 * * MON", RunOnStartup = false)]

var options = new ParallelOptions()
{
MaxDegreeOfParallelism = 10
MaxDegreeOfParallelism = _maxDegreeOfParallelism
};

// Use Parallel.ForEachAsync to easily limit the number of parallel requests
await Parallel.ForEachAsync(departments, options, async (department, ct) =>
await Parallel.ForEachAsync(departments, options, async (department, _) => await CreateAndSendNotificationsAsync(department));
}

private async Task CreateAndSendNotificationsAsync(ApiResourceOwnerDepartment department)
{
ApiWeeklySummaryReport summaryReport;

try
{
var summaryReport = await summaryApiClient.GetLatestWeeklyReportAsync(department.DepartmentSapId, ct);
summaryReport = await summaryApiClient.GetLatestWeeklyReportAsync(department.DepartmentSapId);

if (summaryReport is null)
{
logger.LogCritical(
"No summary report found for department {@Department}. Unable to send report notification",
department);
"No summary report found for department {Department}. Unable to send report notification",
JsonConvert.SerializeObject(department, Formatting.Indented));
return;
}
}
catch (Exception e)
{
logger.LogError(e, "Failed to get summary report for department {Department}", JsonConvert.SerializeObject(department, Formatting.Indented));
return;
}

SendNotificationsRequest notification;
try
{
notification = CreateNotification(summaryReport, department);
}
catch (Exception e)
{
logger.LogError(e, "Failed to create notification for department {@Department}", department);
throw;
}
SendNotificationsRequest notification;
try
{
notification = CreateNotification(summaryReport, department);
}
catch (Exception e)
{
logger.LogError(e, "Failed to create notification for department {DepartmentSapId} | Report {Report}", department.DepartmentSapId, JsonConvert.SerializeObject(summaryReport, Formatting.Indented));
return;
}

var reportReceivers = department.ResourceOwnersAzureUniqueId.Concat(department.DelegateResourceOwnersAzureUniqueId);
var reportReceivers = department.ResourceOwnersAzureUniqueId.Concat(department.DelegateResourceOwnersAzureUniqueId).Distinct();

foreach (var azureId in reportReceivers)
foreach (var azureId in reportReceivers)
{
try
{
var result = await notificationApiClient.SendNotification(notification, azureId);
if (!result)
logger.LogError("Failed to send notification to user with AzureId {AzureId} | Report {@ReportId}", azureId, summaryReport);
logger.LogError("Failed to send notification to user with AzureId {AzureId} | Report {Report}", azureId, JsonConvert.SerializeObject(summaryReport, Formatting.Indented));
}
catch (Exception e)
{
logger.LogError(e, "Failed to send notification to user with AzureId {AzureId} | Report {Report}", azureId, JsonConvert.SerializeObject(summaryReport, Formatting.Indented));
}
});
}
}


Expand Down Expand Up @@ -135,8 +156,8 @@ private SendNotificationsRequest CreateNotification(ApiWeeklySummaryReport repor
"Capacity in use",
"%")
.AddTextRow(
report.NumberOfRequestsLastPeriod,
"New requests last week")
report.NumberOfRequestsLastPeriod,
"New requests last week")
.AddTextRow(
report.NumberOfOpenRequests,
"Open requests")
Expand Down

0 comments on commit 39a9287

Please sign in to comment.