Skip to content

Commit

Permalink
feat: support for checkpointing merge progress (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
chgl authored Nov 7, 2024
1 parent aa609be commit b2abf19
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 32 deletions.
8 changes: 4 additions & 4 deletions src/PathlingS3Import.Tests.E2E/Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public async Task StartImportTool_WithRunningPathlingServerAndMinio_ShouldCreate
"--s3-bucket-name=fhir",
"--s3-object-name-prefix=staging/",
$"--import-resource-type={resourceType}",
"--dry-run=false"
"--dry-run=false",
];

var testImageTag =
Expand All @@ -55,12 +55,12 @@ public async Task StartImportTool_WithRunningPathlingServerAndMinio_ShouldCreate

consumer.Stdout.Seek(0, SeekOrigin.Begin);
using var stdoutReader = new StreamReader(consumer.Stdout);
var stdout = stdoutReader.ReadToEnd();
var stdout = await stdoutReader.ReadToEndAsync();
output.WriteLine(stdout);

consumer.Stderr.Seek(0, SeekOrigin.Begin);
using var stderrReader = new StreamReader(consumer.Stderr);
var stderr = stderrReader.ReadToEnd();
var stderr = await stderrReader.ReadToEndAsync();
output.WriteLine(stderr);

exitCode.Should().Be(0);
Expand All @@ -72,7 +72,7 @@ public async Task StartImportTool_WithRunningPathlingServerAndMinio_ShouldCreate
settings: new()
{
PreferredFormat = ResourceFormat.Json,
Timeout = (int)TimeSpan.FromSeconds(60).TotalMilliseconds
Timeout = (int)TimeSpan.FromSeconds(60).TotalMilliseconds,
}
);

Expand Down
8 changes: 7 additions & 1 deletion src/PathlingS3Import/Commands/CommandBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ public abstract partial class CommandBase
)]
public string? PushGatewayAuthHeader { get; set; }

/// <summary>
/// If <c>true</c>, the command resumes processing from the checkpoint file
/// persisted by a previous run instead of starting from the first object.
/// Exposed as the <c>--continue-from-last-checkpoint</c> CLI flag; off by default.
/// </summary>
[CliOption(
    Description = "If enabled, continue processing resources from the last saved checkpoint file.",
    Name = "--continue-from-last-checkpoint"
)]
public bool IsContinueFromLastCheckpointEnabled { get; set; } = false;

public ILoggerFactory LogFactory { get; set; }

public ResiliencePipeline RetryPipeline { get; set; }
Expand Down Expand Up @@ -110,7 +116,7 @@ protected CommandBase()
);
// Event handlers can be asynchronous; here, we return an empty ValueTask.
return default;
}
},
};

RetryPipeline = new ResiliencePipelineBuilder()
Expand Down
44 changes: 18 additions & 26 deletions src/PathlingS3Import/Commands/ImportCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ public partial class ImportCommand : CommandBase
private readonly IMetricFamily<IHistogram, ValueTuple<string>> importDurationHistogram;
private readonly IMetricFamily<ICounter, ValueTuple<string>> resourcesImportedCounter;

private class ImportCheckpoint
{
public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
public string? LastImportedObjectUrl { get; set; }
}

