Skip to content

Commit

Permalink
Release 0.7.1
Browse files Browse the repository at this point in the history
Co-authored-by: Michał Bartoszkiewicz <[email protected]>
Co-authored-by: Jan Chorowski <[email protected]>
Co-authored-by: Xavier Gendre <[email protected]>
Co-authored-by: Adrian Kosowski <[email protected]>
Co-authored-by: Jakub Kowalski <[email protected]>
Co-authored-by: Sergey Kulik <[email protected]>
Co-authored-by: Mateusz Lewandowski <[email protected]>
Co-authored-by: Mohamed Malhou <[email protected]>
Co-authored-by: Krzysztof Nowicki <[email protected]>
Co-authored-by: Richard Pelgrim <[email protected]>
Co-authored-by: Kamil Piechowiak <[email protected]>
Co-authored-by: Paweł Podhajski <[email protected]>
Co-authored-by: Olivier Ruas <[email protected]>
Co-authored-by: Przemysław Uznański <[email protected]>
Co-authored-by: Sebastian Włudzik <[email protected]>
GitOrigin-RevId: 50c042a9e5b04e8ecc09ccb3f80086f6e598cdff
  • Loading branch information
16 people committed Nov 17, 2023
1 parent 4c570e1 commit 59e8dae
Show file tree
Hide file tree
Showing 27 changed files with 778 additions and 118 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

## [0.7.1] - 2023-11-17

### Added

- Experimental Google Drive input connector.
- Stateful deduplication function (`pw.stateful.deduplicate`) allowing alerting on significant changes.
- The ability to split data into batches in `pw.debug.table_from_markdown` and `pw.debug.table_from_pandas`.

## [0.7.0] - 2023-11-16

### Added
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pathway"
version = "0.7.0"
version = "0.7.1"
edition = "2021"
publish = false
rust-version = "1.72.0"
Expand All @@ -13,7 +13,6 @@ crate-type = ["cdylib", "lib"]
[dev-dependencies]
assert_matches = "1.5.0"
eyre = "0.6.8"
tempfile = "3.8.1"

[dependencies]
arc-swap = "1.6.0"
Expand Down Expand Up @@ -59,6 +58,7 @@ serde_json = "1.0"
serde_with = "3.4.0"
smallvec = { version = "1.11.1", features = ["union", "const_generics"] }
syn = { version = "2.0.38", features = ["default", "full", "visit", "visit-mut"] } # Hack to keep features unified between normal and build deps
tempfile = "3.8.1"
thiserror = "1.0.50"
timely = { path = "./external/timely-dataflow/timely", features = ["bincode"] }
tokio = "1.33.0"
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"diskcache >= 5.2.1",
"exceptiongroup >= 1.1.3; python_version < '3.11'",
"boto3 >= 1.26.76",
"google-api-python-client >= 2.108.0",
]

[project.optional-dependencies]
Expand Down
12 changes: 11 additions & 1 deletion python/pathway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,16 @@
unwrap,
)
from pathway.internals.api import PathwayType as Type
from pathway.stdlib import graphs, indexing, ml, ordered, statistical, temporal, utils
from pathway.stdlib import (
graphs,
indexing,
ml,
ordered,
stateful,
statistical,
temporal,
utils,
)
from pathway.stdlib.utils.async_transformer import AsyncTransformer
from pathway.stdlib.utils.pandas_transformer import pandas_transformer

Expand Down Expand Up @@ -161,6 +170,7 @@
"Duration",
"Json",
"table_transformer",
"stateful",
]


Expand Down
56 changes: 48 additions & 8 deletions python/pathway/debug/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ def table_to_pandas(table: Table):
return res


