Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Future Async #230

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Source/EntityFramework.Extended/Extensions/FutureExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static FutureQuery<TEntity> Future<TEntity>(this IQueryable<TEntity> sour
throw new ArgumentException("The source query must be of type ObjectQuery or DbQuery.", "source");

var futureContext = GetFutureContext(sourceQuery);
var future = new FutureQuery<TEntity>(sourceQuery, futureContext.ExecuteFutureQueries);
var future = new FutureQuery<TEntity>(sourceQuery, futureContext);
futureContext.AddQuery(future);

return future;
Expand All @@ -52,7 +52,7 @@ public static FutureCount FutureCount<TEntity>(this IQueryable<TEntity> source)
if (source == null)
return new FutureCount(0);

ObjectQuery sourceQuery = source.ToObjectQuery();
ObjectQuery<TEntity> sourceQuery = source.ToObjectQuery();
if (sourceQuery == null)
throw new ArgumentException("The source query must be of type ObjectQuery or DbQuery.", "source");

Expand All @@ -64,12 +64,12 @@ public static FutureCount FutureCount<TEntity>(this IQueryable<TEntity> source)
source.Expression);

// create query from expression using internal ObjectQueryProvider
ObjectQuery countQuery = sourceQuery.CreateQuery(expression, typeof(int));
var countQuery = sourceQuery.CreateQuery(expression, typeof(int)) as ObjectQuery<int>;
if (countQuery == null)
throw new ArgumentException("The source query must be of type ObjectQuery or DbQuery.", "source");

var futureContext = GetFutureContext(sourceQuery);
var future = new FutureCount(countQuery, futureContext.ExecuteFutureQueries);
var future = new FutureCount(countQuery, futureContext);
futureContext.AddQuery(future);
return future;
}
Expand Down Expand Up @@ -106,12 +106,12 @@ public static FutureValue<TResult> FutureValue<TEntity, TResult>(this IQueryable
}

var expression = Expression.Call(null, methodExpr.Method, arguments);
var valueQuery = sourceQuery.CreateQuery(expression, typeof(TResult));
var valueQuery = sourceQuery.CreateQuery(expression, typeof(TResult)) as ObjectQuery<TResult>;
if (valueQuery == null)
throw new ArgumentException("The source query must be of type ObjectQuery or DbQuery.", "source");

var futureContext = GetFutureContext(sourceQuery);
var future = new FutureValue<TResult>(valueQuery, futureContext.ExecuteFutureQueries);
var future = new FutureValue<TResult>(valueQuery, futureContext);
futureContext.AddQuery(future);
return future;
}
Expand Down Expand Up @@ -141,7 +141,7 @@ public static FutureValue<TEntity> FutureFirstOrDefault<TEntity>(this IQueryable
throw new ArgumentException("The source query must be of type ObjectQuery or DbQuery.", "source");

var futureContext = GetFutureContext(sourceQuery);
var future = new FutureValue<TEntity>(objectQuery, futureContext.ExecuteFutureQueries);
var future = new FutureValue<TEntity>(objectQuery, futureContext);
futureContext.AddQuery(future);
return future;
}
Expand Down
20 changes: 20 additions & 0 deletions Source/EntityFramework.Extended/Future/FutureContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Data.Entity.Core.Objects;
using System.Threading;
using System.Threading.Tasks;

namespace EntityFramework.Future
{
Expand Down Expand Up @@ -85,6 +87,24 @@ public void ExecuteFutureQueries()
runner.ExecuteFutureQueries(context, FutureQueries);
}

#if NET45
/// <summary>
/// Executes the future queries as a single batch.
/// </summary>
public async Task ExecuteFutureQueriesAsync(CancellationToken cancellationToken = default(CancellationToken))
{
ObjectContext context = ObjectContext;
if (context == null)
throw new ObjectDisposedException("ObjectContext", "The ObjectContext for the future queries has been disposed.");

var runner = Locator.Current.Resolve<IFutureRunner>();
if (runner == null)
throw new InvalidOperationException("Could not resolve the IFutureRunner. Make sure IFutureRunner is registered in the Locator.Current container.");

await runner.ExecuteFutureQueriesAsync(context, FutureQueries, cancellationToken).ConfigureAwait(false);
}
#endif

