Skip to content

Commit

Permalink
Merge pull request #51 from Evodim/fixes/TableClient-getpaged
Browse files Browse the repository at this point in the history
removed skip, take in getpaged
  • Loading branch information
medevod authored May 2, 2023
2 parents 624c8b7 + 4f70226 commit 554a960
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 80 deletions.
109 changes: 88 additions & 21 deletions samples/TableClient.Performance.Sample/SampleConsole.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using Azure.EntityServices.Queries;
using Azure.EntityServices.Tables;
using Azure.EntityServices.Queries;
using Azure.EntityServices.Tables;
using System;
using System.Linq;
using System.Threading.Tasks;
Expand All @@ -21,7 +21,7 @@ public static async Task Run()
//set here for your technical stuff: table name, connection, parallelization
var entityClient = EntityTableClient.Create<PersonEntity>(TestEnvironment.ConnectionString)
.Configure(options =>
{
{
options.TableName = $"{nameof(PersonEntity)}";
options.CreateTableIfNotExists = true;
}
Expand Down Expand Up @@ -50,7 +50,7 @@ public static async Task Run()
.AddTag("_FirstLastName3Chars")

//add an entity oberver to track entity changes and apply any action (projection, logging, etc.)
.AddObserver("EntityLoggerObserver",()=> new EntityLoggerObserver<PersonEntity>());
.AddObserver("EntityLoggerObserver", () => new EntityLoggerObserver<PersonEntity>());
});
//===============================================================================================

Expand All @@ -67,33 +67,100 @@ public static async Task Run()
{
await entityClient.AddManyAsync(entities);
}
string nextToken = "";

do
using (var mesure = counters.Mesure($"Add or replace many entities {ENTITY_COUNT} items"))
{
await entityClient.AddOrReplaceManyAsync(entities);
}

using (var mesure = counters.Mesure($"Add one entity"))
{
var pages = await entityClient.GetPagedAsync(nextPageToken: nextToken, skip: 10000);
nextToken = pages.ContinuationToken;
await entityClient.AddAsync(onePerson);
}

Console.WriteLine("not skipped" + pages.Entities.Count());
using (var mesure = counters.Mesure($"Add or replace one entity"))
{
await entityClient.AddOrReplaceAsync(onePerson);
}
while (!string.IsNullOrEmpty(nextToken));
nextToken = "";
do

Console.WriteLine($"Querying entities ...");

using (var mesure = counters.Mesure("Get By Id"))
{
var pages = await entityClient.GetPagedAsync(nextPageToken: nextToken);
nextToken = pages.ContinuationToken;
_ = await entityClient.GetByIdAsync(onePerson.TenantId, onePerson.PersonId);
Console.WriteLine($"{mesure.Name}");
}

Console.WriteLine("total " + pages.Entities.Count());
using (var mesure = counters.Mesure("Get with filter "))
{
var count = 0;
await foreach (var _ in entityClient.GetAsync(
filter => filter
.Where(entity => entity.LastName)
.Equal(onePerson.LastName)
.AndPartitionKey()
.Equal("tenant1"))
)
{
count += _.Count();
Console.WriteLine($"{mesure.Name} {count} iterated ");
Console.CursorTop--;
}
Console.WriteLine();
}
while (!string.IsNullOrEmpty(nextToken));

nextToken = string.Empty;

using (var mesure = counters.Mesure("Get with filter indexed"))
{
var count = 0;
await foreach (var _ in entityClient.GetAsync(
filter => filter
.WhereTag(entity => entity.LastName)
.Equal(onePerson.LastName)
.AndPartitionKey()
.Equal("tenant1"))
)

{
count += _.Count();
Console.WriteLine($"{mesure.Name} {count} iterated");
Console.CursorTop--;
}
Console.WriteLine();
}

using (var mesure = counters.Mesure("Get By dynamic prop"))
{
var count = 0;
await foreach (var _ in entityClient.GetAsync(
filter => filter
.WherePartitionKey()
.Equal("tenant1")
.And("_FirstLastName3Chars")
.Equal("arm")))
{
count += _.Count();
Console.WriteLine($"{mesure.Name} {count} iterated");
Console.CursorTop--;
}
Console.WriteLine();
}