def _validate_dataframe(df: pd.DataFrame) -> None:
    """Validate the pseudocolumns (``__time__``, ``__diff__``, ``__shard__``) of *df*.

    Raises ``ValueError`` when a present pseudocolumn is not integer-typed,
    when ``__time__`` contains a negative value, or when ``__diff__`` contains
    a value other than 1 and -1.  When any timestamp is odd, every timestamp
    is doubled **in place** (with a warning) so that all timestamps are even.
    """
    # Type-check every pseudocolumn that actually appears in the frame.
    for pseudocolumn in api.PANDAS_PSEUDOCOLUMNS:
        if pseudocolumn not in df.columns:
            continue
        if not pd.api.types.is_integer_dtype(df[pseudocolumn].dtype):
            raise ValueError(f"Column {pseudocolumn} has to contain integers only.")

    if api.TIME_PSEUDOCOLUMN in df.columns:
        times = df[api.TIME_PSEUDOCOLUMN]
        if (times < 0).any():
            raise ValueError(
                f"Column {api.TIME_PSEUDOCOLUMN} cannot contain negative times."
            )
        # Odd timestamps are normalized by doubling all of them; this mutates
        # the caller's DataFrame.
        if (times % 2 == 1).any():
            warn("timestamps are required to be even; all timestamps will be doubled")
            df[api.TIME_PSEUDOCOLUMN] = times * 2

    if api.DIFF_PSEUDOCOLUMN in df.columns:
        diffs = df[api.DIFF_PSEUDOCOLUMN]
        if ((diffs != 1) & (diffs != -1)).any():
            raise ValueError(
                f"Column {api.DIFF_PSEUDOCOLUMN} can only have 1 and -1 values."
            )