/// <summary>
/// Adds the future query to the waiting queries list on this context.
/// </summary>
Expand Down
8 changes: 4 additions & 4 deletions Source/EntityFramework.Extended/Future/FutureCount.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ namespace EntityFramework.Future
public class FutureCount : FutureValue<int>
{
/// <summary>
/// Initializes a new instance of the <see cref="FutureCount"/> class.
/// Initializes a new instance of the <see cref="FutureCount" /> class.
/// </summary>
/// <param name="query">The query source to use when materializing.</param>
/// <param name="loadAction">The action to execute when the query is accessed.</param>
internal FutureCount(IQueryable query, Action loadAction)
: base(query, loadAction)
/// <param name="futureContext">The future context.</param>
internal FutureCount(IQueryable<int> query, IFutureContext futureContext)
: base(query, futureContext)
{ }

/// <summary>
Expand Down
33 changes: 28 additions & 5 deletions Source/EntityFramework.Extended/Future/FutureQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace EntityFramework.Future
{
Expand All @@ -26,13 +28,14 @@ namespace EntityFramework.Future
public class FutureQuery<T> : FutureQueryBase<T>, IEnumerable<T>
{
/// <summary>
/// Initializes a new instance of the <see cref="T:EntityFramework.Future.FutureQuery`1"/> class.
/// Initializes a new instance of the <see cref="T:EntityFramework.Future.FutureQuery`1" /> class.
/// </summary>
/// <param name="query">The query source to use when materializing.</param>
/// <param name="loadAction">The action to execute when the query is accessed.</param>
internal FutureQuery(IQueryable query, Action loadAction)
: base(query, loadAction)
{ }
/// <param name="futureContext">The future context.</param>
internal FutureQuery(IQueryable<T> query, IFutureContext futureContext)
: base(query, futureContext)
{
}

/// <summary>
/// Returns an enumerator that iterates through the collection.
Expand Down Expand Up @@ -61,5 +64,25 @@ IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

#if NET45
/// <summary>
/// To the list asynchronous.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns></returns>
/// <exception cref="FutureException">An error occurred executing the future query.</exception>
public async Task<IList<T>> ToListAsync(CancellationToken cancellationToken = default(CancellationToken))
{
// triggers loading future queries
var result =
await GetResultAsync(cancellationToken).ConfigureAwait(false) ?? new List<T>();

if (Exception != null)
throw new FutureException("An error occurred executing the future query.", Exception);

return result;
}
#endif
}
}
145 changes: 122 additions & 23 deletions Source/EntityFramework.Extended/Future/FutureQueryBase.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Data.Entity;
using System.Data.Entity.Core.Objects;
using System.Data.Entity.Infrastructure;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EntityFramework.Reflection;

namespace EntityFramework.Future
Expand All @@ -15,31 +19,31 @@ namespace EntityFramework.Future
[DebuggerDisplay("IsLoaded={IsLoaded}")]
public abstract class FutureQueryBase<T> : IFutureQuery
{
private readonly Action _loadAction;
private readonly IQueryable _query;
private IEnumerable<T> _result;
private readonly IQueryable<T> _query;
private readonly IFutureContext _futureContext;
private IList<T> _result;
private bool _isLoaded;

/// <summary>
/// Initializes a new instance of the <see cref="FutureQuery&lt;T&gt;"/> class.
/// Initializes a new instance of the <see cref="FutureQuery&lt;T&gt;" /> class.
/// </summary>
/// <param name="query">The query source to use when materializing.</param>
/// <param name="loadAction">The action to execute when the query is accessed.</param>
protected FutureQueryBase(IQueryable query, Action loadAction)
/// <param name="futureContext">The future context.</param>
protected FutureQueryBase(IQueryable<T> query, IFutureContext futureContext)
{
_query = query;
_loadAction = loadAction;
_result = null;
_futureContext = futureContext;
//_result = null;
}

/// <summary>
/// Gets the action to execute when the query is accessed.
/// </summary>
/// <value>The load action.</value>
protected Action LoadAction
{
get { return _loadAction; }
}
//protected Action LoadAction
//{
// get { return _loadAction; }
//}

/// <summary>
/// Gets a value indicating whether this instance is loaded.
Expand All @@ -66,29 +70,54 @@ IQueryable IFutureQuery.Query
}

/// <summary>
/// Gets the result by invoking the <see cref="LoadAction"/> if not already loaded.
/// Gets the result.
/// </summary>
/// <returns></returns>
protected virtual IList<T> GetResult()
{
if (IsLoaded)
return _result;

// no load action, run query directly
if (_futureContext == null)
{
_isLoaded = true;
var enumerable = _query as IEnumerable<T>;
_result = enumerable == null ? null : enumerable.ToList();
return _result;
}

// invoke the load action on the datacontext
// result will be set with a callback to SetResult
_futureContext.ExecuteFutureQueries();
return _result ?? new List<T>();
}

#if NET45
/// <summary>
/// Gets the result asynchronous.
/// </summary>
/// <returns>
/// An <see cref="T:System.Collections.Generic.IEnumerable`1"/> that can be used to iterate through the collection.
/// </returns>
protected virtual IEnumerable<T> GetResult()
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns></returns>
protected virtual async Task<IList<T>> GetResultAsync(CancellationToken cancellationToken = default(CancellationToken))
{
if (IsLoaded)
return _result;

// no load action, run query directly
if (LoadAction == null)
if (_futureContext == null)
{
_isLoaded = true;
_result = _query as IEnumerable<T>;
_result = await (_query as IQueryable<T>).ToListAsync(cancellationToken).ConfigureAwait(false);
return _result;
}

// invoke the load action on the datacontext
// result will be set with a callback to SetResult
LoadAction.Invoke();
return _result ?? Enumerable.Empty<T>();
await _futureContext.ExecuteFutureQueriesAsync(cancellationToken).ConfigureAwait(false);
return _result ?? new List<T>();
}
#endif

/// <summary>
/// Gets the data command for this query.
Expand Down Expand Up @@ -155,13 +184,16 @@ protected virtual void SetResult(ObjectContext dataContext, DbDataReader reader)
dynamic queryProxy = new DynamicProxy(q);
// ObjectQueryState
dynamic queryState = queryProxy.QueryState;


// ObjectQueryExecutionPlan
dynamic executionPlan = queryState.GetExecutionPlan(null);

// ShaperFactory
dynamic shaperFactory = executionPlan.ResultShaperFactory;
// Shaper<T>
dynamic shaper = shaperFactory.Create(reader, dataContext, dataContext.MetadataWorkspace, MergeOption.AppendOnly, false, true, false);
dynamic shaper = shaperFactory.Create(reader, dataContext, dataContext.MetadataWorkspace,
MergeOption.AppendOnly, false, true, false);

var list = new List<T>();
IEnumerator<T> enumerator = shaper.GetEnumerator();
Expand All @@ -179,5 +211,72 @@ protected virtual void SetResult(ObjectContext dataContext, DbDataReader reader)
Exception = ex;
}
}