using (var mesure = counters.Mesure("Get by dynamic prop indexed"))
{
var count = 0;
await foreach (var _ in entityClient.GetAsync(
filter => filter
.WhereTag("_FirstLastName3Chars")
.Equal("arm")
.AndPartitionKey()
.Equal("tenant1")))
{
count += _.Count();
Console.WriteLine($"{mesure.Name} {count} iterated");
Console.CursorTop--;
}
Console.WriteLine();
}
Console.WriteLine("====================================");
counters.WriteToConsole();
}
Expand Down
10 changes: 5 additions & 5 deletions src/Azure.EntityServices.Tables/EntityPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

namespace Azure.EntityServices.Tables
{
public record struct EntityPage<T>(IEnumerable<T> Entities,int skipped, bool isLastPage, string ContinuationToken)
public record struct EntityPage<T>(IEnumerable<T> Entities, int IteratedCount, bool isLastPage, string ContinuationToken)
{
public static implicit operator (IEnumerable<T>,int skipped, bool isLastPage, string ContinuationToken)(EntityPage<T> value)
public static implicit operator (IEnumerable<T>, int iteratorCount, bool isLastPage, string ContinuationToken)(EntityPage<T> value)
{
return (value.Entities, value.skipped, value.isLastPage, value.ContinuationToken);
return (value.Entities, value.IteratedCount, value.isLastPage, value.ContinuationToken);
}

public static implicit operator EntityPage<T>((IEnumerable<T> Item1,int skipped, string ContinuationToken) value)
public static implicit operator EntityPage<T>((IEnumerable<T> Item1, int IteratorCount, string ContinuationToken) value)
{
return new EntityPage<T>(value.Item1, value.skipped , string.IsNullOrEmpty(value.ContinuationToken), value.ContinuationToken);
return new EntityPage<T>(value.Item1, value.IteratorCount, string.IsNullOrEmpty(value.ContinuationToken), value.ContinuationToken);
}
}
}
46 changes: 8 additions & 38 deletions src/Azure.EntityServices.Tables/EntityTableClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ public async IAsyncEnumerable<IEnumerable<T>> GetAsync(Action<IQuery<T>> filter

public async Task<EntityPage<T>> GetPagedAsync(
Action<IQuery<T>> filter = default,
int? skip = 0,
int? take = null,
int? iteratedCount = null,
int? maxPerPage = null,
string nextPageToken = null,
CancellationToken cancellationToken = default
)
Expand All @@ -337,45 +337,16 @@ public async Task<EntityPage<T>> GetPagedAsync(

try
{
var skipped = 0;
//if skipping is required, try to iterate quickly per page without fetching all entity properties
if (skip.HasValue && skip > 0 && string.IsNullOrEmpty(continuationToken))
{
var remainToSkip = skip;
var nextAvailable = true;

string skippedNextPageToken = continuationToken;

while (remainToSkip > 0 && nextAvailable)
{
// iterator with dynamic paging to handle remaining entities to skip in a page
var pageVisitor = QueryEntities(filter, (remainToSkip <= take) ? remainToSkip : take, skippedNextPageToken, cancellationToken, true)
.GetAsyncEnumerator(cancellationToken);

nextAvailable = await pageVisitor.MoveNextAsync();
remainToSkip -= pageVisitor.Current.Values.Count;
skipped += pageVisitor.Current.Values.Count;
skippedNextPageToken = pageVisitor.Current.ContinuationToken;
}
//in this case, there is no available data after skipping,therefore we return empty result
if (string.IsNullOrEmpty(skippedNextPageToken))
{
return new EntityPage<T>(Enumerable.Empty<T>(),
skipped,
true,
skippedNextPageToken);
}
//set continuation token to next iterator
continuationToken = skippedNextPageToken;
}
//Create a new iterator after skipping entities to return next available entities
pageEnumerator = QueryEntities(filter, take, continuationToken, cancellationToken)
//Create a new iterator after skipping entities to return next available entities
pageEnumerator = QueryEntities(filter, maxPerPage, continuationToken, cancellationToken)
.GetAsyncEnumerator(cancellationToken);
await pageEnumerator.MoveNextAsync();

var currentCount = (iteratedCount ?? 0) + pageEnumerator.Current.Values.Count;

return new EntityPage<T>(pageEnumerator.Current.Values.Select(
tableEntity => CreateEntityBinderFromTableEntity(tableEntity).UnBind()),
skipped,
currentCount,
string.IsNullOrEmpty(pageEnumerator.Current.ContinuationToken),
pageEnumerator.Current.ContinuationToken);
}
Expand Down Expand Up @@ -570,12 +541,11 @@ public Task DeleteManyAsync(IEnumerable<T> entities, CancellationToken cancellat

public async Task DeleteByIdAsync(string partition, object id, CancellationToken cancellationToken = default)
{
var entity = await GetByIdAsync(partition,id, cancellationToken);
var entity = await GetByIdAsync(partition, id, cancellationToken);
if (entity != null)
{
await DeleteAsync(entity, cancellationToken);
}

}
}
}
2 changes: 1 addition & 1 deletion src/Azure.EntityServices.Tables/IEntityTableClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface IEntityTableClient<T> : IEntityTableClientRuntimeConfig<T>

IAsyncEnumerable<IEnumerable<T>> GetAsync(Action<IQuery<T>> filter = default, CancellationToken cancellationToken = default);

Task<EntityPage<T>> GetPagedAsync(Action<IQuery<T>> filter = default, int? skip = 0, int? take = null, string nextPageToken = null, CancellationToken cancellationToken = default);
Task<EntityPage<T>> GetPagedAsync(Action<IQuery<T>> filter = default, int? iteratorCount = null, int? maxPerPage = null, string nextPageToken = null, CancellationToken cancellationToken = default);

Task DeleteByIdAsync(string partition, object id, CancellationToken cancellationToken = default);

Expand Down
29 changes: 14 additions & 15 deletions tests/Azure.EntityServices.Tests/Table/EntityTableClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ public async Task Should_Get_Paged_Entities()

do
{
page = await personClient.GetPagedAsync(take: 10, nextPageToken: page.ContinuationToken);
page = await personClient.GetPagedAsync(nextPageToken: page.ContinuationToken);

if (page.isLastPage)
{
Expand All @@ -1215,18 +1215,16 @@ public async Task Should_Get_Paged_Entities()
}
}

//[DataRow[ available_entities, to_skip, to_take ]
[DataRow(10, 10, 20)] //skip more than available entities
[DataRow(158, 10, 20)]
[DataRow(122, 5, 100)]
[DataRow(1024, 500, 999)]
[DataRow(2012, 1200, 1000)]
//[DataRow[ available_entities, to_skip, max_per_page ]
[DataRow(10, 20)] //skip more than available entities
[DataRow(158, 100)]
[DataRow(1024, 999)]
[DataRow(2012, 1000)]
[TestMethod]
public async Task Should_Get_Paged_Entities_By_Skipping_Entities(params int[] inputs)
public async Task Should_Get_Paged_Entities_With_Custom_Max_Per_Page(params int[] inputs)
{
int totalCount = inputs[0];
int skipCount = inputs[1];
int takeCount = inputs[2];
int totalCount = inputs[0];
int maxPerPage = inputs[1];

var persons = Fakers.CreateFakePerson().Generate(totalCount);
var options = new EntityTableClientOptions() { };
Expand All @@ -1251,21 +1249,22 @@ public async Task Should_Get_Paged_Entities_By_Skipping_Entities(params int[] in

do
{
page = await personClient.GetPagedAsync(take: takeCount, skip: skipCount, nextPageToken: page.ContinuationToken);
page = await personClient.GetPagedAsync(maxPerPage: maxPerPage, iteratedCount: page.IteratedCount, nextPageToken: page.ContinuationToken);

if (page.isLastPage)
{
page.Entities.Count().Should().BeLessThanOrEqualTo(takeCount);
page.Entities.Count().Should().BeLessThanOrEqualTo(maxPerPage);
}
else
{
page.Entities.Count().Should().Be(takeCount);
page.Entities.Count().Should().Be(maxPerPage);
}
entityCount += page.Entities.Count();
}
while (!page.isLastPage);

entityCount.Should().Be(totalCount - skipCount);
entityCount.Should().Be(totalCount);
page.IteratedCount.Should().Be(entityCount);
}
catch { throw; }
finally
Expand Down

0 comments on commit 554a960

Please sign in to comment.