Skip to content

Commit

Permalink
Implement FileClient2, which raises on error
Browse files Browse the repository at this point in the history
Signed-off-by: Hannes Weisbach <[email protected]>
  • Loading branch information
hannesweisbach committed Jan 30, 2024
1 parent 765ab8d commit 4503b74
Show file tree
Hide file tree
Showing 2 changed files with 448 additions and 0 deletions.
270 changes: 270 additions & 0 deletions pycyphal/application/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,280 @@ def __repr__(self) -> str:
return pycyphal.util.repr_attributes(self, self._node, server_node_id=self._server_node_id)


class FileClient2:
"""
A trivial proxy that provides a higher-level and more pythonic API on top of the standard RPC-services
from ``uavcan.file``.
Client instances are created lazily at first request and then kept alive until this instance is closed.
All remote operations raise :class:`FileTimeoutError` on timeout.
In contrast to pycyphal.application.file.FileClient,
pycyphal.application.file.FileClient2 raises exceptions for errors reported
over the network. The intent is to expose better error handling in the API;
especially avoid ``isintance()`` tests on return values.
"""

def __init__(
self,
local_node: pycyphal.application.Node,
server_node_id: int,
response_timeout: float = 3.0,
priority: pycyphal.transport.Priority = pycyphal.transport.Priority.SLOW,
) -> None:
"""
:param local_node: RPC-service clients will be created on this node.
:param server_node_id: All requests will be sent to this node-ID.
:param response_timeout: Raise :class:`FileTimeoutError` if the server does not respond in this time.
:param priority: Transfer priority for requests (and, therefore, responses).
"""
self._node = local_node
self._server_node_id = server_node_id
self._response_timeout = float(response_timeout)
# noinspection PyArgumentList
self._priority = pycyphal.transport.Priority(priority)

self._clients: typing.Dict[typing.Type[object], pycyphal.presentation.Client[object]] = {}

# noinspection PyUnresolvedReferences
self._data_transfer_capacity = int(nunavut_support.get_model(Unstructured)["value"].data_type.capacity)

@property
def data_transfer_capacity(self) -> int:
"""
A convenience constant derived from DSDL: the maximum number of bytes per read/write transfer.
Larger reads/writes are non-atomic.
"""
return self._data_transfer_capacity

@property
def server_node_id(self) -> int:
"""
The node-ID of the remote file server.
"""
return self._server_node_id

def close(self) -> None:
"""
Close all RPC-service client instances created up to this point.
"""
for c in self._clients.values():
c.close()
self._clients.clear()

async def list(self, path: str) -> typing.AsyncIterable[str]:
"""
Proxy for ``uavcan.file.List``. Invokes requests in series until all elements are listed.
"""
for index in itertools.count():
res = await self._call(List, List.Request(entry_index=index, directory_path=Path(path)))
assert isinstance(res, List.Response)
p = res.entry_base_name.path.tobytes().decode(errors="ignore")
if p:
yield str(p)
else:
break

async def get_info(self, path: str) -> GetInfo.Response:
"""
Proxy for ``uavcan.file.GetInfo``.
:raises OSError: If the operation failed; see ``uavcan.file.Error``
"""
res = await self._call(GetInfo, GetInfo.Request(Path(path)))
assert isinstance(res, GetInfo.Response)
_raise_if_error(res.error, path)
return res

async def remove(self, path: str) -> None:
"""
Proxy for ``uavcan.file.Modify``.
:raises OSError: If the operation failed; see ``uavcan.file.Error``
"""
res = await self._call(Modify, Modify.Request(source=Path(path)))
assert isinstance(res, Modify.Response)
_raise_if_error(res.error, path)

async def touch(self, path: str) -> None:
"""
Proxy for ``uavcan.file.Modify``.
:raises OSError: If the operation failed; see ``uavcan.file.Error``
"""
res = await self._call(Modify, Modify.Request(destination=Path(path)))
assert isinstance(res, Modify.Response)
_raise_if_error(res.error, path)

async def copy(self, src: str, dst: str, overwrite: bool = False) -> None:
"""
Proxy for ``uavcan.file.Modify``.
:raises OSError: If the operation failed; see ``uavcan.file.Error``
"""
res = await self._call(
Modify,
Modify.Request(
preserve_source=True,
overwrite_destination=overwrite,
source=Path(src),
destination=Path(dst),
),
)
assert isinstance(res, Modify.Response)
_raise_if_error(res.error, f"{src}->{dst}")

