diff --git a/pycyphal/application/file.py b/pycyphal/application/file.py index 142397e7..bf231681 100644 --- a/pycyphal/application/file.py +++ b/pycyphal/application/file.py @@ -499,11 +499,272 @@ async def _call(self, ty: typing.Type[object], request: object) -> object: def __repr__(self) -> str: return pycyphal.util.repr_attributes(self, self._node, server_node_id=self._server_node_id) +class FileClient2: + """ + A trivial proxy that provides a higher-level and more pythonic API on top of the standard RPC-services + from ``uavcan.file``. + Client instances are created lazily at first request and then kept alive until this instance is closed. + All remote operations raise :class:`FileTimeoutError` on timeout. + + In contrast to pycyphal.application.file.FileClient, + pycyphal.application.file.FileClient2 raises exceptions for error reported + via Cyphal. + """ + + def __init__( + self, + local_node: pycyphal.application.Node, + server_node_id: int, + response_timeout: float = 3.0, + priority: pycyphal.transport.Priority = pycyphal.transport.Priority.SLOW, + ) -> None: + """ + :param local_node: RPC-service clients will be created on this node. + :param server_node_id: All requests will be sent to this node-ID. + :param response_timeout: Raise :class:`FileTimeoutError` if the server does not respond in this time. + :param priority: Transfer priority for requests (and, therefore, responses). + """ + self._node = local_node + self._server_node_id = server_node_id + self._response_timeout = float(response_timeout) + # noinspection PyArgumentList + self._priority = pycyphal.transport.Priority(priority) + + self._clients: typing.Dict[typing.Type[object], pycyphal.presentation.Client[object]] = {} + + # noinspection PyUnresolvedReferences + self._data_transfer_capacity = int(nunavut_support.get_model(Unstructured)["value"].data_type.capacity) + + @property + def data_transfer_capacity(self) -> int: + """ + A convenience constant derived from DSDL: the maximum number of bytes per read/write transfer. + Larger reads/writes are non-atomic. + """ + return self._data_transfer_capacity + + @property + def server_node_id(self) -> int: + """ + The node-ID of the remote file server. + """ + return self._server_node_id + + def close(self) -> None: + """ + Close all RPC-service client instances created up to this point. + """ + for c in self._clients.values(): + c.close() + self._clients.clear() + + async def list(self, path: str) -> typing.AsyncIterable[str]: + """ + Proxy for ``uavcan.file.List``. Invokes requests in series until all elements are listed. + """ + for index in itertools.count(): + res = await self._call(List, List.Request(entry_index=index, directory_path=Path(path))) + assert isinstance(res, List.Response) + p = res.entry_base_name.path.tobytes().decode(errors="ignore") + if p: + yield str(p) + else: + break + + async def get_info(self, path: str) -> GetInfo.Response: + """ + Proxy for ``uavcan.file.GetInfo``. Be sure to check the error code in the returned object. + """ + res = await self._call(GetInfo, GetInfo.Request(Path(path))) + assert isinstance(res, GetInfo.Response) + _raise_if_error(res.error, path) + return res + + async def remove(self, path: str) -> None: + """ + Proxy for ``uavcan.file.Modify``. + + :returns: See ``uavcan.file.Error`` + """ + res = await self._call(Modify, Modify.Request(source=Path(path))) + assert isinstance(res, Modify.Response) + _raise_if_error(res.error, path) + + async def touch(self, path: str) -> None: + """ + Proxy for ``uavcan.file.Modify``. + + :returns: See ``uavcan.file.Error`` + """ + res = await self._call(Modify, Modify.Request(destination=Path(path))) + assert isinstance(res, Modify.Response) + _raise_if_error(res.error, path) + + async def copy(self, src: str, dst: str, overwrite: bool = False) -> None: + """ + Proxy for ``uavcan.file.Modify``. + + :raises: OSError; see ``uavcan.file.Error`` + """ + res = await self._call( + Modify, + Modify.Request( + preserve_source=True, + overwrite_destination=overwrite, + source=Path(src), + destination=Path(dst), + ), + ) + assert isinstance(res, Modify.Response) + _raise_if_error(res.error, f'{src}->{dst}') + + async def move(self, src: str, dst: str, overwrite: bool = False) -> None: + """ + Proxy for ``uavcan.file.Modify``. + + :raises: OSError + """ + res = await self._call( + Modify, + Modify.Request( + preserve_source=False, + overwrite_destination=overwrite, + source=Path(src), + destination=Path(dst), + ), + ) + assert isinstance(res, Modify.Response) + _raise_if_error(res.error, f'{src}->{dst}') + + async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> bytes: + """ + Proxy for ``uavcan.file.Read``. + + :param path: + The file to read. + + :param offset: + Read offset from the beginning of the file. + Currently, it must be positive; negative offsets from the end of the file may be supported later. + + :param size: + Read requests will be stopped after the end of the file is reached or at least this many bytes are read. + If None (default), the entire file will be read (this may exhaust local memory). + If zero, this call is a no-op. + + :raises: OSError + + :returns: + data on success (empty if the offset is out of bounds or the file is empty). + """ + + async def once() -> bytes: + res = await self._call(Read, Read.Request(offset=offset, path=Path(path))) + assert isinstance(res, Read.Response) + _raise_if_error(res.error, path) + return bytes(res.data.value.tobytes()) + + if size is None: + size = 2**64 + data = b"" + while len(data) < size: + out = await once() + assert isinstance(out, bytes) + if not out: + break + data += out + offset += len(out) + return data + + async def write( + self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True + ) -> None: + """ + Proxy for ``uavcan.file.Write``. + + :param path: + The file to write. + + :param data: + The data to write at the specified offset. + The number of write requests depends on the size of data. + + :param offset: + Write offset from the beginning of the file. + Currently, it must be positive; negative offsets from the end of the file may be supported later. + + :param truncate: + If True, the rest of the file after ``offset + len(data)`` will be truncated. + This is done by sending an empty write request, as prescribed by the Specification. + + :raises: OSError + """ + + async def once(d: typing.Union[memoryview, bytes]) -> None: + res = await self._call( + Write, + Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))), + ) + assert isinstance(res, Write.Response) + _raise_if_error(res.error, path) + + limit = self.data_transfer_capacity + while len(data) > 0: + frag, data = data[:limit], data[limit:] + await once(frag) + offset += len(frag) + if truncate: + await once(b"") + + async def _call(self, ty: typing.Type[object], request: object) -> object: + try: + cln = self._clients[ty] + except LookupError: + self._clients[ty] = self._node.make_client(ty, self._server_node_id) + cln = self._clients[ty] + cln.response_timeout = self._response_timeout + cln.priority = self._priority + + result = await cln.call(request) + if result is None: + raise FileTimeoutError(f"File service call timed out on {cln}") + return result[0] + + def __repr__(self) -> str: + return pycyphal.util.repr_attributes(self, self._node, server_node_id=self._server_node_id) class FileTimeoutError(pycyphal.application.NetworkTimeoutError): """ The specialization of the network error for file access. """ +_perror_uavcan = { + Error.OK: "OK", + Error.UNKNOWN_ERROR: "Unknown error", + Error.NOT_FOUND: "Not found", # FileNotFoundError + Error.IO_ERROR: "I/O error", + Error.ACCESS_DENIED: "Access denied", # PermissionError + Error.IS_DIRECTORY: "Is a directory", # IsADirectoryError + Error.INVALID_VALUE: "Invalid value", + Error.FILE_TOO_LARGE: "File too large", + Error.OUT_OF_SPACE: "Out of space", + Error.NOT_SUPPORTED: "Not supported" # io.UnsupportedOperation + } + +def _raise_if_error(error: Error, filename: str) -> None: + if error.value == Error.OK: + return + + raise OSError(error.value, _perror_uavcan[error.value], filename) + # alternative: + # raise FileError(error, filename) + +class FileError(OSError): + """ + Exception type specialized for uavcan.file.Error. + """ + + def __init__(self, error: Error, filename: str): + super().__init__(error.value, _perror_uavcan[error.value], filename) _logger = logging.getLogger(__name__) diff --git a/tests/application/file.py b/tests/application/file.py index 9abd5081..6ca6926c 100644 --- a/tests/application/file.py +++ b/tests/application/file.py @@ -166,3 +166,180 @@ async def ls(path: str) -> typing.List[str]: await asyncio.sleep(1.0) shutil.rmtree(root_a, ignore_errors=True) shutil.rmtree(root_b, ignore_errors=True) + +async def _unittest_file2(compiled: typing.List[pycyphal.dsdl.GeneratedPackageInfo]) -> None: + from pycyphal.application import make_node, NodeInfo + from pycyphal.transport.udp import UDPTransport + from pycyphal.application.file import FileClient2, FileServer, Error + + assert compiled + asyncio.get_running_loop().slow_callback_duration = 3.0 + + root_a = mkdtemp(".file", "a.") + root_b = mkdtemp(".file", "b.") + srv_node = make_node( + NodeInfo(name="org.opencyphal.pycyphal.test.file.server"), + transport=UDPTransport("127.0.0.1", 222, service_transfer_multiplier=2), + ) + cln_node = make_node( + NodeInfo(name="org.opencyphal.pycyphal.test.file.client"), + transport=UDPTransport("127.0.0.1", 223, service_transfer_multiplier=2), + ) + try: + srv_node.start() + file_server = FileServer(srv_node, [root_a, root_b]) + assert (Path(root_a), Path("abc")) == file_server.locate(Path("abc")) + assert [] == list(file_server.glob("*")) + + cln_node.start() + cln = FileClient2(cln_node, 222) + + async def ls(path: str) -> typing.List[str]: + out: typing.List[str] = [] + async for e in cln.list(path): + out.append(e) + return out + + assert [] == await ls("") + assert [] == await ls("nonexistent/directory") + with pytest.raises(OSError) as e: + await cln.get_info("none") + assert e.value.errno == Error.NOT_FOUND + + await cln.touch("a/foo/x") + await cln.touch("a/foo/y") + await cln.touch("b") + assert ["foo"] == await ls("a") + + # Make sure files are created. + assert [ + (file_server.roots[0], Path("a/foo/x")), + (file_server.roots[0], Path("a/foo/y")), + ] == list(sorted(file_server.glob("a/foo/*"))) + + assert await cln.read("a/foo/x") == b"" + assert await cln.read("/a/foo/x") == b"" # Slash or no slash makes no difference. + with pytest.raises(OSError) as e: + await cln.read("a/foo/z") + assert e.value.errno == Error.NOT_FOUND + with pytest.raises(OSError) as e: + await cln.get_info("a/foo/z") + assert e.value.errno == Error.NOT_FOUND + + # Write non-existent file + with pytest.raises(OSError) as e: + await cln.write("a/foo/z", bytes(range(200)) * 3) + assert e.value.errno == Error.NOT_FOUND + + # Write into empty file + await cln.write("a/foo/x", bytes(range(200)) * 3) + assert await cln.read("a/foo/x") == bytes(range(200)) * 3 + assert (await cln.get_info("a/foo/x")).size == 600 + + # Truncation -- this write is shorter + hundred = bytes(x ^ 0xFF for x in range(100)) + await cln.write("a/foo/x", hundred * 4) + assert (await cln.get_info("a/foo/x")).size == 400 + assert await cln.read("a/foo/x") == (hundred * 4) + assert (await cln.get_info("a/foo/x")).size == 400 + + # Fill in the middle without truncation + ref = bytearray(hundred * 4) + for i in range(100): + ref[i + 100] = 0x55 + assert len(ref) == 400 + assert (await cln.get_info("a/foo/x")).size == 400 + await cln.write("a/foo/x", b"\x55" * 100, offset=100, truncate=False) + assert (await cln.get_info("a/foo/x")).size == 400 + assert await cln.read("a/foo/x") == ref + + # Fill in the middle with truncation + await cln.write("a/foo/x", b"\xAA" * 50, offset=50) + assert (await cln.get_info("a/foo/x")).size == 100 + assert await cln.read("a/foo/x") == hundred[:50] + b"\xAA" * 50 + + # Directories + info = await cln.get_info("a/foo") + print("a/foo:", info) + assert info.error.value == Error.OK + assert info.is_writeable + assert info.is_readable + assert not info.is_file_not_directory + assert not info.is_link + + with pytest.raises(OSError) as e: + await cln.get_info("a/foo/nothing") + assert e.value.errno == Error.NOT_FOUND + with pytest.raises(OSError) as e: + await cln.write("a/foo", b"123") + assert e.value.errno in (Error.IS_DIRECTORY, Error.ACCESS_DENIED) # Windows compatibility + + # Removal + with pytest.raises(OSError) as e: + await cln.remove("a/foo/z") + assert e.value.errno == Error.NOT_FOUND + await cln.remove("a/foo/x") + await cln.touch("a/foo/x") # Put it back + await cln.remove("a/foo/") # Removed + with pytest.raises(OSError) as e: + await cln.remove("a/foo/") + assert e.value.errno == Error.NOT_FOUND # Not found + + # Copy + await cln.touch("r/a") + await cln.touch("r/b/0") + await cln.touch("r/b/1") + assert not (await cln.get_info("r/b")).is_file_not_directory + assert ["a", "b"] == await ls("r") + await cln.copy("r/b", "r/c") + assert ["a", "b", "c"] == await ls("r") + with pytest.raises(OSError) as e: + await cln.copy("r/a", "r/c") # Overwrite not enabled + assert e.value.errno == Error.INVALID_VALUE + assert ["a", "b", "c"] == await ls("r") + assert not (await cln.get_info("r/c")).is_file_not_directory + await cln.copy("/r/a", "r/c", overwrite=True) + assert (await cln.get_info("r/c")).is_file_not_directory + + # Move + assert ["a", "b", "c"] == await ls("r") + with pytest.raises(OSError) as e: + await cln.move("/r/a", "r/c") + assert e.value.errno == Error.INVALID_VALUE # Overwrite not enabled + await cln.move("/r/a", "r/c", overwrite=True) + assert ["b", "c"] == await ls("r") + with pytest.raises(OSError) as e: + await cln.move("/r/a", "r/c", overwrite=True) + assert e.value.errno == Error.NOT_FOUND + assert ["b", "c"] == await ls("r") + + # Access protected files + if sys.platform.startswith("linux"): # pragma: no branch + file_server.roots.append(Path("/")) + info = await cln.get_info("dev/null") + print("/dev/null:", info) + assert info.error.value == 0 + assert not info.is_link + assert info.is_writeable + assert info.is_file_not_directory + + info = await cln.get_info("/bin/sh") + print("/bin/sh:", info) + assert info.error.value == 0 + assert not info.is_writeable + assert info.is_file_not_directory + + assert await cln.read("/dev/null", size=100) == b"" # Read less than requested + assert await cln.read("/dev/zero", size=100) == b"\x00" * 256 # Read more than requested + # Umm, is this a good idea?! What if it succeeds :O + with pytest.raises(OSError) as e: + await cln.write("bin/sh", b"123") + e.value.errno == Error.ACCESS_DENIED + + file_server.roots.pop(-1) + finally: + srv_node.close() + cln_node.close() + await asyncio.sleep(1.0) + shutil.rmtree(root_a, ignore_errors=True) + shutil.rmtree(root_b, ignore_errors=True)