#if NET45
/// <summary>
/// Sets the underling value after the query has been executed.
/// </summary>
/// <param name="dataContext">The data context to translate the results with.</param>
/// <param name="reader">The <see cref="DbDataReader" /> to get the result from.</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task IFutureQuery.SetResultAsync(ObjectContext dataContext, DbDataReader reader, CancellationToken cancellationToken = default(CancellationToken))
{
return SetResultAsync(dataContext, reader, cancellationToken);
}

/// <summary>
/// Sets the underling value after the query has been executed.
/// </summary>
/// <param name="dataContext">The data context to translate the results with.</param>
/// <param name="reader">The <see cref="DbDataReader" /> to get the result from.</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException">The future query is not of type ObjectQuery.</exception>
protected virtual async Task SetResultAsync(ObjectContext dataContext, DbDataReader reader, CancellationToken cancellationToken = default(CancellationToken))
{
_isLoaded = true;

try
{
IFutureQuery futureQuery = this;
var source = futureQuery.Query;

var q = source as ObjectQuery;
if (q == null)
throw new InvalidOperationException("The future query is not of type ObjectQuery.");

// create execution plan
dynamic queryProxy = new DynamicProxy(q);
// ObjectQueryState
dynamic queryState = queryProxy.QueryState;

// ObjectQueryExecutionPlan
dynamic executionPlan = queryState.GetExecutionPlan(null);

// ShaperFactory
dynamic shaperFactory = executionPlan.ResultShaperFactory;
// Shaper<T>
dynamic shaper = shaperFactory.Create(reader, dataContext, dataContext.MetadataWorkspace,
MergeOption.AppendOnly, false, true, false);

var list = new List<T>();
// Shaper<T> GetEnumerator method return IDbEnumerator<T>, which implements publicly accessible IDbAsyncEnumerator<T>
IDbAsyncEnumerator<T> enumerator = shaper.GetEnumerator();
while (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(false))
list.Add(enumerator.Current);

_result = list;

// translate has issue with column names not matching
//var resultSet = dataContext.Translate<T>(reader);
//_result = resultSet.ToList();
}
catch (Exception ex)
{
Exception = ex;
}
}
#endif
}
}
Loading