public ImportCommand()
{
log = LogFactory.CreateLogger<ImportCommand>();
Expand Down Expand Up @@ -71,18 +65,15 @@ public ImportCommand()
[CliOption(Description = "The type of FHIR resource to import")]
public ResourceType ImportResourceType { get; set; } = ResourceType.Patient;

[CliOption(Description = "Delay to wait after importing a bundle")]
public TimeSpan SleepAfterImport { get; set; } = TimeSpan.FromSeconds(10);

[CliOption(
Description = "If enabled, continue processing resources from the last saved checkpoint file.",
Name = "--continue-from-last-checkpoint"
Description = "Name of the import checkpoint file",
Name = "--import-checkpoint-file-name"
)]
public bool IsContinueFromLastCheckpointEnabled { get; set; } = false;

[CliOption(Description = "Name of the checkpoint file", Name = "--checkpoint-file-name")]
public string CheckpointFileName { get; set; } = "_last-import-checkpoint.json";

[CliOption(Description = "Delay to wait after importing a bundle")]
public TimeSpan SleepAfterImport { get; set; } = TimeSpan.FromSeconds(10);

public async Task RunAsync()
{
log.LogInformation(
Expand Down Expand Up @@ -123,7 +114,7 @@ public async Task RunAsync()
{
options.AdditionalHeaders = new Dictionary<string, string>
{
{ "Authorization", PushGatewayAuthHeader }
{ "Authorization", PushGatewayAuthHeader },
};
}

Expand Down Expand Up @@ -234,15 +225,15 @@ await allObjects.CountAsync()
using var reader = new StreamReader(stream, Encoding.UTF8);

var checkpointJson = await reader.ReadToEndAsync(ct);
var checkpoint = JsonSerializer.Deserialize<ImportCheckpoint>(
var checkpoint = JsonSerializer.Deserialize<ProgressCheckpoint>(
checkpointJson
);

log.LogInformation("Last checkpoint: {CheckpointJson}", checkpointJson);

if (checkpoint is not null)
{
lastProcessedFile = checkpoint.LastImportedObjectUrl;
lastProcessedFile = checkpoint.LastProcessedObjectUrl;
}
else
{
Expand All @@ -265,8 +256,6 @@ await allObjects.CountAsync()

log.LogInformation("Continuing after {LastProcessedFile}", lastProcessedFile);

// order again just so we have an IOrderedEnumerable in the end.
// not really necessary.
objectsToProcess = objectsToProcess
.SkipWhile(item => $"s3://{S3BucketName}/{item.Key}" != lastProcessedFile)
// SkipWhile stops if we reach the lastProcessedFile, but includes the entry itself in the
Expand Down Expand Up @@ -330,27 +319,27 @@ await allObjects.CountAsync()
new Parameters.ParameterComponent()
{
Name = "resourceType",
Value = new Code(ImportResourceType.ToString())
Value = new Code(ImportResourceType.ToString()),
},
new Parameters.ParameterComponent()
{
Name = "mode",
Value = new Code("merge")
Value = new Code("merge"),
},
new Parameters.ParameterComponent()
{
Name = "url",
Value = new FhirUrl(objectUrl)
}
]
Value = new FhirUrl(objectUrl),
},
],
};

var importParameters = new Parameters();
// for now, create one Import request per file. In the future,
// we might want to add multiple ndjson files at once in batches.
importParameters.Parameter.Add(parameter);

log.LogInformation("{ImportParameters}", importParameters.ToJson());
log.LogInformation("{ImportParameters}", await importParameters.ToJsonAsync());

log.LogInformation(
"Starting {PathlingServerBaseUrl}/$import for {ObjectUrl}",
Expand Down Expand Up @@ -391,7 +380,10 @@ await allObjects.CountAsync()
checkpointObjectName
);

var checkpoint = new ImportCheckpoint() { LastImportedObjectUrl = objectUrl, };
var checkpoint = new ProgressCheckpoint()
{
LastProcessedObjectUrl = objectUrl,
};
var jsonString = JsonSerializer.Serialize(checkpoint);
var bytes = Encoding.UTF8.GetBytes(jsonString);
using var memoryStream = new MemoryStream(bytes);
Expand Down
125 changes: 124 additions & 1 deletion src/PathlingS3Import/Commands/MergeCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Hl7.Fhir.Serialization;
using Microsoft.Extensions.Logging;
using Minio;
using Minio.ApiEndpoints;
using Minio.DataModel.Args;
using Prometheus.Client;
using Prometheus.Client.Collectors;
Expand Down Expand Up @@ -41,6 +40,12 @@ public partial class MergeCommand : CommandBase
[CliOption(Description = "The maximum size of the merged bundle in bytes. Default: 1 GiB")]
public int MaxMergedBundleSizeInBytes { get; set; } = 1 * 1024 * 1024 * 1024;

/// <summary>
/// Object name of the JSON checkpoint file used to record merge progress.
/// The full object key is built by prepending the configured object-name prefix.
/// Exposed as the <c>--merge-checkpoint-file-name</c> CLI option.
/// </summary>
[CliOption(
    Description = "Name of the merge checkpoint file",
    Name = "--merge-checkpoint-file-name"
)]
public string CheckpointFileName { get; set; } = "_last-merge-checkpoint.json";

public MergeCommand()
{
log = LogFactory.CreateLogger<MergeCommand>();
Expand Down Expand Up @@ -110,13 +115,87 @@ await allObjects.CountAsync()
})
.ToListAsync();

var checkpointObjectName = $"{prefix}{CheckpointFileName}";

log.LogInformation(
"Name of the current progress checkpoint object set to {CheckpointObjectName}.",
checkpointObjectName
);

if (IsContinueFromLastCheckpointEnabled)
{
log.LogInformation(
"Reading last checkpoint file from {CheckpointObjectName}",
checkpointObjectName
);

var lastProcessedFile = string.Empty;
// read the contents of the last checkpoint file
var getArgs = new GetObjectArgs()
.WithBucket(S3BucketName)
.WithObject(checkpointObjectName)
.WithCallbackStream(
async (stream, ct) =>
{
using var reader = new StreamReader(stream, Encoding.UTF8);

var checkpointJson = await reader.ReadToEndAsync(ct);
var checkpoint = JsonSerializer.Deserialize<ProgressCheckpoint>(
checkpointJson
);

log.LogInformation("Last checkpoint: {CheckpointJson}", checkpointJson);

if (checkpoint is not null)
{
lastProcessedFile = checkpoint.LastProcessedObjectUrl;
}
else
{
log.LogError(
"Failed to read checkpoint file: deserialized object is null"
);
}

await stream.DisposeAsync();
}
);
await minio.GetObjectAsync(getArgs);

if (string.IsNullOrEmpty(lastProcessedFile))
{
throw new InvalidDataException(
"Failed to read last processed file. Contents are null or empty."
);
}

log.LogInformation("Continuing after {LastProcessedFile}", lastProcessedFile);

objectsToProcess = objectsToProcess
.SkipWhile(item => $"s3://{S3BucketName}/{item.Key}" != lastProcessedFile)
// SkipWhile stops if we reach the lastProcessedFile, but includes the entry itself in the
// result, so we need to skip that as well.
// Ideally, we'd say `SkipWhile(item.key-timestamp <= lastProcessedFile-timestamp)`.
.Skip(1)
.ToList();

log.LogInformation("Listing actual objects to process");
}