@runtime_type_check
@trace_user_frame
def table_from_pandas(
Expand All @@ -128,14 +149,27 @@ def table_from_pandas(
unsafe_trusted_ids: bool = False,
schema: type[Schema] | None = None,
) -> Table:
"""
A function for creating a table from a pandas DataFrame. If it contains a special
column ``__time__``, rows will be split into batches with timestamps from the column.
A special column ``__diff__`` can be used to set an event type - with ``1`` treated
as inserting the row and ``-1`` as removing it.
"""
if id_from is not None and schema is not None:
raise ValueError("parameters `schema` and `id_from` are mutually exclusive")

ordinary_columns_names = [
column for column in df.columns if column not in api.PANDAS_PSEUDOCOLUMNS
]
if schema is None:
schema = schema_from_pandas(df, id_from=id_from)
elif list(df.columns) != schema.column_names():
schema = schema_from_pandas(
df, id_from=id_from, exclude_columns=api.PANDAS_PSEUDOCOLUMNS
)
elif ordinary_columns_names != schema.column_names():
raise ValueError("schema does not match given dataframe")

_validate_dataframe(df)

return table_from_datasource(
PandasDataSource(
schema=schema,
Expand Down Expand Up @@ -168,18 +202,28 @@ def _markdown_to_pandas(table_def):
).convert_dtypes()


def parse_to_table(
def table_from_markdown(
    table_def,
    id_from=None,
    unsafe_trusted_ids=False,
    schema: type[Schema] | None = None,
) -> Table:
    """
    Creates a table from its definition in markdown.

    If the definition contains a special column ``__time__``, rows are split
    into batches with timestamps taken from that column.  A special column
    ``__diff__`` can be used to set an event type — ``1`` is treated as
    inserting the row and ``-1`` as removing it.
    """
    # Parse the markdown into a DataFrame, then reuse the pandas entry point.
    dataframe = _markdown_to_pandas(table_def)
    return table_from_pandas(
        dataframe,
        id_from=id_from,
        unsafe_trusted_ids=unsafe_trusted_ids,
        schema=schema,
    )


# XXX: clean this up
parse_to_table = table_from_markdown


@runtime_type_check
def table_from_parquet(
path: str | PathLike,
Expand All @@ -205,10 +249,6 @@ def table_to_parquet(table: Table, filename: str | PathLike):
return df.to_parquet(filename)


# XXX: clean this up
table_from_markdown = parse_to_table


class _EmptyConnectorSubject(ConnectorSubject):
def run(self):
pass
Expand Down Expand Up @@ -352,7 +392,7 @@ def table_from_pandas(
"""
if schema is None:
schema = schema_from_pandas(
df, exclude_columns=["_time", "_diff", "_worker"]
df, exclude_columns={"_time", "_diff", "_worker"}
)
schema, api_schema = read_schema(schema=schema)
value_fields: list[api.ValueField] = api_schema["value_fields"]
Expand Down
23 changes: 22 additions & 1 deletion python/pathway/engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ class LegacyTable:
class Table:
"""Table with tuples containing values from multiple columns."""

class InputRow:
    """Row of data for static_table.

    Bundles a keyed row together with its event metadata: ``time`` (the
    batch timestamp the row belongs to), ``diff`` (``1`` for insertion,
    ``-1`` for deletion), and an optional ``shard``
    (presumably a worker/shard assignment — confirm against the engine).
    """

    def __init__(
        self,
        key: Pointer,
        value: list[Value],
        time: int = 0,  # batch timestamp of the event
        diff: int = 1,  # 1 = insert the row, -1 = remove it
        shard: int | None = None,  # NOTE(review): semantics not shown here
    ) -> None: ...

class MissingValueError(BaseException):
"Marker class to indicate missing attributes"

Expand Down Expand Up @@ -374,7 +386,7 @@ class Scope:
def static_table(
self,
universe: Universe,
rows: Iterable[tuple[Pointer, list[Value]]],
rows: Iterable[InputRow],
dt: DType,
) -> Table: ...
def map_column(
Expand Down Expand Up @@ -617,6 +629,15 @@ class SnapshotAccess(Enum):
REPLAY: SnapshotAccess
FULL: SnapshotAccess

class DataEventType(Enum):
INSERT: DataEventType
DELETE: DataEventType
UPSERT: DataEventType

class SessionType(Enum):
NATIVE: SessionType
UPSERT: SessionType

class SnapshotEvent:
@staticmethod
def insert(key: Pointer, values: list[Value]) -> SnapshotEvent: ...
Expand Down
40 changes: 27 additions & 13 deletions python/pathway/internals/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ def ids_from_pandas(
return {k: ref_scalar(*args) for (k, *args) in df[id_from].itertuples()}


TIME_PSEUDOCOLUMN = "__time__"
DIFF_PSEUDOCOLUMN = "__diff__"
SHARD_PSEUDOCOLUMN = "__shard__"
PANDAS_PSEUDOCOLUMNS = {TIME_PSEUDOCOLUMN, DIFF_PSEUDOCOLUMN, SHARD_PSEUDOCOLUMN}


def static_table_from_pandas(
scope,
df: pd.DataFrame,
Expand All @@ -79,17 +85,19 @@ def static_table_from_pandas(
) -> Table:
ids = ids_from_pandas(df, connector_properties, id_from)

all_data: list[tuple[Pointer, list[Value]]] = [(key, []) for key in ids.values()]

data = {}
for c in df.columns:
data[c] = {ids[k]: denumpify(v) for k, v in df[c].items()}
data[c] = [denumpify(v) for _, v in df[c].items()]
# df[c].items() is used because df[c].values is a numpy array
ordinary_columns = [
column for column in df.columns if column not in PANDAS_PSEUDOCOLUMNS
]

if connector_properties is None:
column_properties = []
for c in df.columns:
for c in ordinary_columns:
dtype: type = int
for v in data[c].values():
for v in data[c]:
if v is not None:
dtype = type(v)
break
Expand All @@ -99,13 +107,19 @@ def static_table_from_pandas(
connector_properties = ConnectorProperties(column_properties=column_properties)

assert len(connector_properties.column_properties) == len(
df.columns
ordinary_columns
), "provided connector properties do not match the dataframe"

for c in df.columns:
for (key, values), (column_key, value) in zip(
all_data, data[c].items(), strict=True
):
assert key == column_key
values.append(value)
return scope.static_table(all_data, connector_properties)
input_data: list[InputRow] = []
for i, index in enumerate(df.index):
key = ids[index]
values = [data[c][i] for c in ordinary_columns]
time = data[TIME_PSEUDOCOLUMN][i] if TIME_PSEUDOCOLUMN in data else 0
diff = data[DIFF_PSEUDOCOLUMN][i] if DIFF_PSEUDOCOLUMN in data else 1
if diff not in [-1, 1]:
raise ValueError(f"Column {DIFF_PSEUDOCOLUMN} can only contain 1 and -1.")
shard = data[SHARD_PSEUDOCOLUMN][i] if SHARD_PSEUDOCOLUMN in data else None
input_row = InputRow(key, values, time=time, diff=diff, shard=shard)
input_data.append(input_row)

return scope.static_table(input_data, connector_properties)
2 changes: 1 addition & 1 deletion python/pathway/internals/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def schema_from_pandas(
*,
id_from: list[str] | None = None,
name: str | None = None,
exclude_columns: list[str] = [],
exclude_columns: set[str] = set(),
) -> type[Schema]:
if name is None:
name = "schema_from_pandas(" + str(dframe.columns) + ")"
Expand Down
2 changes: 2 additions & 0 deletions python/pathway/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
debezium,
elasticsearch,
fs,
gdrive,
http,
jsonlines,
kafka,
Expand Down Expand Up @@ -42,4 +43,5 @@
"subscribe",
"s3",
"s3_csv",
"gdrive",
]
Loading

0 comments on commit 59e8dae

Please sign in to comment.