diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f25637e..22459460 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [0.7.1] - 2023-11-17 + +### Added + +- Experimental Google Drive input connector. +- Stateful deduplication function (`pw.stateful.deduplicate`) allowing alerting on significant changes. +- The ability to split data into batches in `pw.debug.table_from_markdown` and `pw.debug.table_from_pandas`. + ## [0.7.0] - 2023-11-16 ### Added diff --git a/Cargo.lock b/Cargo.lock index 0b268c9e..420a4df9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1434,7 +1434,7 @@ dependencies = [ [[package]] name = "pathway" -version = "0.7.0" +version = "0.7.1" dependencies = [ "arc-swap", "arcstr", diff --git a/Cargo.toml b/Cargo.toml index dd6ed54d..b03cd333 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pathway" -version = "0.7.0" +version = "0.7.1" edition = "2021" publish = false rust-version = "1.72.0" @@ -13,7 +13,6 @@ crate-type = ["cdylib", "lib"] [dev-dependencies] assert_matches = "1.5.0" eyre = "0.6.8" -tempfile = "3.8.1" [dependencies] arc-swap = "1.6.0" @@ -59,6 +58,7 @@ serde_json = "1.0" serde_with = "3.4.0" smallvec = { version = "1.11.1", features = ["union", "const_generics"] } syn = { version = "2.0.38", features = ["default", "full", "visit", "visit-mut"] } # Hack to keep features unified between normal and build deps +tempfile = "3.8.1" thiserror = "1.0.50" timely = { path = "./external/timely-dataflow/timely", features = ["bincode"] } tokio = "1.33.0" diff --git a/pyproject.toml b/pyproject.toml index 8e5d4b86..3e514388 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "diskcache >= 5.2.1", "exceptiongroup >= 1.1.3; python_version < '3.11'", "boto3 >= 1.26.76", + "google-api-python-client >= 2.108.0", ] [project.optional-dependencies] diff --git a/python/pathway/__init__.py b/python/pathway/__init__.py index 0827002b..12a28f49 100644 --- a/python/pathway/__init__.py +++ b/python/pathway/__init__.py @@ -79,7 +79,16 @@ unwrap, ) from pathway.internals.api import PathwayType as Type -from pathway.stdlib import graphs, indexing, ml, ordered, statistical, temporal, utils +from pathway.stdlib import ( + graphs, + indexing, + ml, + ordered, + stateful, + statistical, + temporal, + utils, +) from pathway.stdlib.utils.async_transformer import AsyncTransformer from pathway.stdlib.utils.pandas_transformer import pandas_transformer @@ -161,6 +170,7 @@ "Duration", "Json", "table_transformer", + "stateful", ] diff --git a/python/pathway/debug/__init__.py b/python/pathway/debug/__init__.py index acc1f24a..08302573 100644 --- a/python/pathway/debug/__init__.py +++ b/python/pathway/debug/__init__.py @@ -120,6 +120,27 @@ def table_to_pandas(table: Table): return res +def _validate_dataframe(df: pd.DataFrame) -> None: + for pseudocolumn in api.PANDAS_PSEUDOCOLUMNS: + if pseudocolumn in df.columns: + if not pd.api.types.is_integer_dtype(df[pseudocolumn].dtype): + raise ValueError(f"Column {pseudocolumn} has to contain integers only.") + if api.TIME_PSEUDOCOLUMN in df.columns: + if any(df[api.TIME_PSEUDOCOLUMN] < 0): + raise ValueError( + f"Column {api.TIME_PSEUDOCOLUMN} cannot contain negative times." 
+ ) + if any(df[api.TIME_PSEUDOCOLUMN] % 2 == 1): + warn("timestamps are required to be even; all timestamps will be doubled") + df[api.TIME_PSEUDOCOLUMN] = 2 * df[api.TIME_PSEUDOCOLUMN] + + if api.DIFF_PSEUDOCOLUMN in df.columns: + if any((df[api.DIFF_PSEUDOCOLUMN] != 1) & (df[api.DIFF_PSEUDOCOLUMN] != -1)): + raise ValueError( + f"Column {api.DIFF_PSEUDOCOLUMN} can only have 1 and -1 values." + ) + + @runtime_type_check @trace_user_frame def table_from_pandas( @@ -128,14 +149,27 @@ def table_from_pandas( unsafe_trusted_ids: bool = False, schema: type[Schema] | None = None, ) -> Table: + """ + A function for creating a table from a pandas DataFrame. If it contains a special + column ``__time__``, rows will be split into batches with timestamps from the column. + A special column ``__diff__`` can be used to set an event type - with ``1`` treated + as inserting the row and ``-1`` as removing it. + """ if id_from is not None and schema is not None: raise ValueError("parameters `schema` and `id_from` are mutually exclusive") + ordinary_columns_names = [ + column for column in df.columns if column not in api.PANDAS_PSEUDOCOLUMNS + ] if schema is None: - schema = schema_from_pandas(df, id_from=id_from) - elif list(df.columns) != schema.column_names(): + schema = schema_from_pandas( + df, id_from=id_from, exclude_columns=api.PANDAS_PSEUDOCOLUMNS + ) + elif ordinary_columns_names != schema.column_names(): raise ValueError("schema does not match given dataframe") + _validate_dataframe(df) + return table_from_datasource( PandasDataSource( schema=schema, @@ -168,18 +202,28 @@ def _markdown_to_pandas(table_def): ).convert_dtypes() -def parse_to_table( +def table_from_markdown( table_def, id_from=None, unsafe_trusted_ids=False, schema: type[Schema] | None = None, ) -> Table: + """ + A function for creating a table from its definition in markdown. If it contains a special + column ``__time__``, rows will be split into batches with timestamps from the column. + A special column ``__diff__`` can be used to set an event type - with ``1`` treated + as inserting the row and ``-1`` as removing it. + """ df = _markdown_to_pandas(table_def) return table_from_pandas( df, id_from=id_from, unsafe_trusted_ids=unsafe_trusted_ids, schema=schema ) +# XXX: clean this up +parse_to_table = table_from_markdown + + @runtime_type_check def table_from_parquet( path: str | PathLike, @@ -205,10 +249,6 @@ def table_to_parquet(table: Table, filename: str | PathLike): return df.to_parquet(filename) -# XXX: clean this up -table_from_markdown = parse_to_table - - class _EmptyConnectorSubject(ConnectorSubject): def run(self): pass @@ -352,7 +392,7 @@ def table_from_pandas( """ if schema is None: schema = schema_from_pandas( - df, exclude_columns=["_time", "_diff", "_worker"] + df, exclude_columns={"_time", "_diff", "_worker"} ) schema, api_schema = read_schema(schema=schema) value_fields: list[api.ValueField] = api_schema["value_fields"] diff --git a/python/pathway/engine.pyi b/python/pathway/engine.pyi index f414166a..0827d8cc 100644 --- a/python/pathway/engine.pyi +++ b/python/pathway/engine.pyi @@ -104,6 +104,18 @@ class LegacyTable: class Table: """Table with tuples containing values from multiple columns.""" +class InputRow: + """Row of data for static_table""" + + def __init__( + self, + key: Pointer, + value: list[Value], + time: int = 0, + diff: int = 1, + shard: int | None = None, + ) -> None: ... 
+ class MissingValueError(BaseException): "Marker class to indicate missing attributes" @@ -374,7 +386,7 @@ class Scope: def static_table( self, universe: Universe, - rows: Iterable[tuple[Pointer, list[Value]]], + rows: Iterable[InputRow], dt: DType, ) -> Table: ... def map_column( @@ -617,6 +629,15 @@ class SnapshotAccess(Enum): REPLAY: SnapshotAccess FULL: SnapshotAccess +class DataEventType(Enum): + INSERT: DataEventType + DELETE: DataEventType + UPSERT: DataEventType + +class SessionType(Enum): + NATIVE: SessionType + UPSERT: SessionType + class SnapshotEvent: @staticmethod def insert(key: Pointer, values: list[Value]) -> SnapshotEvent: ... diff --git a/python/pathway/internals/api.py b/python/pathway/internals/api.py index 582716d8..7bcec0e0 100644 --- a/python/pathway/internals/api.py +++ b/python/pathway/internals/api.py @@ -71,6 +71,12 @@ def ids_from_pandas( return {k: ref_scalar(*args) for (k, *args) in df[id_from].itertuples()} +TIME_PSEUDOCOLUMN = "__time__" +DIFF_PSEUDOCOLUMN = "__diff__" +SHARD_PSEUDOCOLUMN = "__shard__" +PANDAS_PSEUDOCOLUMNS = {TIME_PSEUDOCOLUMN, DIFF_PSEUDOCOLUMN, SHARD_PSEUDOCOLUMN} + + def static_table_from_pandas( scope, df: pd.DataFrame, @@ -79,17 +85,19 @@ def static_table_from_pandas( ) -> Table: ids = ids_from_pandas(df, connector_properties, id_from) - all_data: list[tuple[Pointer, list[Value]]] = [(key, []) for key in ids.values()] - data = {} for c in df.columns: - data[c] = {ids[k]: denumpify(v) for k, v in df[c].items()} + data[c] = [denumpify(v) for _, v in df[c].items()] + # df[c].items() is used because df[c].values is a numpy array + ordinary_columns = [ + column for column in df.columns if column not in PANDAS_PSEUDOCOLUMNS + ] if connector_properties is None: column_properties = [] - for c in df.columns: + for c in ordinary_columns: dtype: type = int - for v in data[c].values(): + for v in data[c]: if v is not None: dtype = type(v) break @@ -99,13 +107,19 @@ def static_table_from_pandas( connector_properties = ConnectorProperties(column_properties=column_properties) assert len(connector_properties.column_properties) == len( - df.columns + ordinary_columns ), "prrovided connector properties do not match the dataframe" - for c in df.columns: - for (key, values), (column_key, value) in zip( - all_data, data[c].items(), strict=True - ): - assert key == column_key - values.append(value) - return scope.static_table(all_data, connector_properties) + input_data: list[InputRow] = [] + for i, index in enumerate(df.index): + key = ids[index] + values = [data[c][i] for c in ordinary_columns] + time = data[TIME_PSEUDOCOLUMN][i] if TIME_PSEUDOCOLUMN in data else 0 + diff = data[DIFF_PSEUDOCOLUMN][i] if DIFF_PSEUDOCOLUMN in data else 1 + if diff not in [-1, 1]: + raise ValueError(f"Column {DIFF_PSEUDOCOLUMN} can only contain 1 and -1.") + shard = data[SHARD_PSEUDOCOLUMN][i] if SHARD_PSEUDOCOLUMN in data else None + input_row = InputRow(key, values, time=time, diff=diff, shard=shard) + input_data.append(input_row) + + return scope.static_table(input_data, connector_properties) diff --git a/python/pathway/internals/schema.py b/python/pathway/internals/schema.py index a77d3298..4db8ac97 100644 --- a/python/pathway/internals/schema.py +++ b/python/pathway/internals/schema.py @@ -81,7 +81,7 @@ def schema_from_pandas( *, id_from: list[str] | None = None, name: str | None = None, - exclude_columns: list[str] = [], + exclude_columns: set[str] = set(), ) -> type[Schema]: if name is None: name = "schema_from_pandas(" + str(dframe.columns) + ")" diff --git 
a/python/pathway/io/__init__.py b/python/pathway/io/__init__.py index dd6d84dd..0491b043 100644 --- a/python/pathway/io/__init__.py +++ b/python/pathway/io/__init__.py @@ -5,6 +5,7 @@ debezium, elasticsearch, fs, + gdrive, http, jsonlines, kafka, @@ -42,4 +43,5 @@ "subscribe", "s3", "s3_csv", + "gdrive", ] diff --git a/python/pathway/io/gdrive/__init__.py b/python/pathway/io/gdrive/__init__.py new file mode 100644 index 00000000..0e2dc2ff --- /dev/null +++ b/python/pathway/io/gdrive/__init__.py @@ -0,0 +1,228 @@ +from __future__ import annotations + +import io +import time +import warnings +from dataclasses import dataclass +from typing import Any, NewType + +from google.oauth2.service_account import Credentials as ServiceCredentials +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from googleapiclient.http import MediaIoBaseDownload + +import pathway as pw +from pathway.internals import api +from pathway.internals.api import SessionType +from pathway.io.python import ConnectorSubject + +SCOPES = ["https://www.googleapis.com/auth/drive.metadata.readonly"] +MIME_TYPE_FOLDER = "application/vnd.google-apps.folder" +FILE_FIELDS = "id, name, mimeType, parents, modifiedTime" + +DEFAULT_MIME_TYPE_MAPPING: dict[str, str] = { + "application/vnd.google-apps.document": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/vnd.google-apps.spreadsheet": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/vnd.google-apps.presentation": "application/vnd.openxmlformats-officedocument.presentationml.presentation", # noqa: E501 +} + +GDriveFile = NewType("GDriveFile", dict) + + +class GDriveClient: + def __init__(self, credentials: Any) -> None: + self.drive = build("drive", "v3", credentials=credentials, num_retries=3) + self.export_type_mapping = DEFAULT_MIME_TYPE_MAPPING + + def _query(self, q: str = "") -> list: + items = [] + page_token = None + while True: + response = ( + self.drive.files() + .list( + q=q, + pageSize=10, + supportsAllDrives=True, + includeItemsFromAllDrives=True, + fields=f"nextPageToken, files({FILE_FIELDS})", + pageToken=page_token, + ) + .execute() + ) + items.extend(response.get("files", [])) + page_token = response.get("nextPageToken", None) + if page_token is None: + break + + return items + + def _ls(self, id: str) -> list[GDriveFile]: + root = self._get(id) + files: list[GDriveFile] = [] + if root is None: + return [] + elif root["mimeType"] != MIME_TYPE_FOLDER: + return [root] + else: + subitems = self._query(f"'{id}' in parents") + files = [i for i in subitems if i["mimeType"] != MIME_TYPE_FOLDER] + subdirs = [i for i in subitems if i["mimeType"] == MIME_TYPE_FOLDER] + for subdir in subdirs: + files.extend(self._ls(subdir["id"])) + return files + + def _get(self, file_id: str) -> GDriveFile | None: + try: + file = self.drive.files().get(fileId=file_id, fields=FILE_FIELDS).execute() + return file + except HttpError as e: + reason: str = e.reason + warnings.warn( + f"cannot fetch metadata of file with id {file_id}, reason: {reason}" + ) + return None + + def _prepare_download_request(self, file: GDriveFile) -> Any: + file_id = file["id"] + mime_type = file["mimeType"] + export_type = self.export_type_mapping.get(mime_type, None) + if export_type is not None: + return self.drive.files().export_media(fileId=file_id, mimeType=export_type) + else: + return self.drive.files().get_media(fileId=file_id) + + def download(self, file: GDriveFile) -> bytes | None: + try: + 
response = io.BytesIO() + request = self._prepare_download_request(file) + downloader = MediaIoBaseDownload(response, request) + done = False + while done is False: + _, done = downloader.next_chunk() + return response.getvalue() + except HttpError as e: + file_id = file["id"] + reason: str = e.reason + warnings.warn(f"cannot fetch file with id {file_id}, reason: {reason}") + return None + + def tree(self, root_id: str) -> GDriveTree: + return GDriveTree({file["id"]: file for file in self._ls(root_id)}) + + +@dataclass(frozen=True) +class GDriveTree: + files: dict[str, GDriveFile] + + def _diff(self, other: GDriveTree) -> list[GDriveFile]: + return [file for file in self.files.values() if file["id"] not in other.files] + + def _modified_files(self, previous: GDriveTree) -> list[GDriveFile]: + result = [] + for file in self.files.values(): + previous_file = previous.files.get(file["id"], None) + if ( + previous_file is not None + and file["modifiedTime"] > previous_file["modifiedTime"] + ): + result.append(file) + return result + + def removed_files(self, previous: GDriveTree) -> list[GDriveFile]: + return previous._diff(self) + + def new_and_changed_files(self, previous: GDriveTree) -> list[GDriveFile]: + return self._diff(previous) + self._modified_files(previous) + + +class GDriveSubject(ConnectorSubject): + client: GDriveClient + root: str + refresh_interval: int + + def __init__( + self, + *, + credentials: Any, + root: str, + refresh_interval: int, + mode: str, + ) -> None: + super().__init__() + self.client = GDriveClient(credentials) + self.refresh_interval = refresh_interval + self.root = root + self.mode = mode + assert mode in ["streaming", "static"] + + @property + def _session_type(self) -> SessionType: + return SessionType.UPSERT if self.mode == "streaming" else SessionType.NATIVE + + def run(self) -> None: + prev = GDriveTree({}) + + while True: + tree = self.client.tree(self.root) + for file in tree.removed_files(prev): + self.remove(file) + for file in tree.new_and_changed_files(prev): + payload = self.client.download(file) + if payload is not None: + self.upsert(file, payload) + + if self.mode == "static": + break + prev = tree + time.sleep(self.refresh_interval) + + def upsert(self, file: GDriveFile, payload: bytes): + self._add(api.ref_scalar(file["id"]), payload) + + def remove(self, file: GDriveFile): + self._remove(api.ref_scalar(file["id"]), b"") + + +def read( + object_id: str, + *, + mode: str = "streaming", + refresh_interval: int = 30, + service_user_credentials_file: str, +) -> pw.Table: + """Reads a table from a Google Drive directory. + + It will return a table with single column `data` containing each file in a binary format. + + Args: + object_id: id of a directory or file. Directories will be scanned recursively. + mode: denotes how the engine polls the new data from the source. Currently "streaming", + and "static" are supported. If set to "streaming", it will check for updates, deletions + and new files every `refresh_interval` seconds. + "static" mode will only consider the available data and ingest all of it in one commit. + The default value is "streaming". + refresh_interval: time in seconds between scans. Applicable if mode is set to 'streaming'. + service_user_credentials_file: Google API service user json file. + Returns: + The table read. 
+ """ + + if mode not in ["streaming", "static"]: + raise ValueError(f"Unrecognized connector mode: {mode}") + + service_credentials = ServiceCredentials.from_service_account_file( + service_user_credentials_file + ) + + subject = GDriveSubject( + credentials=service_credentials, + root=object_id, + refresh_interval=refresh_interval, + mode=mode, + ) + + return pw.io.python.read( + subject, + format="binary", + ) diff --git a/python/pathway/io/http/_server.py b/python/pathway/io/http/_server.py index 880a44ba..5c092a5c 100644 --- a/python/pathway/io/http/_server.py +++ b/python/pathway/io/http/_server.py @@ -174,9 +174,10 @@ def on_change(key: Pointer, row: dict[str, Any], time: int, is_addition: bool): task = tasks.get(key, None) if task is None: - logging.info( - "Query response has changed. It probably indicates an error in the pipeline." - ) + if delete_completed_queries: + logging.info( + "Query response has changed. It probably indicates an error in the pipeline." + ) return def set_task(): diff --git a/python/pathway/io/python/__init__.py b/python/pathway/io/python/__init__.py index f0a81c93..9939e6a2 100644 --- a/python/pathway/io/python/__init__.py +++ b/python/pathway/io/python/__init__.py @@ -7,7 +7,7 @@ from typing import Any from pathway.internals import api, datasource -from pathway.internals.api import PathwayType, Pointer +from pathway.internals.api import DataEventType, PathwayType, Pointer, SessionType from pathway.internals.decorators import table_from_datasource from pathway.internals.runtime_type_check import runtime_type_check from pathway.internals.schema import Schema @@ -16,12 +16,14 @@ RawDataSchema, assert_schema_or_value_columns_not_none, get_data_format_type, + internal_read_method, read_schema, ) SUPPORTED_INPUT_FORMATS: set[str] = { "json", "raw", + "binary", } @@ -100,10 +102,15 @@ def target(): threading.Thread(target=target).start() def _add(self, key: Pointer | None, message: Any) -> None: - self._buffer.put((True, key, message)) + if self._session_type == SessionType.NATIVE: + self._buffer.put((DataEventType.INSERT, key, message)) + elif self._session_type == SessionType.UPSERT: + self._buffer.put((DataEventType.UPSERT, key, message)) + else: + raise NotImplementedError(f"session type {self._session_type} not handled") def _remove(self, key: Pointer, message: Any) -> None: - self._buffer.put((False, key, message)) + self._buffer.put((DataEventType.DELETE, key, message)) def _read(self) -> Any: """Allows to retrieve data from a buffer. @@ -124,6 +131,10 @@ def _is_internal(self) -> bool: """ return False + @property + def _session_type(self) -> SessionType: + return SessionType.NATIVE + @runtime_type_check @trace_user_frame @@ -145,7 +156,7 @@ def read( Args: subject: An instance of a :py:class:`~pathway.python.ConnectorSubject`. schema: Schema of the resulting table. - format: Format of the data produced by a subject, "json" or "raw". In case of + format: Format of the data produced by a subject, "json", "raw" or "binary". In case of a "raw" format, table with single "data" column will be produced. debug_data: Static data replacing original one when debug mode is active. autocommit_duration_ms: the maximum time between two commits. 
Every
@@ -192,12 +203,15 @@ def read(
     data_format = api.DataFormat(
         **api_schema,
         format_type=data_format_type,
+        session_type=subject._session_type,
+        parse_utf8=(format != "binary"),
     )
     data_storage = api.DataStorage(
         storage_type="python",
         python_subject=api.PythonSubject(
             start=subject.start, read=subject._read, is_internal=subject._is_internal()
         ),
+        read_method=internal_read_method(format),
         persistent_id=persistent_id,
     )
     data_source_options = datasource.DataSourceOptions(
diff --git a/python/pathway/stdlib/stateful/__init__.py b/python/pathway/stdlib/stateful/__init__.py
new file mode 100644
index 00000000..2a85b7b3
--- /dev/null
+++ b/python/pathway/stdlib/stateful/__init__.py
@@ -0,0 +1,9 @@
+# Copyright © 2023 Pathway
+
+from __future__ import annotations
+
+from .deduplicate import deduplicate
+
+__all__ = [
+    "deduplicate",
+]
diff --git a/python/pathway/stdlib/stateful/deduplicate.py b/python/pathway/stdlib/stateful/deduplicate.py
new file mode 100644
index 00000000..6a305425
--- /dev/null
+++ b/python/pathway/stdlib/stateful/deduplicate.py
@@ -0,0 +1,55 @@
+from typing import Any, Callable, TypeVar
+
+import pathway as pw
+from pathway.internals import api
+from pathway.internals.schema import Schema
+from pathway.stdlib.utils.col import unpack_col
+
+TDedupe = TypeVar("TDedupe", bound=api.Value)
+TSchema = TypeVar("TSchema", bound=Schema)
+
+
+def deduplicate(
+    table: pw.Table[TSchema],
+    *,
+    col: pw.ColumnReference,
+    instance: pw.ColumnExpression | None = None,
+    acceptor: Callable[[TDedupe, TDedupe], bool],
+) -> pw.Table[TSchema]:
+    """Deduplicates rows in `table` on the `col` column using an acceptor function.
+
+    It keeps rows which were accepted by the acceptor function.
+    The acceptor operates on two arguments - the current value and the previously accepted value.
+
+    Args:
+        table (pw.Table[TSchema]): table to deduplicate
+        col (pw.ColumnReference): column used for deduplication
+        acceptor (Callable[[TDedupe, TDedupe], bool]): callback telling whether two values are different
+        instance (pw.ColumnExpression, optional): Group column for which deduplication will be performed separately.
+            Defaults to None.
+
+    Returns:
+        pw.Table[TSchema]: the deduplicated table
+    """
+    assert col.table == table
+
+    def is_different_with_state(
+        state: tuple[Any, ...] | None, rows
+    ) -> tuple[Any, ...] | None:
+        for [col, *cols], diff in rows:
+            if diff <= 0:
+                continue
+            state_val = state[0] if state is not None else None
+            if state_val is None or acceptor(col, state_val):
+                state = (col, *cols)
+        return state
+
+    _table = table.select(*table, _instance=instance)
+    res = _table.groupby(_table._instance).reduce(
+        _table._instance,
+        res=pw.reducers.stateful_many(
+            is_different_with_state, col, *_table.without(_table._instance)
+        ),
+    )
+    res = res.select(res=pw.apply(lambda x: x[1:], res.res))
+    return unpack_col(res.res, *_table.without(_table._instance))
diff --git a/python/pathway/tests/test_io.py b/python/pathway/tests/test_io.py
index d81c82d8..008729d2 100644
--- a/python/pathway/tests/test_io.py
+++ b/python/pathway/tests/test_io.py
@@ -17,6 +17,7 @@
 import pathway as pw
 from pathway.engine import ref_scalar
 from pathway.internals import api
+from pathway.internals.api import SessionType
 from pathway.internals.parse_graph import G
 from pathway.tests.utils import (
     CountDifferentTimestampsCallback,
@@ -2247,3 +2248,82 @@ class InputSchema(pw.Schema):
         any_order=True,
     )
     assert on_change.call_count == 3
+
+
+def test_python_connector_upsert_raw(tmp_path: pathlib.Path):
+    class TestSubject(pw.io.python.ConnectorSubject):
+        @property
+        def _session_type(self) -> SessionType:
+            return SessionType.UPSERT
+
+        def run(self):
+            self._add(api.ref_scalar(0), b"one")
+            time.sleep(5e-2)
+            self._add(api.ref_scalar(0), b"two")
+            time.sleep(5e-2)
+            self._add(api.ref_scalar(0), b"three")
+            self.close()
+
+    table = pw.io.python.read(TestSubject(), format="raw", autocommit_duration_ms=10)
+    pw.io.csv.write(table, tmp_path / "output.csv")
+    pw.run()
+
+    result = pd.read_csv(tmp_path / "output.csv")
+    assert len(result) == 5
+
+
+def test_python_connector_removal_by_key(tmp_path: pathlib.Path):
+    class TestSubject(pw.io.python.ConnectorSubject):
+        @property
+        def _session_type(self) -> SessionType:
+            return SessionType.UPSERT
+
+        def run(self):
+            self._add(api.ref_scalar(0), b"one")
+            time.sleep(5e-2)
+            self._remove(api.ref_scalar(0), b"")  # Note: we don't pass an actual value
+            self.close()
+
+    table = pw.io.python.read(TestSubject(), format="raw", autocommit_duration_ms=10)
+    pw.io.csv.write(table, tmp_path / "output.csv")
+    pw.run()
+
+    result = pd.read_csv(tmp_path / "output.csv")
+    assert len(result) == 2
+
+
+def test_python_connector_upsert_json(tmp_path: pathlib.Path):
+    class TestSubject(pw.io.python.ConnectorSubject):
+        @property
+        def _session_type(self) -> SessionType:
+            return SessionType.UPSERT
+
+        def run(self):
+            self._add(
+                api.ref_scalar(0),
+                json.dumps({"word": "one", "digit": 1}).encode("utf-8"),
+            )
+            time.sleep(5e-2)
+            self._add(
+                api.ref_scalar(0),
+                json.dumps({"word": "two", "digit": 2}).encode("utf-8"),
+            )
+            time.sleep(5e-2)
+            self._add(
+                api.ref_scalar(0),
+                json.dumps({"word": "three", "digit": 3}).encode("utf-8"),
+            )
+            self.close()
+
+    class InputSchema(pw.Schema):
+        word: str
+        digit: int
+
+    table = pw.io.python.read(
+        TestSubject(), format="json", schema=InputSchema, autocommit_duration_ms=10
+    )
+    pw.io.csv.write(table, tmp_path / "output.csv")
+    pw.run()
+
+    result = pd.read_csv(tmp_path / "output.csv")
+    assert len(result) == 5
diff --git a/src/connectors/data_format.rs b/src/connectors/data_format.rs
index 5f71895d..f01fd7a8 100644
--- a/src/connectors/data_format.rs
+++ b/src/connectors/data_format.rs
@@ -546,14 +546,20 @@ pub struct IdentityParser {
     value_fields: Vec<String>,
     parse_utf8: bool,
     metadata_column_value: Value,
+    session_type: SessionType,
 }

 impl
IdentityParser { - pub fn new(value_fields: Vec, parse_utf8: bool) -> IdentityParser { + pub fn new( + value_fields: Vec, + parse_utf8: bool, + session_type: SessionType, + ) -> IdentityParser { Self { value_fields, parse_utf8, metadata_column_value: Value::None, + session_type, } } @@ -601,11 +607,22 @@ impl Parser for IdentityParser { values.push(value.clone()); } } - match event { - DataEventType::Insert => ParsedEvent::Insert((key, values)), - DataEventType::Delete => ParsedEvent::Delete((key, values)), - DataEventType::Upsert => { - unreachable!("readers can't send upserts to IdentityParser") + match self.session_type { + SessionType::Native => { + match event { + DataEventType::Insert => ParsedEvent::Insert((key, values)), + DataEventType::Delete => ParsedEvent::Delete((key, values)), + DataEventType::Upsert => { + panic!("incorrect Reader-Parser configuration: unexpected Upsert event in Native session") + } + } + } + SessionType::Upsert => { + match event { + DataEventType::Insert => panic!("incorrect Reader-Parser configuration: unexpected Insert event in Upsert session"), + DataEventType::Delete => ParsedEvent::Upsert((key, None)), + DataEventType::Upsert => ParsedEvent::Upsert((key, Some(values))), + } } } }; @@ -624,6 +641,10 @@ impl Parser for IdentityParser { fn column_count(&self) -> usize { self.value_fields.len() } + + fn session_type(&self) -> SessionType { + self.session_type + } } pub struct DsvFormatter { @@ -1067,6 +1088,7 @@ pub struct JsonLinesParser { field_absence_is_error: bool, schema: HashMap, metadata_column_value: Value, + session_type: SessionType, } impl JsonLinesParser { @@ -1076,6 +1098,7 @@ impl JsonLinesParser { column_paths: HashMap, field_absence_is_error: bool, schema: HashMap, + session_type: SessionType, ) -> JsonLinesParser { JsonLinesParser { key_field_names, @@ -1084,6 +1107,7 @@ impl JsonLinesParser { field_absence_is_error, schema, metadata_column_value: Value::None, + session_type, } } } @@ -1147,10 +1171,21 @@ impl Parser for JsonLinesParser { &self.metadata_column_value, )?; - let event = match data_event { - DataEventType::Insert => ParsedEvent::Insert((key, values)), - DataEventType::Delete => ParsedEvent::Delete((key, values)), - DataEventType::Upsert => unreachable!("readers can't send upserts to JsonLinesParser"), + let event = match self.session_type { + SessionType::Native => { + match data_event { + DataEventType::Insert => ParsedEvent::Insert((key, values)), + DataEventType::Delete => ParsedEvent::Delete((key, values)), + DataEventType::Upsert => panic!("incorrect Reader-Parser configuration: unexpected Upsert event in Native session"), + } + } + SessionType::Upsert => { + match data_event { + DataEventType::Insert => panic!("incorrect Reader-Parser configuration: unexpected Insert event in Upsert session"), + DataEventType::Delete => ParsedEvent::Upsert((key, None)), + DataEventType::Upsert => ParsedEvent::Upsert((key, Some(values))), + } + } }; Ok(vec![event]) @@ -1167,6 +1202,10 @@ impl Parser for JsonLinesParser { fn column_count(&self) -> usize { self.value_field_names.len() } + + fn session_type(&self) -> SessionType { + self.session_type + } } #[derive(Debug)] diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index a0d3281a..aec1ad0a 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -27,6 +27,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use chrono::{DateTime, FixedOffset}; use log::{error, warn}; use postgres::types::ToSql; +use tempfile::{tempdir, 
TempDir}; use xxhash_rust::xxh3::Xxh3 as Hasher; use crate::connectors::data_format::FormatterContext; @@ -719,6 +720,10 @@ struct FilesystemScanner { object_pattern: GlobPattern, next_file_for_insertion: Option, cached_metadata: HashMap>, + + // Storage is deleted on object destruction, so we need to store it + // for the connector's life time + _connector_tmp_storage: Option, } impl FilesystemScanner { @@ -740,20 +745,26 @@ impl FilesystemScanner { None }; - let cache_directory_path = { + let (cache_directory_path, connector_tmp_storage) = { if streaming_mode.are_deletions_enabled() { - let root_dir_str_path = env::var("PATHWAY_PERSISTENT_STORAGE") - .ok() - .unwrap_or("/tmp/pathway".to_string()); - let root_dir_path = Path::new(&root_dir_str_path); - ensure_directory(root_dir_path)?; - - let unique_id = persistent_id.unwrap_or_else(|| rand::thread_rng().gen::()); - let connector_tmp_directory = root_dir_path.join(format!("cache-{unique_id}")); - ensure_directory(&connector_tmp_directory)?; - Some(connector_tmp_directory) + if let Ok(root_dir_str_path) = env::var("PATHWAY_PERSISTENT_STORAGE") { + let root_dir_path = Path::new(&root_dir_str_path); + ensure_directory(root_dir_path)?; + let unique_id = + persistent_id.unwrap_or_else(|| rand::thread_rng().gen::()); + let connector_tmp_directory = root_dir_path.join(format!("cache-{unique_id}")); + ensure_directory(&connector_tmp_directory)?; + (Some(connector_tmp_directory), None) + } else { + let cache_tmp_storage = tempdir()?; + let connector_tmp_directory = cache_tmp_storage.path(); + ( + Some(connector_tmp_directory.to_path_buf()), + Some(cache_tmp_storage), + ) + } } else { - None + (None, None) } }; @@ -770,6 +781,7 @@ impl FilesystemScanner { object_pattern: GlobPattern::new(object_pattern)?, next_file_for_insertion: None, cached_metadata: HashMap::new(), + _connector_tmp_storage: connector_tmp_storage, }) } @@ -1376,7 +1388,7 @@ impl Reader for PythonReader { } with_gil_and_pool(|py| { - let (addition, key, values): (bool, Option, Vec) = self + let (event, key, values): (DataEventType, Option, Vec) = self .subject .borrow(py) .read @@ -1397,13 +1409,6 @@ impl Reader for PythonReader { OffsetValue::PythonEntrySequentialId(self.total_entries_read), ); - // bridge from Python to internal representation - let event = if addition { - DataEventType::Insert - } else { - DataEventType::Delete - }; - Ok(ReadResult::Data( ReaderContext::from_diff(event, key, values), offset, diff --git a/src/engine/dataflow.rs b/src/engine/dataflow.rs index 44b81133..01a5d4a5 100644 --- a/src/engine/dataflow.rs +++ b/src/engine/dataflow.rs @@ -84,6 +84,7 @@ use self::operators::{ConsolidateNondecreasingMap, MaybeTotal}; use self::shard::Shard; use super::error::{DynError, DynResult, Trace}; use super::expression::AnyExpression; +use super::graph::InputRow; use super::http_server::maybe_run_http_server_thread; use super::progress_reporter::{maybe_run_reporter, MonitoringLevel}; use super::reduce::{ @@ -847,10 +848,6 @@ impl DataflowGraphInner { self.static_column(universe_handle, Vec::new(), column_properties) } - fn empty_table(&mut self, table_properties: Arc) -> Result { - self.static_table(Vec::new(), table_properties) - } - #[track_caller] fn assert_collections_same_size( &self, @@ -959,32 +956,6 @@ impl DataflowGraphInner { Ok(column_handle) } - fn static_table( - &mut self, - values: Vec<(Key, Vec)>, - table_properties: Arc, - ) -> Result { - let worker_count = self.scope.peers(); - let worker_index = self.scope.index(); - let values = values - 
.into_iter() - .filter(move |(k, _v)| k.shard_as_usize() % worker_count == worker_index) - .map(|(key, values)| { - ( - (key, Value::from(values.as_slice())), - S::Timestamp::minimum(), - 1, - ) - }) - .to_stream(&mut self.scope) - .as_collection() - .probe_with(&mut self.input_probe); - - Ok(self - .tables - .alloc(Table::from_collection(values).with_properties(table_properties))) - } - fn tuples( &mut self, universe_handle: UniverseHandle, @@ -2590,6 +2561,39 @@ enum OutputEvent { #[allow(clippy::unnecessary_wraps)] // we want to always return Result for symmetry impl> DataflowGraphInner { + fn empty_table(&mut self, table_properties: Arc) -> Result { + self.static_table(Vec::new(), table_properties) + } + + fn static_table( + &mut self, + values: Vec, + table_properties: Arc, + ) -> Result { + let worker_count = self.scope.peers(); + let worker_index = self.scope.index(); + let values = values + .into_iter() + .filter(move |row| { + row.shard.unwrap_or_else(|| row.key.shard_as_usize()) % worker_count == worker_index + }) + .map(|row| { + assert!(row.diff == 1 || row.diff == -1); + ( + (row.key, Value::from(row.values.as_slice())), + row.time, + row.diff, + ) + }) + .to_stream(&mut self.scope) + .as_collection() + .probe_with(&mut self.input_probe); + + Ok(self + .tables + .alloc(Table::from_collection(values).with_properties(table_properties))) + } + fn connector_table( &mut self, mut reader: Box, @@ -3582,8 +3586,8 @@ where .empty_column(universe_handle, column_properties) } - fn empty_table(&self, table_properties: Arc) -> Result { - self.0.borrow_mut().empty_table(table_properties) + fn empty_table(&self, _table_properties: Arc) -> Result { + Err(Error::IoNotPossible) } fn static_universe(&self, keys: Vec) -> Result { @@ -3603,10 +3607,10 @@ where fn static_table( &self, - values: Vec<(Key, Vec)>, - table_properties: Arc, + _data: Vec, + _table_properties: Arc, ) -> Result { - self.0.borrow_mut().static_table(values, table_properties) + Err(Error::IoNotPossible) } fn expression_column( @@ -4121,10 +4125,10 @@ impl> Graph for OuterDataflowGraph fn static_table( &self, - values: Vec<(Key, Vec)>, + data: Vec, table_properties: Arc, ) -> Result { - self.0.borrow_mut().static_table(values, table_properties) + self.0.borrow_mut().static_table(data, table_properties) } fn expression_column( diff --git a/src/engine/graph.rs b/src/engine/graph.rs index 28722504..a31b995b 100644 --- a/src/engine/graph.rs +++ b/src/engine/graph.rs @@ -161,6 +161,15 @@ impl ColumnPath { } } +#[derive(Debug, Clone)] +pub struct InputRow { + pub key: Key, + pub values: Vec, + pub time: u64, + pub diff: isize, + pub shard: Option, +} + pub struct ExpressionData { pub expression: Arc, pub properties: Arc, @@ -457,7 +466,7 @@ pub trait Graph { fn static_table( &self, - values: Vec<(Key, Vec)>, + data: Vec, table_properties: Arc, ) -> Result; @@ -824,10 +833,10 @@ impl Graph for ScopedGraph { fn static_table( &self, - values: Vec<(Key, Vec)>, + data: Vec, table_properties: Arc, ) -> Result { - self.try_with(|g| g.static_table(values, table_properties)) + self.try_with(|g| g.static_table(data, table_properties)) } fn expression_column( diff --git a/src/engine/mod.rs b/src/engine/mod.rs index 62b035ed..707029ba 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -15,9 +15,9 @@ pub use reduce::Reducer; pub mod graph; pub use graph::{ BatchWrapper, ColumnHandle, ColumnPath, ColumnProperties, ComplexColumn, Computer, - ConcatHandle, Context, ExpressionData, Graph, IterationLogic, IxKeyPolicy, IxerHandle, - JoinType, 
LegacyTable, OperatorStats, ProberStats, ReducerData, ScopedGraph, TableHandle, - TableProperties, UniverseHandle, + ConcatHandle, Context, ExpressionData, Graph, InputRow, IterationLogic, IxKeyPolicy, + IxerHandle, JoinType, LegacyTable, OperatorStats, ProberStats, ReducerData, ScopedGraph, + TableHandle, TableProperties, UniverseHandle, }; pub mod http_server; diff --git a/src/python_api.rs b/src/python_api.rs index da5bf280..43a5e8b5 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -54,12 +54,12 @@ use crate::connectors::data_format::{ PsqlSnapshotFormatter, PsqlUpdatesFormatter, }; use crate::connectors::data_storage::{ - ConnectorMode, CsvFilesystemReader, ElasticSearchWriter, FileWriter, FilesystemReader, - KafkaReader, KafkaWriter, NullWriter, PsqlWriter, PythonReaderBuilder, ReadMethod, - ReaderBuilder, S3CsvReader, S3GenericReader, Writer, + ConnectorMode, CsvFilesystemReader, DataEventType, ElasticSearchWriter, FileWriter, + FilesystemReader, KafkaReader, KafkaWriter, NullWriter, PsqlWriter, PythonReaderBuilder, + ReadMethod, ReaderBuilder, S3CsvReader, S3GenericReader, Writer, }; use crate::connectors::snapshot::Event as SnapshotEvent; -use crate::connectors::{ReplayMode, SnapshotAccess}; +use crate::connectors::{ReplayMode, SessionType, SnapshotAccess}; use crate::engine::dataflow::config_from_env; use crate::engine::error::{DynError, DynResult, Trace as EngineTrace}; use crate::engine::graph::ScopedContext; @@ -70,8 +70,9 @@ use crate::engine::ReducerData; use crate::engine::{ run_with_new_dataflow_graph, BatchWrapper, ColumnHandle, ColumnPath, ColumnProperties as EngineColumnProperties, DateTimeNaive, DateTimeUtc, Duration, - ExpressionData, IxKeyPolicy, JoinType, Key, KeyImpl, PointerExpression, Reducer, ScopedGraph, - TableHandle, TableProperties as EngineTableProperties, Type, UniverseHandle, Value, + ExpressionData, InputRow as EngineInputRow, IxKeyPolicy, JoinType, Key, KeyImpl, + PointerExpression, Reducer, ScopedGraph, TableHandle, TableProperties as EngineTableProperties, + Type, UniverseHandle, Value, }; use crate::engine::{AnyExpression, Context as EngineContext}; use crate::engine::{BoolExpression, Error as EngineError}; @@ -410,6 +411,30 @@ impl IntoPy for ConnectorMode { } } +impl<'source> FromPyObject<'source> for SessionType { + fn extract(ob: &'source PyAny) -> PyResult { + Ok(ob.extract::>()?.0) + } +} + +impl IntoPy for SessionType { + fn into_py(self, py: Python<'_>) -> PyObject { + PySessionType(self).into_py(py) + } +} + +impl<'source> FromPyObject<'source> for DataEventType { + fn extract(ob: &'source PyAny) -> PyResult { + Ok(ob.extract::>()?.0) + } +} + +impl IntoPy for DataEventType { + fn into_py(self, py: Python<'_>) -> PyObject { + PyDataEventType(self).into_py(py) + } +} + impl<'source> FromPyObject<'source> for DebeziumDBType { fn extract(ob: &'source PyAny) -> PyResult { Ok(ob.extract::>()?.0) @@ -1267,6 +1292,30 @@ impl PyConnectorMode { pub const STREAMING_WITH_DELETIONS: ConnectorMode = ConnectorMode::StreamingWithDeletions; } +#[pyclass(module = "pathway.engine", frozen, name = "SessionType")] +pub struct PySessionType(SessionType); + +#[pymethods] +impl PySessionType { + #[classattr] + pub const NATIVE: SessionType = SessionType::Native; + #[classattr] + pub const UPSERT: SessionType = SessionType::Upsert; +} + +#[pyclass(module = "pathway.engine", frozen, name = "DataEventType")] +pub struct PyDataEventType(DataEventType); + +#[pymethods] +impl PyDataEventType { + #[classattr] + pub const INSERT: DataEventType = 
DataEventType::Insert; + #[classattr] + pub const DELETE: DataEventType = DataEventType::Delete; + #[classattr] + pub const UPSERT: DataEventType = DataEventType::Upsert; +} + #[pyclass(module = "pathway.engine", frozen, name = "DebeziumDBType")] pub struct PyDebeziumDBType(DebeziumDBType); @@ -1512,6 +1561,37 @@ impl<'source> FromPyObject<'source> for ColumnPath { } } } +#[derive(Clone)] +#[pyclass(module = "pathway.engine", frozen)] +pub struct InputRow(EngineInputRow); + +#[pymethods] +impl InputRow { + #[new] + #[pyo3(signature = ( + key, + values, + time = 0, + diff = 1, + shard = None, + ))] + pub fn new( + key: Key, + values: Vec, + time: u64, + diff: isize, + shard: Option, + ) -> InputRow { + let inner = EngineInputRow { + key, + values, + time, + diff, + shard, + }; + Self(inner) + } +} static MISSING_VALUE_ERROR_TYPE: Lazy> = Lazy::new(|| { Python::with_gil(|py| { @@ -1790,12 +1870,12 @@ impl Scope { pub fn static_table( self_: &PyCell, - #[pyo3(from_py_with = "from_py_iterable")] values: Vec<(Key, Vec)>, + #[pyo3(from_py_with = "from_py_iterable")] data: Vec, properties: ConnectorProperties, ) -> PyResult> { let column_properties = properties.column_properties(); let handle = self_.borrow().graph.static_table( - values, + data.into_iter().map(|row| row.0).collect(), Arc::new(EngineTableProperties::flat(column_properties)), )?; Table::new(self_, handle) @@ -3187,6 +3267,7 @@ pub struct DataFormat { field_absence_is_error: bool, parse_utf8: bool, debezium_db_type: DebeziumDBType, + session_type: SessionType, } #[pymethods] @@ -3264,6 +3345,7 @@ impl DataFormat { field_absence_is_error = true, parse_utf8 = true, debezium_db_type = DebeziumDBType::Postgres, + session_type = SessionType::Native, ))] #[allow(clippy::too_many_arguments)] fn new( @@ -3276,6 +3358,7 @@ impl DataFormat { field_absence_is_error: bool, parse_utf8: bool, debezium_db_type: DebeziumDBType, + session_type: SessionType, ) -> Self { DataFormat { format_type, @@ -3287,6 +3370,7 @@ impl DataFormat { field_absence_is_error, parse_utf8, debezium_db_type, + session_type, } } } @@ -3715,12 +3799,14 @@ impl DataFormat { self.column_paths.clone().unwrap_or_default(), self.field_absence_is_error, self.schema(py), + self.session_type, ); Ok(Box::new(parser)) } "identity" => Ok(Box::new(IdentityParser::new( self.value_field_names(py), self.parse_utf8, + self.session_type, ))), _ => Err(PyValueError::new_err("Unknown data format")), } @@ -4041,6 +4127,8 @@ fn module(_py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -4048,6 +4136,7 @@ fn module(_py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 608cf9dc..b4c24236 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -4,6 +4,7 @@ use pathway_engine::connectors::data_format::{IdentityParser, ParseResult, Parse use pathway_engine::connectors::data_storage::{ ConnectorMode, FilesystemReader, ReadMethod, ReadResult, Reader, }; +use pathway_engine::connectors::SessionType; use pathway_engine::engine::Value; fn read_bytes_from_path(path: &str) -> eyre::Result> { @@ -14,7 +15,7 @@ fn read_bytes_from_path(path: &str) -> eyre::Result> { ReadMethod::Full, "*", )?; - let mut parser = IdentityParser::new(vec!["data".to_string()], false); + let mut 
parser = IdentityParser::new(vec!["data".to_string()], false, SessionType::Native); let mut events = Vec::new(); loop { diff --git a/tests/test_connector_field_defaults.rs b/tests/test_connector_field_defaults.rs index 8f582bb4..1174ce4c 100644 --- a/tests/test_connector_field_defaults.rs +++ b/tests/test_connector_field_defaults.rs @@ -10,6 +10,7 @@ use pathway_engine::connectors::data_format::{ use pathway_engine::connectors::data_storage::{ ConnectorMode, CsvFilesystemReader, FilesystemReader, ReadMethod, }; +use pathway_engine::connectors::SessionType; use pathway_engine::engine::{Type, Value}; #[test] @@ -218,6 +219,7 @@ fn test_jsonlines_fails_without_default() -> eyre::Result<()> { HashMap::new(), true, HashMap::new(), + SessionType::Native, ); assert!(data_parsing_fails(Box::new(reader), Box::new(parser))?); @@ -246,6 +248,7 @@ fn test_jsonlines_with_default() -> eyre::Result<()> { HashMap::new(), true, schema, + SessionType::Native, ); let read_lines = read_data_from_reader(Box::new(reader), Box::new(parser))?; @@ -298,6 +301,7 @@ fn test_jsonlines_with_default_at_jsonpath() -> eyre::Result<()> { routes, true, schema, + SessionType::Native, ); let read_lines = read_data_from_reader(Box::new(reader), Box::new(parser))?; @@ -344,6 +348,7 @@ fn test_jsonlines_explicit_null_not_overridden() -> eyre::Result<()> { HashMap::new(), true, schema, + SessionType::Native, ); let read_lines = read_data_from_reader(Box::new(reader), Box::new(parser))?; diff --git a/tests/test_jsonlines.rs b/tests/test_jsonlines.rs index 8f15b19e..7f95864b 100644 --- a/tests/test_jsonlines.rs +++ b/tests/test_jsonlines.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use pathway_engine::connectors::data_format::{JsonLinesParser, ParsedEvent}; use pathway_engine::connectors::data_storage::{ConnectorMode, FilesystemReader, ReadMethod}; +use pathway_engine::connectors::SessionType; use pathway_engine::engine::Value; #[test] @@ -25,6 +26,7 @@ fn test_jsonlines_ok() -> eyre::Result<()> { HashMap::new(), true, HashMap::new(), + SessionType::Native, ); let entries = read_data_from_reader(Box::new(reader), Box::new(parser))?; @@ -64,6 +66,7 @@ fn test_jsonlines_incorrect_key() -> eyre::Result<()> { HashMap::new(), true, HashMap::new(), + SessionType::Native, ); assert_error_shown( @@ -90,6 +93,7 @@ fn test_jsonlines_incomplete_key_to_null() -> eyre::Result<()> { HashMap::new(), false, HashMap::new(), + SessionType::Native, ); let entries = read_data_from_reader(Box::new(reader), Box::new(parser))?; @@ -113,6 +117,7 @@ fn test_jsonlines_incorrect_values() -> eyre::Result<()> { HashMap::new(), true, HashMap::new(), + SessionType::Native, ); assert_error_shown( @@ -147,6 +152,7 @@ fn test_jsonlines_types_parsing() -> eyre::Result<()> { HashMap::new(), true, HashMap::new(), + SessionType::Native, ); let entries = read_data_from_reader(Box::new(reader), Box::new(parser))?; @@ -201,6 +207,7 @@ fn test_jsonlines_complex_paths() -> eyre::Result<()> { routes, true, HashMap::new(), + SessionType::Native, ); let entries = read_data_from_reader(Box::new(reader), Box::new(parser))?; @@ -260,6 +267,7 @@ fn test_jsonlines_complex_paths_error() -> eyre::Result<()> { routes, true, HashMap::new(), + SessionType::Native, ); assert_error_shown( @@ -301,6 +309,7 @@ fn test_jsonlines_complex_path_ignore_errors() -> eyre::Result<()> { routes, false, HashMap::new(), + SessionType::Native, ); let entries = read_data_from_reader(Box::new(reader), Box::new(parser))?; @@ -324,6 +333,7 @@ fn test_jsonlines_incorrect_key_verbose_error() -> eyre::Result<()> { 
HashMap::new(), true, HashMap::new(), + SessionType::Native, ); assert_error_shown( @@ -353,6 +363,7 @@ fn test_jsonlines_incorrect_jsonpointer_verbose_error() -> eyre::Result<()> { routes, true, HashMap::new(), + SessionType::Native, ); assert_error_shown( @@ -379,6 +390,7 @@ fn test_jsonlines_failed_to_parse_field() -> eyre::Result<()> { HashMap::new(), true, HashMap::new(), + SessionType::Native, ); assert_error_shown( diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs index 0aa19c5a..1acea2c6 100644 --- a/tests/test_metadata.rs +++ b/tests/test_metadata.rs @@ -10,6 +10,7 @@ use pathway_engine::connectors::data_format::{ use pathway_engine::connectors::data_storage::{ ConnectorMode, CsvFilesystemReader, FilesystemReader, ReadMethod, }; +use pathway_engine::connectors::SessionType; use pathway_engine::engine::Value; /// This function requires that _metadata field is the last in the `value_names_list` @@ -163,6 +164,7 @@ fn test_metadata_json_file() -> eyre::Result<()> { HashMap::new(), false, HashMap::new(), + SessionType::Native, ); let data_read = read_data_from_reader(Box::new(reader), Box::new(parser))?; @@ -186,6 +188,7 @@ fn test_metadata_json_dir() -> eyre::Result<()> { HashMap::new(), false, HashMap::new(), + SessionType::Native, ); let data_read = read_data_from_reader(Box::new(reader), Box::new(parser))?; @@ -204,7 +207,11 @@ fn test_metadata_identity_file() -> eyre::Result<()> { ReadMethod::ByLine, "*", )?; - let parser = IdentityParser::new(vec!["data".to_string(), "_metadata".to_string()], false); + let parser = IdentityParser::new( + vec!["data".to_string(), "_metadata".to_string()], + false, + SessionType::Native, + ); let data_read = read_data_from_reader(Box::new(reader), Box::new(parser))?; check_file_name_in_metadata(&data_read[0], "tests/data/jsonlines.txt\""); @@ -221,7 +228,11 @@ fn test_metadata_identity_dir() -> eyre::Result<()> { ReadMethod::ByLine, "*", )?; - let parser = IdentityParser::new(vec!["data".to_string(), "_metadata".to_string()], false); + let parser = IdentityParser::new( + vec!["data".to_string(), "_metadata".to_string()], + false, + SessionType::Native, + ); let data_read = read_data_from_reader(Box::new(reader), Box::new(parser))?; check_file_name_in_metadata(&data_read[0], "tests/data/jsonlines/one.jsonlines\""); diff --git a/tests/test_seek.rs b/tests/test_seek.rs index 64798126..ac3a466a 100644 --- a/tests/test_seek.rs +++ b/tests/test_seek.rs @@ -14,6 +14,7 @@ use pathway_engine::connectors::data_storage::ReaderBuilder; use pathway_engine::connectors::data_storage::{ ConnectorMode, CsvFilesystemReader, FilesystemReader, ReadMethod, }; +use pathway_engine::connectors::SessionType; use pathway_engine::engine::Value; use pathway_engine::persistence::sync::SharedWorkersPersistenceCoordinator; use pathway_engine::persistence::tracker::SingleWorkerPersistentStorage; @@ -60,6 +61,7 @@ fn json_reader_parser_pair(input_path: &Path) -> (Box, Box