From 9a7faeae4e9472e25abddb6cb52a361b6fc27289 Mon Sep 17 00:00:00 2001
From: Yusuf Olokoba
Date: Wed, 6 Mar 2024 11:49:02 -0500
Subject: [PATCH] Minor improvements

---
 Changelog.md                       |  2 +-
 fxn/cli/predict.py                 |  2 +-
 fxn/services/prediction/service.py | 40 +++++++++++++++++++-----------
 fxn/services/storage.py            | 29 +++++++++++++++++++---
 requirements.txt                   |  2 +-
 setup.py                           |  2 +-
 test/storage_test.py               | 24 +++++++++++++++++-
 7 files changed, 78 insertions(+), 23 deletions(-)

diff --git a/Changelog.md b/Changelog.md
index fdca00e..55b7e94 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -1,5 +1,5 @@
 ## 0.0.29
-*INCOMPLETE*
++ Minor fixes and improvements.
 
 ## 0.0.28
 + Added `fxn create --cloud` CLI shorthand flag for setting the predictor type to `PredictorType.Cloud`.
diff --git a/fxn/cli/predict.py b/fxn/cli/predict.py
index 0027f5e..1c4374e 100644
--- a/fxn/cli/predict.py
+++ b/fxn/cli/predict.py
@@ -35,7 +35,7 @@ async def _predict_async (tag: str, context: Context, raw_outputs: bool):
     inputs = { context.args[i].replace("-", ""): _parse_value(context.args[i+1]) for i in range(0, len(context.args), 2) }
     # Stream
     fxn = Function(get_access_key())
-    async for prediction in fxn.predictions.stream(tag, inputs=inputs, raw_outputs=raw_outputs, return_binary_path=True):
+    async for prediction in fxn.predictions.stream(tag, inputs=inputs, raw_outputs=raw_outputs, return_binary_path=True, verbose=True):
         # Parse results
         images = [value for value in prediction.results or [] if isinstance(value, Image.Image)]
         prediction.results = [_serialize_value(value) for value in prediction.results] if prediction.results is not None else None
diff --git a/fxn/services/prediction/service.py b/fxn/services/prediction/service.py
index e2f72a3..497fd22 100644
--- a/fxn/services/prediction/service.py
+++ b/fxn/services/prediction/service.py
@@ -44,6 +44,7 @@ def create (
         raw_outputs: bool=False,
         return_binary_path: bool=True,
         data_url_limit: int=None,
+        verbose: bool=False
     ) -> Prediction:
         """
         Create a prediction.
@@ -54,6 +55,7 @@ def create (
             raw_outputs (bool): Skip converting output values into Pythonic types. This only applies to `CLOUD` predictions.
             return_binary_path (bool): Write binary values to file and return a `Path` instead of returning `BytesIO` instance.
             data_url_limit (int): Return a data URL if a given output value is smaller than this size in bytes. This only applies to `CLOUD` predictions.
+            verbose (bool): Use verbose logging.
 
         Returns:
             Prediction: Created prediction.
@@ -71,26 +73,27 @@ def create (
             headers={
                 "Authorization": f"Bearer {self.client.access_key}",
                 "fxn-client": self.__get_client_id(),
-                "fxn-configuration-token": "" # INCOMPLETE
+                "fxn-configuration-token": ""
             }
         )
         # Check
         prediction = response.json()
         try:
             response.raise_for_status()
-        except:
-            raise RuntimeError(prediction.get("error"))
+        except Exception as ex:
+            error = prediction["errors"][0]["message"] if "errors" in prediction else str(ex)
+            raise RuntimeError(error)
         # Parse prediction
         prediction = self.__parse_prediction(prediction, raw_outputs=raw_outputs, return_binary_path=return_binary_path)
         # Create edge prediction
         if prediction.type == PredictorType.Edge:
-            predictor = self.__load(prediction)
+            predictor = self.__load(prediction, verbose=verbose)
             self.__cache[tag] = predictor
             prediction = self.__predict(tag=tag, predictor=predictor, inputs=inputs) if inputs is not None else prediction
         # Return
         return prediction
 
-    async def stream (
+    async def stream ( # INCOMPLETE # Add edge prediction support
         self,
         tag: str,
         *,
@@ -98,6 +101,7 @@ async def stream (
         raw_outputs: bool=False,
         return_binary_path: bool=True,
         data_url_limit: int=None,
+        verbose: bool=False
     ) -> AsyncIterator[Prediction]:
         """
         Create a streaming prediction.
@@ -110,6 +114,7 @@ async def stream (
             raw_outputs (bool): Skip converting output values into Pythonic types. This only applies to `CLOUD` predictions.
             return_binary_path (bool): Write binary values to file and return a `Path` instead of returning `BytesIO` instance.
             data_url_limit (int): Return a data URL if a given output value is smaller than this size in bytes. This only applies to `CLOUD` predictions.
+            verbose (bool): Use verbose logging.
 
         Returns:
             Prediction: Created prediction.
@@ -122,19 +127,26 @@ async def stream (
         headers = {
             "Content-Type": "application/json",
             "Authorization": f"Bearer {self.client.access_key}",
-            "fxn-client": self.__get_client_id()
+            "fxn-client": self.__get_client_id(),
+            "fxn-configuration-token": ""
         }
         async with ClientSession(headers=headers) as session:
             async with session.post(url, data=dumps(inputs)) as response:
                 async for chunk in response.content.iter_any():
                     prediction = loads(chunk)
                     # Check status
-                    if response.status >= 400:
-                        raise RuntimeError(prediction.get("error"))
+                    try:
+                        response.raise_for_status()
+                    except Exception as ex:
+                        error = prediction["errors"][0]["message"] if "errors" in prediction else str(ex)
+                        raise RuntimeError(error)
                     # Parse prediction
-                    prediction = Prediction(**prediction)
-                    prediction.results = [Value(**value) for value in prediction.results] if prediction.results is not None else None
-                    prediction.results = [self.to_object(value, return_binary_path=return_binary_path) for value in prediction.results] if prediction.results is not None and not raw_outputs else prediction.results
+                    prediction = self.__parse_prediction(prediction, raw_outputs=raw_outputs, return_binary_path=return_binary_path)
+                    # Create edge prediction
+                    if prediction.type == PredictorType.Edge:
+                        predictor = self.__load(prediction, verbose=verbose)
+                        self.__cache[tag] = predictor
+                        prediction = self.__predict(tag=tag, predictor=predictor, inputs=inputs) if inputs is not None else prediction
                     # Yield
                     yield prediction
 
@@ -266,7 +278,7 @@ def to_value (
         # Unsupported
        raise RuntimeError(f"Cannot create Function value '{name}' for object {object} of type {type(object)}")
 
-    def __load (self, prediction: Prediction):
+    def __load (self, prediction: Prediction, *, verbose: bool=False):
        # Load fxnc
        if self.__fxnc is None:
            fxnc_resource = next(x for x in prediction.resources if x.type == "fxn")
@@ -286,7 +298,7 @@ def __load (self, prediction: Prediction):
         for resource in prediction.resources:
             if resource.type == "fxn":
                 continue
-            path = self.__get_resource_path(resource)
+            path = self.__get_resource_path(resource, verbose=verbose)
             status = fxnc.FXNConfigurationAddResource(configuration, resource.type.encode(), str(path).encode())
             assert status.value == FXNStatus.OK, f"Failed to set prediction configuration resource with type {resource.type} for tag {prediction.tag} with status: {status.value}"
         # Create predictor
@@ -386,7 +398,7 @@ def __download_value_data (self, url: str) -> BytesIO:
         result = BytesIO(response.content)
         return result
 
-    def __get_resource_path (self, resource: PredictionResource) -> Path: # INCOMPLETE
+    def __get_resource_path (self, resource: PredictionResource, *, verbose: bool=False) -> Path: # INCOMPLETE # Verbose
         cache_dir = Path.home() / ".fxn" / "cache"
         cache_dir.mkdir(exist_ok=True)
         res_name = Path(urlparse(resource.url).path).name
diff --git a/fxn/services/storage.py b/fxn/services/storage.py
index 5a59ba6..adf981b 100644
--- a/fxn/services/storage.py
+++ b/fxn/services/storage.py
@@ -4,8 +4,8 @@
 #
 
 from base64 import b64encode
-from filetype import guess_mime
 from io import BytesIO
+from magika import Magika
 from pathlib import Path
 from requests import put
 from rich.progress import open as open_progress, wrap_file
@@ -87,7 +87,7 @@ def __upload_file (
         assert file.exists(), f"Cannot upload {file.name} because the file does not exist"
         assert file.is_file(), f"Cannot upload {file.name} becaause it does not point to a file"
         # Create data URL
-        mime = guess_mime(file) or "application/octet-stream"
+        mime = self.__infer_mime(file)
         if file.stat().st_size < (data_url_limit or 0):
             with open(file, mode="rb") as f:
                 buffer = BytesIO(f.read())
@@ -114,7 +114,7 @@ def __upload_buffer (
         assert name, "You must specify the file `name` if the `file` is not a path"
         # Create data URL
         file.seek(0)
-        mime = guess_mime(file) or "application/octet-stream"
+        mime = self.__infer_mime(file)
         size = file.getbuffer().nbytes
         if size < (data_url_limit or 0):
             return self.__create_data_url(file, mime=mime)
@@ -136,4 +136,25 @@ def __simplify_url (self, url: str) -> str:
         parsed_url = urlparse(url)
         parsed_url = parsed_url._replace(netloc="cdn.fxn.ai", query="")
         url = urlunparse(parsed_url)
-        return url
\ No newline at end of file
+        return url
+
+    def __infer_mime (self, file: Union[str, Path, BytesIO]) -> str:
+        MAGIC_TO_MIME = {
+            b"\x00\x61\x73\x6d": "application/wasm"
+        }
+        # Read magic
+        file = Path(file) if isinstance(file, str) else file
+        if isinstance(file, Path):
+            with open(file, "rb") as f:
+                magic = f.read(4)
+        elif isinstance(file, BytesIO):
+            magic = file.getvalue()[:4]
+        # Check known mime
+        mime = MAGIC_TO_MIME.get(magic)
+        # Infer
+        if mime is None:
+            magika = Magika()
+            result = magika.identify_bytes(file.getvalue()) if isinstance(file, BytesIO) else magika.identify_path(file)
+            mime = result.output.mime_type
+        # Return
+        return mime
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index c7b1510..abadc9c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 aiohttp
-filetype
+magika
 numpy
 pillow
 requests
diff --git a/setup.py b/setup.py
index 03fa407..469a1a6 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@
     python_requires=">=3.7",
     install_requires=[
         "aiohttp",
-        "filetype",
+        "magika",
         "numpy",
         "pillow",
         "pydantic>=2.0",
diff --git a/test/storage_test.py b/test/storage_test.py
index b79c7ff..c6c16e2 100644
--- a/test/storage_test.py
+++ b/test/storage_test.py
@@ -6,6 +6,7 @@
 from fxn import Function, UploadType
 from io import BytesIO
 from pathlib import Path
+from requests import get
 
 def test_file_upload ():
     fxn = Function()
@@ -34,4 +35,25 @@ def test_buffer_upload_data_url ():
         buffer = BytesIO(f.read())
     buffer_size = buffer.getbuffer().nbytes
     url = fxn.storage.upload(buffer, type=UploadType.Value, name=path.name, data_url_limit=buffer_size + 1, verbose=True)
-    assert url.startswith("data:")
\ No newline at end of file
+    assert url.startswith("data:")
+
+def test_wasm_upload ():
+    fxn = Function()
+    path = Path("../edgefxn/build/WebAssembly/Debug/Predictor.wasm").resolve()
+    url = fxn.storage.upload(path, type=UploadType.Value, name="predictor.wasm")
+    response = get(url)
+    content_type = response.headers.get("Content-Type")
+    assert content_type == "application/wasm"
+
+def test_js_upload ():
+    fxn = Function()
+    path = Path("../edgefxn/build/WebAssembly/Debug/Predictor.js").resolve()
+    url = fxn.storage.upload(path, type=UploadType.Value, name="predictor.js")
+    response = get(url)
+    content_type = response.headers.get("Content-Type")
+    assert content_type == "application/javascript"
+
+def test_fxn_upload ():
+    response = get("https://cdn.fxn.ai/edgefxn/0.0.10/libFunction-wasm.so")
+    content_type = response.headers.get("Content-Type")
+    assert content_type == "application/wasm"
\ No newline at end of file
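
Usage note on the prediction service changes above: `fxn.predictions.create` and `fxn.predictions.stream` now accept a `verbose` flag, and streamed predictions run locally when the predictor type is `PredictorType.Edge`, mirroring `create`. The snippet below is a minimal usage sketch, not part of the patch: the predictor tag and input name are placeholders, and it assumes an access key is already configured for `Function()`.

import asyncio
from fxn import Function

async def main ():
    # Access key assumed to be configured (constructor argument or environment)
    fxn = Function()
    # Placeholder tag and inputs; substitute a real predictor
    async for prediction in fxn.predictions.stream(
        "@username/some-predictor",
        inputs={ "value": 42 },
        raw_outputs=False,
        return_binary_path=True,
        verbose=True
    ):
        print(prediction.results)

asyncio.run(main())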
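
Note on the storage change: `filetype` is replaced with `magika`, and a hard-coded WebAssembly magic number is checked before falling back to Magika, which the new `test_wasm_upload` and `test_js_upload` tests exercise through the uploaded `Content-Type` header. Below is the same fallback pattern in isolation, as a minimal sketch; the sample path is a placeholder.

from pathlib import Path
from magika import Magika

# WebAssembly magic number, as hard-coded in __infer_mime above
MAGIC_TO_MIME = { b"\x00\x61\x73\x6d": "application/wasm" }

def infer_mime (path: Path) -> str:
    # Check the first four bytes against known magic numbers
    with open(path, "rb") as f:
        magic = f.read(4)
    mime = MAGIC_TO_MIME.get(magic)
    # Fall back to Magika's content-based detection
    if mime is None:
        mime = Magika().identify_path(path).output.mime_type
    return mime

print(infer_mime(Path("Predictor.wasm")))  # expected: application/wasm (placeholder path)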