RFE: Support async interceptors #580
Comments
It is possible, albeit inconvenient with a lot of boilerplate. Basically you must create and return your own Here's something similar for asynchronously getting/setting an authorization header:

import {
  Deferred,
  RpcInterceptor,
  RpcMetadata,
  RpcStatus,
  UnaryCall,
} from '@protobuf-ts/runtime-rpc';
import { asyncGetAuthHeader } from '../somewhere';

const myAuthInterceptor: RpcInterceptor = {
  interceptUnary(next, method, input, options) {
    // One Deferred per promise that the returned UnaryCall exposes
    const defHeader = new Deferred<RpcMetadata>(),
      defMessage = new Deferred<object>(),
      defStatus = new Deferred<RpcStatus>(),
      defTrailer = new Deferred<RpcMetadata>();
    if (!options.meta) {
      options.meta = {};
    }
    // Local reference so the non-undefined type survives inside the async closure
    const meta = options.meta;
    void (async () => {
      try {
        // set auth request header
        meta.Authorization = await asyncGetAuthHeader();
        // perform the actual call
        const result = next(method, input, options);
        // resolve all our Deferreds
        defHeader.resolve(result.headers);
        defMessage.resolve(result.response);
        defStatus.resolve(result.status);
        defTrailer.resolve(result.trailers);
      } catch (err) {
        defHeader.rejectPending(err);
        defMessage.rejectPending(err);
        defStatus.rejectPending(err);
        defTrailer.rejectPending(err);
      }
    })();
    // Return immediately; the promises settle once the async work above finishes
    return new UnaryCall(
      method,
      meta,
      input,
      defHeader.promise,
      defMessage.promise,
      defStatus.promise,
      defTrailer.promise
    );
  },
};
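For readers unfamiliar with the pattern, the Deferred objects above are promises whose resolve and reject handles are exposed, so the call object can be returned synchronously and settled later. The following is a minimal, self-contained sketch of that idea. `SimpleDeferred` and `delayedValue` are hypothetical names for illustration only; this is not the library's implementation, and its `Deferred` may differ in detail.

```typescript
// Simplified stand-in for the Deferred helper used above
// (hypothetical, not the library's code).
class SimpleDeferred<T> {
  readonly promise: Promise<T>;
  private settled = false;
  private res!: (value: T | PromiseLike<T>) => void;
  private rej!: (reason?: unknown) => void;

  constructor() {
    // The executor runs synchronously, so both handles are set here.
    this.promise = new Promise<T>((resolve, reject) => {
      this.res = resolve;
      this.rej = reject;
    });
  }

  // Settle with a value, or adopt the state of another promise.
  resolve(value: T | PromiseLike<T>): void {
    if (this.settled) throw new Error("already settled");
    this.settled = true;
    this.res(value);
  }

  // Reject only if nothing has settled this deferred yet; otherwise do nothing.
  rejectPending(reason: unknown): void {
    if (this.settled) return;
    this.settled = true;
    this.rej(reason);
  }
}

// The caller receives the promise immediately; the value arrives later.
function delayedValue(value: string, delayMs: number): Promise<string> {
  const deferred = new SimpleDeferred<string>();
  setTimeout(() => deferred.resolve(value), delayMs);
  return deferred.promise;
}
```

The no-op behaviour of `rejectPending` after settlement is what lets an error handler call it on every deferred without checking which ones were already resolved.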
Thanks for sharing this, but this is too much bit magic for me. Code is read more often than it is written, and my co-workers would just stumble over it. I would still appreciate a leaner way with less code.
@jcready Could you please provide an example of how to handle an unauthenticated request (expired access token) too? In that case we should request a new token using the refresh token. I'm struggling with that.
@daudfauzy98 How are you sending the access token? Ideally you'd know that your access token was expired before issuing the request. If you were able to know it was expired (or about to expire) then the above should basically be all you need to do. Where

If you aren't able to know that your access token is expired (or about to expire) then you could do something like this (perform initial request, if it fails then refresh access token, set new access token in the request headers, attempt the request again):

import {
  Deferred,
  RpcError,
  RpcInterceptor,
  RpcMetadata,
  RpcStatus,
  UnaryCall,
} from '@protobuf-ts/runtime-rpc';
import { getCurrentAccessToken, refreshAccessToken } from '../somewhere';

const myAuthInterceptor: RpcInterceptor = {
  interceptUnary(next, method, input, options) {
    const defHeader = new Deferred<RpcMetadata>(),
      defMessage = new Deferred<object>(),
      defStatus = new Deferred<RpcStatus>(),
      defTrailer = new Deferred<RpcMetadata>(),
      // Awaiting the call yields the finished call, or throws if the request failed
      performRequest = async () => {
        const result = await next(method, input, options);
        defHeader.resolve(result.headers);
        defMessage.resolve(result.response);
        defStatus.resolve(result.status);
        defTrailer.resolve(result.trailers);
      },
      fail = (err: unknown) => {
        defHeader.rejectPending(err);
        defMessage.rejectPending(err);
        defStatus.rejectPending(err);
        defTrailer.rejectPending(err);
      };
    if (!options.meta) {
      options.meta = {};
    }
    // Local reference so the non-undefined type survives inside the async closure
    const meta = options.meta;
    void (async () => {
      try {
        await performRequest();
      } catch (err) {
        // RpcError.code is a string; with a gRPC transport it carries the status name
        if (err instanceof RpcError && (
          err.code === 'PERMISSION_DENIED' ||
          err.code === 'UNAUTHENTICATED'
        )) {
          try {
            // Refresh the access token
            await refreshAccessToken();
            // update the request headers
            meta.Authorization = await getCurrentAccessToken();
            await performRequest();
          } catch (e) {
            fail(e);
          }
          return;
        }
        fail(err);
      }
    })();
    return new UnaryCall(
      method,
      meta,
      input,
      defHeader.promise,
      defMessage.promise,
      defStatus.promise,
      defTrailer.promise
    );
  },
};
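The control flow described above (try once, refresh on an auth failure, try exactly once more) can be separated from the Deferred plumbing. The sketch below is a hedged illustration only: `retryOnceOnAuthError` and its parameters are hypothetical names, not part of protobuf-ts, and in an interceptor the `perform` callback would be assumed to set the header and await `next(...)`.

```typescript
// Hypothetical helper: run `perform`; if it fails with an auth error,
// call `refresh` once and run `perform` a second time.
// Any other error, and any error from the second attempt, propagates.
async function retryOnceOnAuthError<T>(
  perform: () => Promise<T>,
  refresh: () => Promise<void>,
  isAuthError: (err: unknown) => boolean,
): Promise<T> {
  try {
    return await perform();
  } catch (err) {
    if (!isAuthError(err)) throw err;
    await refresh();
    return await perform();
  }
}

// Usage with stand-in functions (all assumptions for this demo).
async function demo(): Promise<string> {
  let token = "expired";
  return retryOnceOnAuthError(
    async () => {
      if (token !== "fresh") throw new Error("UNAUTHENTICATED");
      return "response";
    },
    async () => {
      token = "fresh";
    },
    (err) => err instanceof Error && err.message === "UNAUTHENTICATED",
  );
}
```

Keeping the retry limited to one attempt avoids looping forever when the refreshed token is also rejected.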
Thanks for the example! Currently I do something like the code you gave. I get the session containing the access and refresh tokens when the user logs in, then store it in session storage. Inside the interceptor, I read the access token back from session storage, then provide it to It runs fine as long as only a single request is performed. The problem arises when there are multiple requests and the access token has expired: they all use the same token and get rejected by the server. Each request will perform
We're getting a little outside the scope of protobuf-ts here. Basically you need the

// ../somewhere.ts
let refreshPromise: Promise<void> | undefined;

export async function getCurrentAccessToken(): Promise<string> {
  // Wait for any in-flight refresh before reading the token
  await refreshPromise;
  return sessionStorage.getItem('accessToken') ?? '';
}

export function refreshAccessToken(): Promise<void> {
  // Reuse the in-flight refresh so concurrent callers trigger only one
  return refreshPromise ?? (refreshPromise = (async () => {
    try {
      // Actually retrieve a new access token somehow and store it in sessionStorage
      sessionStorage.setItem('accessToken', await magic());
    } finally {
      refreshPromise = undefined;
    }
  })());
}

Then we need to modify the interceptor a bit:

void (async () => {
  try {
    // set the request headers and prevent new requests while access token is refreshing
    options.meta.Authorization = await getCurrentAccessToken();
    await performRequest();
  } catch (err) {
    // RpcError.code is a string; with a gRPC transport it carries the status name
    if (err instanceof RpcError && (
      err.code === 'PERMISSION_DENIED' ||
      err.code === 'UNAUTHENTICATED'
    )) {
      try {
        // Refresh the access token
        await refreshAccessToken();
        // update the request headers
        options.meta.Authorization = await getCurrentAccessToken();
        await performRequest();
      } catch (e) {
        fail(e);
      }
      return;
    }
    fail(err);
  }
})();

Again, you would be much better off if you could know the access token was about to expire and preemptively refresh it, as that would avoid making requests we know are going to fail.
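The preemptive approach recommended here could look roughly like the sketch below. It assumes the client knows when the token expires (for example, from an expiry value returned at login), which the thread does not confirm. `TokenInfo`, `isExpiringSoon`, `createTokenSource` and `fetchNewToken` are hypothetical names, not protobuf-ts APIs.

```typescript
// Hypothetical token record; `expiresAtMs` is assumed to be known to the client.
interface TokenInfo {
  accessToken: string;
  expiresAtMs: number;
}

// True when the token is expired or will expire within `skewMs`.
function isExpiringSoon(token: TokenInfo, nowMs: number, skewMs = 30000): boolean {
  return token.expiresAtMs - nowMs <= skewMs;
}

// Returns a function that yields a usable access token, refreshing first
// when needed. Concurrent callers share one in-flight refresh.
function createTokenSource(
  initial: TokenInfo,
  fetchNewToken: () => Promise<TokenInfo>,
  now: () => number = Date.now,
): () => Promise<string> {
  let current = initial;
  let inFlight: Promise<TokenInfo> | undefined;
  return async () => {
    if (!isExpiringSoon(current, now())) {
      return current.accessToken;
    }
    if (!inFlight) {
      inFlight = fetchNewToken()
        .then((fresh) => {
          // Store the new token before the in-flight marker is cleared
          current = fresh;
          return fresh;
        })
        .finally(() => {
          inFlight = undefined;
        });
    }
    return (await inFlight).accessToken;
  };
}
```

An interceptor would then await the returned function before calling `next(...)`, so the failing first request and the retry are only needed as a fallback.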
Just want to confirm that this is a reasonable implementation of both client/server streaming. In particular that I've covered both the successful and error cases of the

import {
  Deferred,
  type RpcInterceptor,
  type RpcMetadata,
  RpcOutputStreamController,
  type RpcStatus,
  ServerStreamingCall,
  UnaryCall,
} from "@protobuf-ts/runtime-rpc";
import invariant from "tiny-invariant";

// async interceptors require reimplementing quite a few of the internals of protobuf-ts
// see https://github.com/timostamm/protobuf-ts/blob/main/packages/grpcweb-transport/src/grpc-web-transport.ts
// and https://github.com/timostamm/protobuf-ts/issues/580
// basic strategy is:
// 1. create a Deferred for each of the properties of the RpcCall
// 2. create a Promise to run the actual RPC call
// 3. return a new RpcCall using the Deferreds

async function getAccessToken(): Promise<string> {
  // eslint-disable-next-line unicorn/no-useless-promise-resolve-reject
  return Promise.resolve("fake-access-token");
}

export const createAsyncInterceptor = (): RpcInterceptor => {
  return {
    interceptUnary(next, method, input, options) {
      const defHeaders = new Deferred<RpcMetadata>();
      const defResponse = new Deferred<object>();
      const defStatus = new Deferred<RpcStatus>();
      const defTrailer = new Deferred<RpcMetadata>();
      if (!options.meta) {
        options.meta = {};
      }
      const runRPC = async () => {
        try {
          // set the access token
          invariant(options.meta);
          options.meta.Authorization = `Bearer ${await getAccessToken()}`;
          // perform the actual call
          const result = next(method, input, options);
          // resolve all our Deferreds
          defHeaders.resolve(result.headers);
          defResponse.resolve(result.response);
          defStatus.resolve(result.status);
          defTrailer.resolve(result.trailers);
        } catch (error) {
          defHeaders.rejectPending(error);
          defResponse.rejectPending(error);
          defStatus.rejectPending(error);
          defTrailer.rejectPending(error);
        }
      };
      // eslint-disable-next-line @typescript-eslint/no-floating-promises
      runRPC();
      const call = new UnaryCall(
        method,
        options.meta,
        input,
        defHeaders.promise,
        defResponse.promise,
        defStatus.promise,
        defTrailer.promise
      );
      return call;
    },
    interceptServerStreaming(next, method, input, options) {
      const defHeaders = new Deferred<RpcMetadata>();
      const responseStream = new RpcOutputStreamController<object>();
      const defStatus = new Deferred<RpcStatus>();
      const defTrailer = new Deferred<RpcMetadata>();
      if (!options.meta) {
        options.meta = {};
      }
      const runRPC = async () => {
        try {
          // set the access token
          invariant(options.meta);
          options.meta.Authorization = `Bearer ${await getAccessToken()}`;
          // perform the actual call
          const result = next(method, input, options);
          // resolve all our Deferreds
          defHeaders.resolve(result.headers);
          result.responses.onMessage((message) => {
            responseStream.notifyMessage(message);
          });
          result.responses.onComplete(() => {
            responseStream.notifyComplete();
          });
          defStatus.resolve(result.status);
          defTrailer.resolve(result.trailers);
        } catch (error) {
          invariant(error instanceof Error);
          defHeaders.rejectPending(error);
          responseStream.notifyError(error);
          defStatus.rejectPending(error);
          defTrailer.rejectPending(error);
        }
      };
      // eslint-disable-next-line @typescript-eslint/no-floating-promises
      runRPC();
      const call = new ServerStreamingCall(
        method,
        options.meta,
        input,
        defHeaders.promise,
        responseStream,
        defStatus.promise,
        defTrailer.promise
      );
      return call;
    },
  };
};
What you have basically does nothing since you aren't actually doing anything in the error handling (you aren't retrying the request with a new access token, just re-throwing the error). You're not awaiting the
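The remark about awaiting can be illustrated in isolation: a `try`/`catch` only sees a promise rejection if the promise is awaited inside the `try` block. The sketch below is independent of protobuf-ts, and all function names in it are hypothetical stand-ins.

```typescript
// Hypothetical stand-in for a call that fails asynchronously.
function failingCall(): Promise<string> {
  return Promise.reject(new Error("UNAUTHENTICATED"));
}

// Without `await`, the rejection is never thrown inside the try block,
// so the catch block does not run for it.
async function withoutAwait(): Promise<string> {
  try {
    const pending = failingCall();
    pending.catch(() => {}); // silence the unhandled rejection for this demo
    return "catch block skipped";
  } catch {
    return "catch block ran";
  }
}

// With `await`, the rejection is rethrown at the await and reaches the catch block.
async function withAwait(): Promise<string> {
  try {
    await failingCall();
    return "catch block skipped";
  } catch {
    return "catch block ran";
  }
}
```

In an interceptor this means any retry logic placed in a `catch` block only takes effect if the call it guards is awaited.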
As far as I can see, there's no way to intercept asynchronously, or to call the next interceptor asynchronously, which is sometimes inconvenient, e.g. when authorization is lazily initialized.