Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental Async FCM #639

Draft
wants to merge 6 commits into
base: async-fcm
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions firebase_admin/_gapic_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import io
import socket

import googleapiclient
import httplib2
import googleapiclient # type: ignore
import httplib2 # type: ignore
import requests

from firebase_admin import exceptions
Expand Down Expand Up @@ -92,15 +92,15 @@ def handle_googleapiclient_error(error, message=None, code=None, http_response=N
if isinstance(error, socket.timeout) or (
isinstance(error, socket.error) and 'timed out' in str(error)):
return exceptions.DeadlineExceededError(
message='Timed out while making an API call: {0}'.format(error),
message=f'Timed out while making an API call: {error}',
cause=error)
if isinstance(error, httplib2.ServerNotFoundError):
return exceptions.UnavailableError(
message='Failed to establish a connection: {0}'.format(error),
message=f'Failed to establish a connection: {error}',
cause=error)
if not isinstance(error, googleapiclient.errors.HttpError):
return exceptions.UnknownError(
message='Unknown error while making a remote service call: {0}'.format(error),
message=f'Unknown error while making a remote service call: {error}',
cause=error)

if not code:
Expand Down
4 changes: 2 additions & 2 deletions firebase_admin/_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
This module provides utilities for making HTTP calls using the requests library.
"""

from google.auth import transport
from google.auth import transport # type: ignore
import requests
from requests.packages.urllib3.util import retry # pylint: disable=import-error
from requests.packages.urllib3.util import retry # type: ignore # pylint: disable=import-error


if hasattr(retry.Retry.DEFAULT, 'allowed_methods'):
Expand Down
182 changes: 182 additions & 0 deletions firebase_admin/_http_client_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# Copyright 2022 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Internal async HTTP client module.

This module provides utilities for making async HTTP calls using the aiohttp library.
"""

import json

import aiohttp
from aiohttp.client_exceptions import ClientResponseError
from google.auth.transport import _aiohttp_requests # type: ignore
from google.auth.transport._aiohttp_requests import _CombinedResponse # type: ignore


DEFAULT_RETRY_ATTEMPTS = 4
DEFAULT_RETRY_CODES = (500, 503)
DEFAULT_TIMEOUT_SECONDS = 120


class HttpClientAsync:
"""Base HTTP client used to make aiohttp calls.

HttpClientAsync maintains an aiohttp session, and handles request authentication and retries if
necessary.
"""

def __init__(
self,
credential=None,
session=None,
base_url='',
headers=None,
retry_attempts=DEFAULT_RETRY_ATTEMPTS,
retry_codes=DEFAULT_RETRY_CODES,
timeout=DEFAULT_TIMEOUT_SECONDS
):
"""Creates a new HttpClientAsync instance from the provided arguments.

If a credential is provided, initializes a new aiohttp client session authorized with it.
If neither a credential nor a session is provided, initializes a new unauthorized client
session.

Args:
credential: A Google credential that can be used to authenticate requests (optional).
session: A custom aiohttp session (optional).
base_url: A URL prefix to be added to all outgoing requests (optional).
headers: A map of headers to be added to all outgoing requests (optional).
retry_attempts: The maximum number of retries that should be attempeted for a request
(optional).
retry_codes: A list of status codes for which the request retry should be attempted
(optional).
timeout: A request timeout in seconds. Defaults to 120 seconds when not specified. Set to
None to disable timeouts (optional).
"""
if credential:
self._session = _aiohttp_requests.AuthorizedSession(
credential,
max_refresh_attempts=retry_attempts,
refresh_status_codes=retry_codes,
refresh_timeout=timeout
)
elif session:
self._session = session
else:
self._session = aiohttp.ClientSession() # pylint: disable=redefined-variable-type

if headers:
self._session.headers.update(headers)
self._base_url = base_url
self._timeout = timeout

@property
def session(self):
return self._session

@property
def base_url(self):
return self._base_url

@property
def timeout(self):
return self._timeout

async def parse_body(self, resp):
raise NotImplementedError

async def request(self, method, url, **kwargs):
"""Makes an async HTTP call using the aiohttp library.

This is the sole entry point to the aiohttp library. All other helper methods in this
class call this method to send async HTTP requests out. Refer to
http://docs.python-requests.org/en/master/api/ for more information on supported options
and features.

Args:
method: HTTP method name as a string (e.g. get, post).
url: URL of the remote endpoint.
**kwargs: An additional set of keyword arguments to be passed into the aiohttp API
(e.g. json, params, timeout).

Returns:
Response: A ``_CombinedResponse`` wrapped ``ClientResponse`` object.

Raises:
ClientResponseWithBodyError: Any requests exceptions encountered while making the async
HTTP call.
"""
if 'timeout' not in kwargs:
kwargs['timeout'] = self.timeout
resp = await self._session.request(method, self.base_url + url, **kwargs)
wrapped_resp = _CombinedResponse(resp)

# Get response content from StreamReader before it is closed by error throw.
resp_content = await wrapped_resp.content()

# Catch response error and re-release it after appending response body needed to
# determine the underlying reason for the error.
try:
resp.raise_for_status()
except ClientResponseError as err:
raise ClientResponseWithBodyError(
err.request_info,
err.history,
wrapped_resp,
resp_content
) from err
return wrapped_resp

async def headers(self, method, url, **kwargs):
resp = await self.request(method, url, **kwargs)
return resp.headers

async def body_and_response(self, method, url, **kwargs):
resp = await self.request(method, url, **kwargs)
return await self.parse_body(resp), resp

async def body(self, method, url, **kwargs):
resp = await self.request(method, url, **kwargs)
return await self.parse_body(resp)

async def headers_and_body(self, method, url, **kwargs):
resp = await self.request(method, url, **kwargs)
return await resp.headers, self.parse_body(resp)

async def close(self):
if self._session is not None:
await self._session.close()
self._session = None


class JsonHttpClientAsync(HttpClientAsync):
"""An async HTTP client that parses response messages as JSON."""

def __init__(self, **kwargs):
HttpClientAsync.__init__(self, **kwargs)

async def parse_body(self, resp):
content = await resp.content()
return json.loads(content)


class ClientResponseWithBodyError(aiohttp.ClientResponseError):
"""A ClientResponseError wrapper to hold the response body of the underlying failed
aiohttp request.
"""
def __init__(self, request_info, history, response, response_content):
super().__init__(request_info, history)
self.response = response
self.response_content = response_content
42 changes: 35 additions & 7 deletions firebase_admin/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import json

import google.auth
import google.auth # type: ignore
import requests

import firebase_admin
Expand Down Expand Up @@ -89,7 +89,7 @@ def _get_initialized_app(app):
return app

raise ValueError('Illegal app argument. Argument must be of type '
' firebase_admin.App, but given "{0}".'.format(type(app)))
f' firebase_admin.App, but given "{type(app)}".')



