Minor improvements
olokobayusuf committed Mar 6, 2024
1 parent 42b1f8f commit 9a7faea
Showing 7 changed files with 78 additions and 23 deletions.
Changelog.md (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
## 0.0.29
*INCOMPLETE*
+ Minor fixes and improvements.

## 0.0.28
+ Added `fxn create --cloud` CLI shorthand flag for setting the predictor type to `PredictorType.Cloud`.
fxn/cli/predict.py (2 changes: 1 addition & 1 deletion)
@@ -35,7 +35,7 @@ async def _predict_async (tag: str, context: Context, raw_outputs: bool):
inputs = { context.args[i].replace("-", ""): _parse_value(context.args[i+1]) for i in range(0, len(context.args), 2) }
# Stream
fxn = Function(get_access_key())
async for prediction in fxn.predictions.stream(tag, inputs=inputs, raw_outputs=raw_outputs, return_binary_path=True):
async for prediction in fxn.predictions.stream(tag, inputs=inputs, raw_outputs=raw_outputs, return_binary_path=True, verbose=True):
# Parse results
images = [value for value in prediction.results or [] if isinstance(value, Image.Image)]
prediction.results = [_serialize_value(value) for value in prediction.results] if prediction.results is not None else None
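For context, the CLI change above simply forwards the new `verbose` flag to the streaming prediction API. A minimal sketch of the equivalent SDK call, assuming an access key is already configured and using a hypothetical predictor tag and input:

```python
import asyncio
from fxn import Function

async def main ():
    fxn = Function()  # assumes an access key is already configured
    # Stream predictions with verbose logging, mirroring the CLI change above
    async for prediction in fxn.predictions.stream(
        "@yusuf/greeting",           # hypothetical predictor tag
        inputs={ "name": "Yusuf" },  # hypothetical inputs
        verbose=True
    ):
        print(prediction.results)

asyncio.run(main())
```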
fxn/services/prediction/service.py (40 changes: 26 additions & 14 deletions)
@@ -44,6 +44,7 @@ def create (
raw_outputs: bool=False,
return_binary_path: bool=True,
data_url_limit: int=None,
verbose: bool=False
) -> Prediction:
"""
Create a prediction.
@@ -54,6 +55,7 @@ def create (
raw_outputs (bool): Skip converting output values into Pythonic types. This only applies to `CLOUD` predictions.
return_binary_path (bool): Write binary values to file and return a `Path` instead of returning `BytesIO` instance.
data_url_limit (int): Return a data URL if a given output value is smaller than this size in bytes. This only applies to `CLOUD` predictions.
verbose (bool): Use verbose logging.
Returns:
Prediction: Created prediction.
@@ -71,33 +73,35 @@
headers={
"Authorization": f"Bearer {self.client.access_key}",
"fxn-client": self.__get_client_id(),
"fxn-configuration-token": "" # INCOMPLETE
"fxn-configuration-token": ""
}
)
# Check
prediction = response.json()
try:
response.raise_for_status()
except:
raise RuntimeError(prediction.get("error"))
except Exception as ex:
error = prediction["errors"][0]["message"] if "errors" in prediction else str(ex)
raise RuntimeError(error)
# Parse prediction
prediction = self.__parse_prediction(prediction, raw_outputs=raw_outputs, return_binary_path=return_binary_path)
# Create edge prediction
if prediction.type == PredictorType.Edge:
predictor = self.__load(prediction)
predictor = self.__load(prediction, verbose=verbose)
self.__cache[tag] = predictor
prediction = self.__predict(tag=tag, predictor=predictor, inputs=inputs) if inputs is not None else prediction
# Return
return prediction

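The new error handling in `create` replaces the bare `except:` that read a single `error` field with handling that prefers the API's `errors` array and falls back to the raised HTTP error. A self-contained sketch of that pattern, assuming the payload shape shown in the diff; the helper name is hypothetical:

```python
from requests import Response

def check_prediction_response (response: Response) -> dict:
    """Return the prediction payload, surfacing API error messages (sketch)."""
    payload = response.json()
    try:
        response.raise_for_status()
    except Exception as ex:
        # Prefer the message reported by the API; fall back to the HTTP error
        error = payload["errors"][0]["message"] if "errors" in payload else str(ex)
        raise RuntimeError(error)
    return payload
```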
async def stream (
async def stream ( # INCOMPLETE # Add edge prediction support
self,
tag: str,
*,
inputs: Dict[str, Union[float, int, str, bool, NDArray, List[Any], Dict[str, Any], Path, Image.Image, Value]] = {},
raw_outputs: bool=False,
return_binary_path: bool=True,
data_url_limit: int=None,
verbose: bool=False
) -> AsyncIterator[Prediction]:
"""
Create a streaming prediction.
@@ -110,6 +114,7 @@ async def stream (
raw_outputs (bool): Skip converting output values into Pythonic types. This only applies to `CLOUD` predictions.
return_binary_path (bool): Write binary values to file and return a `Path` instead of returning `BytesIO` instance.
data_url_limit (int): Return a data URL if a given output value is smaller than this size in bytes. This only applies to `CLOUD` predictions.
verbose (bool): Use verbose logging.
Returns:
Prediction: Created prediction.
@@ -122,19 +127,26 @@ async def stream (
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.client.access_key}",
"fxn-client": self.__get_client_id()
"fxn-client": self.__get_client_id(),
"fxn-configuration-token": ""
}
async with ClientSession(headers=headers) as session:
async with session.post(url, data=dumps(inputs)) as response:
async for chunk in response.content.iter_any():
prediction = loads(chunk)
# Check status
if response.status >= 400:
raise RuntimeError(prediction.get("error"))
try:
response.raise_for_status()
except Exception as ex:
error = prediction["errors"][0]["message"] if "errors" in prediction else str(ex)
raise RuntimeError(error)
# Parse prediction
prediction = Prediction(**prediction)
prediction.results = [Value(**value) for value in prediction.results] if prediction.results is not None else None
prediction.results = [self.to_object(value, return_binary_path=return_binary_path) for value in prediction.results] if prediction.results is not None and not raw_outputs else prediction.results
prediction = self.__parse_prediction(prediction, raw_outputs=raw_outputs, return_binary_path=return_binary_path)
# Create edge prediction
if prediction.type == PredictorType.Edge:
predictor = self.__load(prediction, verbose=verbose)
self.__cache[tag] = predictor
prediction = self.__predict(tag=tag, predictor=predictor, inputs=inputs) if inputs is not None else prediction
# Yield
yield prediction

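With this commit the streaming path gains the same edge-prediction flow as `create`: load the native predictor once (optionally with verbose logging), cache it by tag, and run it on the supplied inputs. A simplified sketch of that load-and-cache pattern, with hypothetical `load` and `predict` callables standing in for the private `__load` and `__predict` methods:

```python
from typing import Any, Callable, Dict

class EdgePredictorCache:
    """Cache edge predictors by tag so each is loaded only once (sketch)."""

    def __init__ (
        self,
        load: Callable[[str, bool], Any],              # hypothetical stand-in for __load
        predict: Callable[[Any, Dict[str, Any]], Any]  # hypothetical stand-in for __predict
    ):
        self.load = load
        self.predict = predict
        self.cache: Dict[str, Any] = {}

    def create (self, tag: str, inputs: Dict[str, Any], *, verbose: bool=False):
        predictor = self.cache.get(tag)
        if predictor is None:
            predictor = self.load(tag, verbose)  # load the native predictor once
            self.cache[tag] = predictor
        return self.predict(predictor, inputs)
```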
@@ -266,7 +278,7 @@ def to_value (
# Unsupported
raise RuntimeError(f"Cannot create Function value '{name}' for object {object} of type {type(object)}")

def __load (self, prediction: Prediction):
def __load (self, prediction: Prediction, *, verbose: bool=False):
# Load fxnc
if self.__fxnc is None:
fxnc_resource = next(x for x in prediction.resources if x.type == "fxn")
@@ -286,7 +298,7 @@ def __load (self, prediction: Prediction):
for resource in prediction.resources:
if resource.type == "fxn":
continue
path = self.__get_resource_path(resource)
path = self.__get_resource_path(resource, verbose=verbose)
status = fxnc.FXNConfigurationAddResource(configuration, resource.type.encode(), str(path).encode())
assert status.value == FXNStatus.OK, f"Failed to set prediction configuration resource with type {resource.type} for tag {prediction.tag} with status: {status.value}"
# Create predictor
@@ -386,7 +398,7 @@ def __download_value_data (self, url: str) -> BytesIO:
result = BytesIO(response.content)
return result

def __get_resource_path (self, resource: PredictionResource) -> Path: # INCOMPLETE
def __get_resource_path (self, resource: PredictionResource, *, verbose: bool=False) -> Path: # INCOMPLETE # Verbose
cache_dir = Path.home() / ".fxn" / "cache"
cache_dir.mkdir(exist_ok=True)
res_name = Path(urlparse(resource.url).path).name
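For reference, `__get_resource_path` resolves each prediction resource to a file in a per-user cache directory, deriving the file name from the resource URL as shown above. A minimal sketch of that path derivation; the download-on-miss step and verbose message are assumptions:

```python
from pathlib import Path
from urllib.parse import urlparse
from requests import get

def get_resource_path (url: str, *, verbose: bool=False) -> Path:
    """Resolve a resource URL to a local path under ~/.fxn/cache (sketch)."""
    cache_dir = Path.home() / ".fxn" / "cache"
    cache_dir.mkdir(parents=True, exist_ok=True)
    path = cache_dir / Path(urlparse(url).path).name
    if not path.exists():  # download on cache miss (assumption)
        if verbose:
            print(f"Downloading {url} to {path}")
        path.write_bytes(get(url).content)
    return path
```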
fxn/services/storage.py (29 changes: 25 additions & 4 deletions)
@@ -4,8 +4,8 @@
#

from base64 import b64encode
from filetype import guess_mime
from io import BytesIO
from magika import Magika
from pathlib import Path
from requests import put
from rich.progress import open as open_progress, wrap_file
@@ -87,7 +87,7 @@ def __upload_file (
assert file.exists(), f"Cannot upload {file.name} because the file does not exist"
assert file.is_file(), f"Cannot upload {file.name} because it does not point to a file"
# Create data URL
mime = guess_mime(file) or "application/octet-stream"
mime = self.__infer_mime(file)
if file.stat().st_size < (data_url_limit or 0):
with open(file, mode="rb") as f:
buffer = BytesIO(f.read())
@@ -114,7 +114,7 @@ def __upload_buffer (
assert name, "You must specify the file `name` if the `file` is not a path"
# Create data URL
file.seek(0)
mime = guess_mime(file) or "application/octet-stream"
mime = self.__infer_mime(file)
size = file.getbuffer().nbytes
if size < (data_url_limit or 0):
return self.__create_data_url(file, mime=mime)
@@ -136,4 +136,25 @@ def __simplify_url (self, url: str) -> str:
parsed_url = urlparse(url)
parsed_url = parsed_url._replace(netloc="cdn.fxn.ai", query="")
url = urlunparse(parsed_url)
return url
return url

def __infer_mime (self, file: Union[str, Path, BytesIO]) -> str:
MAGIC_TO_MIME = {
b"\x00\x61\x73\x6d": "application/wasm"
}
# Read magic
file = Path(file) if isinstance(file, str) else file
if isinstance(file, Path):
with open(file, "rb") as f:
magic = f.read(4)
elif isinstance(file, BytesIO):
magic = file.getvalue()[:4]
# Check known mime
mime = MAGIC_TO_MIME.get(magic)
# Infer
if mime is None:
magika = Magika()
result = magika.identify_bytes(file.getvalue()) if isinstance(file, BytesIO) else magika.identify_path(file)
mime = result.output.mime_type
# Return
return mime
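This commit swaps the `filetype` dependency for Magika, keeping a magic-byte override for WebAssembly on the assumption that Magika may not report `.wasm` binaries as `application/wasm`. A short usage sketch of the Magika calls the helper relies on, with a hypothetical file path:

```python
from pathlib import Path
from magika import Magika

magika = Magika()
result = magika.identify_path(Path("Predictor.wasm"))  # hypothetical path
print(result.output.mime_type)  # may not be "application/wasm", hence the magic-byte override above
```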
requirements.txt (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
aiohttp
filetype
magika
numpy
pillow
requests
setup.py (2 changes: 1 addition & 1 deletion)
@@ -28,7 +28,7 @@
python_requires=">=3.7",
install_requires=[
"aiohttp",
"filetype",
"magika",
"numpy",
"pillow",
"pydantic>=2.0",
test/storage_test.py (24 changes: 23 additions & 1 deletion)
@@ -6,6 +6,7 @@
from fxn import Function, UploadType
from io import BytesIO
from pathlib import Path
from requests import get

def test_file_upload ():
fxn = Function()
@@ -34,4 +35,25 @@ def test_buffer_upload_data_url ():
buffer = BytesIO(f.read())
buffer_size = buffer.getbuffer().nbytes
url = fxn.storage.upload(buffer, type=UploadType.Value, name=path.name, data_url_limit=buffer_size + 1, verbose=True)
assert url.startswith("data:")
assert url.startswith("data:")

def test_wasm_upload ():
fxn = Function()
path = Path("../edgefxn/build/WebAssembly/Debug/Predictor.wasm").resolve()
url = fxn.storage.upload(path, type=UploadType.Value, name="predictor.wasm")
response = get(url)
content_type = response.headers.get("Content-Type")
assert content_type == "application/wasm"

def test_js_upload ():
fxn = Function()
path = Path("../edgefxn/build/WebAssembly/Debug/Predictor.js").resolve()
url = fxn.storage.upload(path, type=UploadType.Value, name="predictor.js")
response = get(url)
content_type = response.headers.get("Content-Type")
assert content_type == "application/javascript"

def test_fxn_upload ():
response = get("https://cdn.fxn.ai/edgefxn/0.0.10/libFunction-wasm.so")
content_type = response.headers.get("Content-Type")
assert content_type == "application/wasm"