async def move(self, src: str, dst: str, overwrite: bool = False) -> None:
"""
Proxy for ``uavcan.file.Modify``.
:raises OSError: If the operation failed; see ``uavcan.file.Error``
"""
res = await self._call(
Modify,
Modify.Request(
preserve_source=False,
overwrite_destination=overwrite,
source=Path(src),
destination=Path(dst),
),
)
assert isinstance(res, Modify.Response)
_raise_if_error(res.error, f"{src}->{dst}")

async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> bytes:
"""
Proxy for ``uavcan.file.Read``.
:param path:
The file to read.
:param offset:
Read offset from the beginning of the file.
Currently, it must be positive; negative offsets from the end of the file may be supported later.
:param size:
Read requests will be stopped after the end of the file is reached or at least this many bytes are read.
If None (default), the entire file will be read (this may exhaust local memory).
If zero, this call is a no-op.
:raises OSError: If the read operation failed; see ``uavcan.file.Error``
:returns:
data on success (empty if the offset is out of bounds or the file is empty).
"""

async def once() -> bytes:
res = await self._call(Read, Read.Request(offset=offset, path=Path(path)))
assert isinstance(res, Read.Response)
_raise_if_error(res.error, path)
return bytes(res.data.value.tobytes())

if size is None:
size = 2**64
data = b""
while len(data) < size:
out = await once()
assert isinstance(out, bytes)
if not out:
break
data += out
offset += len(out)
return data

async def write(
self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True
) -> None:
"""
Proxy for ``uavcan.file.Write``.
:param path:
The file to write.
:param data:
The data to write at the specified offset.
The number of write requests depends on the size of data.
:param offset:
Write offset from the beginning of the file.
Currently, it must be positive; negative offsets from the end of the file may be supported later.
:param truncate:
If True, the rest of the file after ``offset + len(data)`` will be truncated.
This is done by sending an empty write request, as prescribed by the Specification.
:raises OSError: If the write operation failed; see ``uavcan.file.Error``
"""

async def once(d: typing.Union[memoryview, bytes]) -> None:
res = await self._call(
Write,
Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))),
)
assert isinstance(res, Write.Response)
_raise_if_error(res.error, path)

limit = self.data_transfer_capacity
while len(data) > 0:
frag, data = data[:limit], data[limit:]
await once(frag)
offset += len(frag)
if truncate:
await once(b"")

async def _call(self, ty: typing.Type[object], request: object) -> object:
try:
cln = self._clients[ty]
except LookupError:
self._clients[ty] = self._node.make_client(ty, self._server_node_id)
cln = self._clients[ty]
cln.response_timeout = self._response_timeout
cln.priority = self._priority

result = await cln.call(request)
if result is None:
raise FileTimeoutError(f"File service call timed out on {cln}")
return result[0]

def __repr__(self) -> str:
return pycyphal.util.repr_attributes(self, self._node, server_node_id=self._server_node_id)


class FileTimeoutError(pycyphal.application.NetworkTimeoutError):
"""
The specialization of the network error for file access.
"""


_perror_uavcan = {
Error.OK: "OK",
Error.UNKNOWN_ERROR: "Unknown error",
Error.NOT_FOUND: "Not found", # FileNotFoundError
Error.IO_ERROR: "I/O error",
Error.ACCESS_DENIED: "Access denied", # PermissionError
Error.IS_DIRECTORY: "Is a directory", # IsADirectoryError
Error.INVALID_VALUE: "Invalid value",
Error.FILE_TOO_LARGE: "File too large",
Error.OUT_OF_SPACE: "Out of space",
Error.NOT_SUPPORTED: "Not supported", # io.UnsupportedOperation
}


def _raise_if_error(error: Error, filename: str) -> None:
if error.value == Error.OK:
return

raise OSError(error.value, _perror_uavcan[error.value], filename)
# alternative:
# raise FileError(error, filename)


class FileError(OSError):
"""
Exception type specialized for uavcan.file.Error.
"""

def __init__(self, error: Error, filename: str):
super().__init__(error.value, _perror_uavcan[error.value], filename)


_logger = logging.getLogger(__name__)
Loading

0 comments on commit 4503b74

Please sign in to comment.