var objectsToProcessCount = objectsToProcess.Count;
log.LogInformation("Actually processing {ObjectsToProcessCount}", objectsToProcessCount);

var currentMergedResources = new ConcurrentDictionary<string, string>();
var estimatedSizeInBytes = 0;
var processedCount = 0;
string lastProcessedObjectUrl = string.Empty;

foreach (var item in objectsToProcess)
{
var objectUrl = $"s3://{S3BucketName}/{item.Key}";

lastProcessedObjectUrl = objectUrl;

using (log.BeginScope("[Merging ndjson file {NdjsonObjectUrl}]", objectUrl))
{
var resourceCountInFile = 0;
Expand Down Expand Up @@ -177,6 +256,7 @@ await allObjects.CountAsync()
await PutMergedBundleAsync(minio, currentMergedResources);
currentMergedResources.Clear();
estimatedSizeInBytes = 0;
await CheckpointProgressAsync(minio, checkpointObjectName, objectUrl);
}

processedCount++;
Expand All @@ -199,6 +279,7 @@ await allObjects.CountAsync()
await PutMergedBundleAsync(minio, currentMergedResources);
currentMergedResources.Clear();
estimatedSizeInBytes = 0;
await CheckpointProgressAsync(minio, checkpointObjectName, lastProcessedObjectUrl);
}
}

Expand Down Expand Up @@ -251,4 +332,46 @@ await RetryPipeline.ExecuteAsync(async token =>
);
}
}

/// <summary>
/// Persists the merge progress by writing a <see cref="ProgressCheckpoint"/>
/// (serialized as JSON) to the given S3 object. In dry-run mode the upload is
/// skipped and only a log message is emitted.
/// </summary>
/// <param name="minio">Client used to upload the checkpoint object.</param>
/// <param name="checkpointObjectName">Full object key of the checkpoint file.</param>
/// <param name="lastProcessedObjectUrl">s3:// URL of the last fully processed object.</param>
// NOTE: Task is fully qualified, presumably to avoid a name clash with an
// identically-named type brought in by another using directive — confirm.
private async System.Threading.Tasks.Task CheckpointProgressAsync(
    IMinioClient minio,
    string checkpointObjectName,
    string lastProcessedObjectUrl
)
{
    log.LogInformation(
        "Checkpointing progress '{ObjectUrl}' as '{S3BucketName}/{CheckpointObjectName}'",
        lastProcessedObjectUrl,
        S3BucketName,
        checkpointObjectName
    );

    // Serialize the checkpoint straight to UTF-8 bytes for the upload stream.
    var payload = Encoding.UTF8.GetBytes(
        JsonSerializer.Serialize(
            new ProgressCheckpoint { LastProcessedObjectUrl = lastProcessedObjectUrl }
        )
    );
    using var payloadStream = new MemoryStream(payload);

    var putArgs = new PutObjectArgs()
        .WithBucket(S3BucketName)
        .WithObject(checkpointObjectName)
        .WithContentType("application/json")
        .WithStreamData(payloadStream)
        .WithObjectSize(payload.LongLength);

    if (IsDryRun)
    {
        log.LogInformation("Running in dry-run mode. Not updating the checkpoint file.");
        return;
    }

    // Upload through the shared retry pipeline so transient S3 errors are retried.
    await RetryPipeline.ExecuteAsync(async token =>
    {
        await minio.PutObjectAsync(putArgs, token);
    });
}
}
7 changes: 7 additions & 0 deletions src/PathlingS3Import/ProgressCheckpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace PathlingS3Import;

/// <summary>
/// Progress marker serialized to JSON and stored as a small S3 object so that
/// a later run started with <c>--continue-from-last-checkpoint</c> can resume
/// after the last successfully processed object.
/// </summary>
sealed class ProgressCheckpoint
{
    /// <summary>UTC timestamp of when this checkpoint instance was created.</summary>
    public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;

    /// <summary>Full URL (<c>s3://bucket/key</c>) of the last object that was fully processed.</summary>
    public string? LastProcessedObjectUrl { get; set; }
}

0 comments on commit b2abf19

Please sign in to comment.