Replies: 3 comments 10 replies
-
To add a little more color. This The
Now that I think about it, maybe the expression compilation logic is in Rust (to avoid the need to serialize DataFusion expressions into Python), but the implementations of the various query methods are written in Python (to make the implementation a bit less awkward). |
Beta Was this translation helpful? Give feedback.
-
Making some progress here. In #363 I'm working on a adding a new The expressions passed in are generated from the DataFusion protobuf specifications. See It's missing tests, but I probably won't bother with that until we have an implementation to test. I'll get this released over the next couple of days, and from there development of an Ibis implementation could happen in pure Python outside of VegaFusion (though I'd be interested in integrating it once we have it working). |
Beta Was this translation helpful? Give feedback.
-
Ok, I just released VegaFusion 1.4.0rc3. Here's a starting point for an import vegafusion as vf
from typing import List, Optional, Literal, Union
from vegafusion.dataset import DataFrameDataset, DataFrameOperationNotSupportedError
from vegafusion.proto.datafusion_pb2 import LogicalExprNode
import pyarrow as pa
import ibis
class IbisDataset(DataFrameDataset):
_exprs = []
def __init__(self, ibis_tbl: ibis.expr.types.relations.Table):
self.tbl = ibis_tbl
def schema(self) -> pa.Schema:
"""DataFrame's pyarrow schema"""
return self.tbl.schema().to_pyarrow()
def __dataframe__(self, **kwargs):
# With ibis dev, pass kwargs through
return self.tbl.__dataframe__()
def collect(self) -> pa.Table:
"""Return DataFrame's value as a pyarrow Table"""
return ibis_tbl.to_pyarrow()
def select(
self,
exprs: List[LogicalExprNode]
) -> DataFrameDataset:
"""
Select columns from Dataset. Selection expressions may include column names,
column expressions, or window expressions
:param exprs: Selection expressions
:return: DataFrameDataset
"""
print(f"select -\n {exprs}")
raise DataFrameOperationNotSupportedError()
def sort(
self,
exprs: List[LogicalExprNode],
limit: Optional[int]
) -> DataFrameDataset:
"""
Sort and optionally limit dataset
:param exprs: Sort expressions
:param limit: Max number of rows to return
:return: DataFrameDataset
"""
print(f"sort -\n {exprs}\n {limit}")
raise DataFrameOperationNotSupportedError()
def aggregate(
self,
group_exprs: List[LogicalExprNode],
agg_exprs: List[LogicalExprNode]
) -> DataFrameDataset:
"""
Perform dataset aggregation. Resulting dataset includes grouping
columns and aggregate expressions
:param group_exprs: Expressions to group by
:param agg_exprs: Aggregate expressions
:return: DataFrameDataset
"""
print(f"aggregate -\n. {group_exprs}\n. {agg_exprs}")
raise DataFrameOperationNotSupportedError()
def joinaggregate(
self,
group_exprs: List[LogicalExprNode],
agg_exprs: List[LogicalExprNode]
) -> "DataFrameDataset":
"""
Perform joinaggregate dataset operation.
See: https://vega.github.io/vega/docs/transforms/joinaggregate/
:param group_exprs: Expressions to group by
:param agg_exprs: Aggregate expressions
:return: DataFrameDataset
"""
print(f"joinaggregate -\n. {group_exprs}\n. {agg_exprs}")
raise DataFrameOperationNotSupportedError()
def filter(self, predicate: LogicalExprNode) -> "DataFrameDataset":
"""
Filter dataset by predicate expression
:param predicate: Predicate expression
:return: DataFrameDataset
"""
print(f"filter -\n. {predicate}")
raise DataFrameOperationNotSupportedError()
def limit(self, limit: int) -> "DataFrameDataset":
"""
Limit dataset to max number of rows
:param limit: Max number of rows
:return: DataFrameDataset
"""
print(f"limit -\n. {limit}")
raise DataFrameOperationNotSupportedError()
def fold(
self,
fields: List[str],
value_col: str,
key_col: str,
order_field: Optional[str],
):
"""
See: https://vega.github.io/vega/docs/transforms/fold/
:param fields: List of fields to fold
:param value_col: Name of output value column
:param key_col: Name of output key column
:param order_field: Name of input ordering column or
None if input ordering is not defined
:return: DataFrameDataset
"""
print(f"fold -n\ {fields}\n {value_col}\n {key_col}\n {order_field}")
raise DataFrameOperationNotSupportedError()
def stack(
self,
field: str,
orderby: List[LogicalExprNode],
groupby: List[str],
start_field: str,
stop_field: str,
mode: Literal["zero", "center", "normalize"],
) -> "DataFrameDataset":
"""
Computes a layout of stacking groups of values
See: https://vega.github.io/vega/docs/transforms/stack/
:param field: Column that determines stack height
:param orderby: Criteria for sorting values within each stack
:param groupby: List of columns by which to partition data into separate stacks
:param start_field: Name of output stack start column
:param stop_field: Name of output stack stop column
:param mode: Stack mode. One of: "zero", "center", "normalize"
:return:
"""
print(f"stack -\n {field}\n {orderby}\n {groupby}\n {start_field}\n {stop_field}\n {mode}")
raise DataFrameOperationNotSupportedError()
def impute(
self,
field: str,
value: Union[str, int, float],
key: str,
groupby: List[str],
order_field: Optional[str],
) -> "DataFrameDataset":
"""
Performs imputation of missing data objects.
See: https://vega.github.io/vega/docs/transforms/impute/
:param field: Column for which missing values should be imputed
:param value: Value to impute with
:param key: Key column that uniquely identifies data objects within a group.
Missing key values (those occurring in the data but not in the current group)
will be imputed.
:param groupby: Optional list of columns to group by
:param order_field: Optional input ordering field. If not provided, input is
assumed to have arbitrary ordering
:return:
"""
print(f"impute -\n {field}\n {value}\n {key}\n {groupby}\n {order_field}")
raise DataFrameOperationNotSupportedError() The The implementation above doesn't take advantage of Ibis, but it's a totally valid implementation as far as VegaFusion is concerned. The expressions passed to these query methods are Using the latest Altair dev build, it's possible to use this Here's an example based on https://altair-viz.github.io/gallery/simple_histogram.html. import altair as alt
import pandas as pd
# Load movies dataset using pandas
movies_df = pd.read_json('https://cdn.jsdelivr.net/npm/[email protected]/data/movies.json')
movies_df["Title"] = movies_df["Title"].astype(str)
# Create an Ibis memtable
ibis_tbl = ibis.memtable(movies_df)
# Wrap Ibis memtable in IbisDataset
ibis_dataset = IbisDataset(ibis_tbl)
# Enable the VegaFusion data transformer!
alt.data_transformers.enable("vegafusion")
# Pass the IbisDataset instance to the altair Chart constructor
# (as if it were a pandas DataFrame)
chart = alt.Chart(ibis_dataset).mark_bar().encode(
alt.X("IMDB_Rating:Q", bin=True),
y='count()',
)
# Display the chart
chart Due to the print statements in the implementation, this will be printed out:
The first thing VegaFusion is trying to do here is add a ROW_NUMBER column named
Since the Implementation processIt should be possible to build out the Ibis functionality incrementally. Start with You can follow the pattern above to play with any Altair example from the Altair gallery. Just keep in mind that column type inference for objects implementing the DataFrame Interchange Protocol isn't on master yet. So you may need to add Adding IbisDataset to VegaFusionIt doesn't have to, but I'd be happy to have Another advantage is that we'll be able to test it against other backends more easily, and I'll know if I make changes to the API that break things. I can't commit to this interface being stable yet (that's probably a VegaFusion 2.0 milestone), but if we add it to the VegaFusion repo and get tests working then I can keep it working even if the interface needs to adapt. I'm really excited for this integration! Let me know how I can support you all |
Beta Was this translation helpful? Give feedback.
-
Background
At SciPy 2023, I had a nice chat with @jcrist about the potential of dispatching VegaFusion data transformations to Ibis, which would then dispatch transformations to the suite of SQL backends supported by Ibis.
Benefit
The benefit for end users would be the ability to pass Ibis tables to Altair
Chart
objects and have the Altair/Vega data transformations (e.g. histogram binning and aggregation) automatically pushed down to the Ibis backend (e.g. postgres) during chart display.Approach
One approach we talked about would be to add an
IbisDataFrame
Rust struct to thevegafusion-python-embed
crate. An instance of this struct would wrap a Python Ibis table as a PyO3 PyObect. The trait method implementations would use PyO3 to call Ibis data transformations methods on the wrapped Ibis table.We would need to write conversion functions from DataFusion expressions to Ibis expressions. I'm currently picturing that this logic would be written in Rust, and use PyO3 to call Python methods on the wrapped Ibis tables.
An alternative to writing this logic in Rust would be to come up with a declarative representation of DataFusion expressions that could be serialized to Python. Then the translation of these declarative expressions to Ibis expressions could be implemented in Python. One candidate for this declarative representation would be substrait. Ibis has early support for ingesting Substrait plans, but it's not well supported and is going to be removed from Ibis. So at the moment, writing the translation logic in Rust + PyO3 seems more straightforward than defining a new declarative representation and converting this representation to Ibis in Python.
Beta Was this translation helpful? Give feedback.
All reactions