Expand Down Expand Up @@ -125,6 +125,34 @@ def handle_platform_error_from_requests(error, handle_func=None):

return exc if exc else _handle_func_requests(error, message, error_dict)

async def handle_platform_error_from_aiohttp(error, handle_func=None):
"""Constructs a ``FirebaseError`` from the given requests error.

This can be used to handle errors returned by Google Cloud Platform (GCP) APIs.

Args:
error: An error raised by the aiohttp module while making an HTTP call to a GCP API.
handle_func: A function that can be used to handle platform errors in a custom way. When
specified, this function will be called with three arguments. It has the same
signature as ```_handle_func_requests``, but may return ``None``.

Returns:
FirebaseError: A ``FirebaseError`` that can be raised to the user code.
"""
if error.response is None:
return handle_requests_error(error)

response = error.response
content = error.response_content.decode()
status_code = response.status
error_dict, message = _parse_platform_error(content, status_code)
exc = None
if handle_func:
exc = handle_func(error, message, error_dict)

# TODO: Implement aiohttp version of ``_handle_func_requests``.
return exc if exc else _handle_func_requests(error, message, error_dict)


def handle_operation_error(error):
"""Constructs a ``FirebaseError`` from the given operation error.
Expand All @@ -137,7 +165,7 @@ def handle_operation_error(error):
"""
if not isinstance(error, dict):
return exceptions.UnknownError(
message='Unknown error while making a remote service call: {0}'.format(error),
message=f'Unknown error while making a remote service call: {error}',
cause=error)

rpc_code = error.get('code')
Expand Down Expand Up @@ -182,15 +210,15 @@ def handle_requests_error(error, message=None, code=None):
"""
if isinstance(error, requests.exceptions.Timeout):
return exceptions.DeadlineExceededError(
message='Timed out while making an API call: {0}'.format(error),
message=f'Timed out while making an API call: {error}',
cause=error)
if isinstance(error, requests.exceptions.ConnectionError):
return exceptions.UnavailableError(
message='Failed to establish a connection: {0}'.format(error),
message=f'Failed to establish a connection: {error}',
cause=error)
if error.response is None:
return exceptions.UnknownError(
message='Unknown error while making a remote service call: {0}'.format(error),
message=f'Unknown error while making a remote service call: {error}',
cause=error)

if not code:
Expand Down Expand Up @@ -237,7 +265,7 @@ def _parse_platform_error(content, status_code):
error_dict = data.get('error', {})
msg = error_dict.get('message')
if not msg:
msg = 'Unexpected HTTP response with status: {0}; body: {1}'.format(status_code, content)
msg = f'Unexpected HTTP response with status: {status_code}; body: {content}'
return error_dict, msg


Expand Down
16 changes: 8 additions & 8 deletions firebase_admin/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import json

from googleapiclient import http
from googleapiclient import http # type: ignore
from googleapiclient import _auth
import requests

Expand Down Expand Up @@ -228,7 +228,7 @@ class TopicManagementResponse:

def __init__(self, resp):
if not isinstance(resp, dict) or 'results' not in resp:
raise ValueError('Unexpected topic management response: {0}.'.format(resp))
raise ValueError(f'Unexpected topic management response: {resp}.')
self._success_count = 0
self._failure_count = 0
self._errors = []
Expand Down Expand Up @@ -328,7 +328,7 @@ def __init__(self, app):
self._fcm_url = _MessagingService.FCM_URL.format(project_id)
self._fcm_headers = {
'X-GOOG-API-FORMAT-VERSION': '2',
'X-FIREBASE-CLIENT': 'fire-admin-python/{0}'.format(firebase_admin.__version__),
'X-FIREBASE-CLIENT': f'fire-admin-python/{firebase_admin.__version__}',
}
timeout = app.options.get('httpTimeout', _http_client.DEFAULT_TIMEOUT_SECONDS)
self._credential = app.credential.get_credential()
Expand Down Expand Up @@ -407,12 +407,12 @@ def make_topic_management_request(self, tokens, topic, operation):
if not isinstance(topic, str) or not topic:
raise ValueError('Topic must be a non-empty string.')
if not topic.startswith('/topics/'):
topic = '/topics/{0}'.format(topic)
topic = f'/topics/{topic}'
data = {
'to': topic,
'registration_tokens': tokens,
}
url = '{0}/{1}'.format(_MessagingService.IID_URL, operation)
url = f'{_MessagingService.IID_URL}/{operation}'
try:
resp = self._client.body(
'post',
Expand Down Expand Up @@ -458,10 +458,10 @@ def _handle_iid_error(self, error):
code = data.get('error')
msg = None
if code:
msg = 'Error while calling the IID service: {0}'.format(code)
msg = f'Error while calling the IID service: {code}'
else:
msg = 'Unexpected HTTP response with status: {0}; body: {1}'.format(
error.response.status_code, error.response.content.decode())
msg = (f'Unexpected HTTP response with status: {error.response.status_code}; '
f'body: {error.response.content.decode()}')

return _utils.handle_requests_error(error, msg)

Expand Down
Loading