[FEAT] Streaming Catalog Writes #2966

Closed · wants to merge 24 commits

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -20,6 +20,7 @@ daft-local-execution = {path = "src/daft-local-execution", default-features = false}
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-minhash = {path = "src/daft-minhash", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
@@ -47,6 +48,7 @@ python = [
"daft-json/python",
"daft-micropartition/python",
"daft-parquet/python",
"daft-physical-plan/python",
"daft-plan/python",
"daft-scan/python",
"daft-scheduler/python",
2 changes: 2 additions & 0 deletions daft/daft/__init__.pyi
@@ -1716,7 +1716,9 @@ class LogicalPlanBuilder:
table_name: str,
table_location: str,
partition_spec: IcebergPartitionSpec,
partition_cols: list[PyExpr],
iceberg_schema: IcebergSchema,
daft_iceberg_schema: PySchema,
iceberg_properties: IcebergTableProperties,
catalog_columns: list[str],
io_config: IOConfig | None = None,
347 changes: 347 additions & 0 deletions daft/io/writer.py
@@ -0,0 +1,347 @@
import uuid
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from daft.context import get_context
from daft.daft import IOConfig, PyTable
from daft.dependencies import pa, pacsv, pq
from daft.filesystem import (
_resolve_paths_and_filesystem,
canonicalize_protocol,
get_protocol_from_path,
)
from daft.iceberg.iceberg_write import (
coerce_pyarrow_table_to_schema,
to_partition_representation,
)
from daft.series import Series
from daft.table.micropartition import MicroPartition
from daft.table.table import Table

if TYPE_CHECKING:
from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import TableProperties as IcebergTableProperties


def partition_values_to_str_mapping(
partition_values: Table,
) -> Dict[str, str]:
null_part = Series.from_pylist([None])
pkey_names = partition_values.column_names()

partition_strings = {}

for c in pkey_names:
column = partition_values.get_column(c)
string_names = column._to_str_values()
null_filled = column.is_null().if_else(null_part, string_names)
partition_strings[c] = null_filled.to_pylist()[0]

return partition_strings


def partition_string_mapping_to_postfix(
partition_strings: Dict[str, str],
default_partition_fallback: str,
) -> str:
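# Illustrative (values hypothetical): {"year": "2024", "month": None} with the
# Hive fallback yields "year=2024/month=__HIVE_DEFAULT_PARTITION__".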
postfix = "/".join(
f"{k}={v if v is not None else default_partition_fallback}" for k, v in partition_strings.items()
)
return postfix


class FileWriterBase:
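# Base class for streaming file writers: resolves the target directory and
# filesystem, appends a Hive-style partition path when partition values are
# given, and lazily opens the underlying pyarrow writer on the first write().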
def __init__(
self,
root_dir: str,
file_idx: int,
file_format: str,
partition_values: Optional[PyTable] = None,
compression: Optional[str] = None,
io_config: Optional[IOConfig] = None,
default_partition_fallback: str = "__HIVE_DEFAULT_PARTITION__",
):
[self.resolved_path], self.fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
protocol = get_protocol_from_path(root_dir)
canonicalized_protocol = canonicalize_protocol(protocol)
is_local_fs = canonicalized_protocol == "file"

self.file_name = f"{uuid.uuid4()}-{file_idx}.{file_format}"
self.partition_values = Table._from_pytable(partition_values) if partition_values is not None else None
if self.partition_values is not None:
partition_strings = partition_values_to_str_mapping(self.partition_values)
postfix = partition_string_mapping_to_postfix(partition_strings, default_partition_fallback)
self.dir_path = f"{self.resolved_path}/{postfix}"

Check warning on line 74 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L69-L74

Added lines #L69 - L74 were not covered by tests
else:
self.dir_path = f"{self.resolved_path}"

Check warning on line 76 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L76

Added line #L76 was not covered by tests

self.full_path = f"{self.dir_path}/{self.file_name}"
if is_local_fs:
self.fs.create_dir(self.dir_path, recursive=True)

self.compression = compression if compression is not None else "none"
self.current_writer: Optional[Union[pq.ParquetWriter, pacsv.CSVWriter]] = None

def _create_writer(self, schema: pa.Schema):
raise NotImplementedError("Subclasses must implement this method.")

def write(self, table: MicroPartition):
if self.current_writer is None:
self.current_writer = self._create_writer(table.schema().to_pyarrow_schema())
self.current_writer.write_table(table.to_arrow())

def close(self) -> PyTable:
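# Closes the underlying writer (if any) and returns a single-row table
# holding the written file's path plus its partition values.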
if self.current_writer is not None:
self.current_writer.close()

metadata = {"path": Series.from_pylist([self.full_path])}
if self.partition_values is not None:
for col_name in self.partition_values.column_names():
metadata[col_name] = self.partition_values.get_column(col_name)
return Table.from_pydict(metadata)._table


class ParquetFileWriter(FileWriterBase):
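# Writes a single Parquet file via pyarrow.parquet.ParquetWriter.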
def __init__(
self,
root_dir: str,
file_idx: int,
partition_values: Optional[PyTable] = None,
compression: str = "none",
io_config: Optional[IOConfig] = None,
):
super().__init__(root_dir, file_idx, "parquet", partition_values, compression, io_config)

Check warning on line 113 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L113

Added line #L113 was not covered by tests

def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
return pq.ParquetWriter(
self.full_path,
schema,
compression=self.compression,
use_compliant_nested_type=False,
filesystem=self.fs,
)


class CSVFileWriter(FileWriterBase):
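# Writes a single CSV file via pyarrow.csv.CSVWriter.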
def __init__(
self,
root_dir: str,
file_idx: int,
partition_values: Optional[PyTable] = None,
io_config: Optional[IOConfig] = None,
):
super().__init__(
root_dir,
file_idx,
"csv",
partition_values=partition_values,
io_config=io_config,
)

def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter:
return pacsv.CSVWriter(
self.full_path,
schema,
)


class IcebergFileWriter(FileWriterBase):
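# Writes Parquet data files for an Iceberg table; close() returns a pyiceberg
# DataFile carrying column statistics for the catalog commit.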
def __init__(
self,
root_dir: str,
file_idx: int,
schema: "IcebergSchema",
properties: "IcebergTableProperties",
partition_spec: "IcebergPartitionSpec",
partition_values: Optional[PyTable] = None,
compression: str = "zstd",
io_config: Optional[IOConfig] = None,
):
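# Note: the base class is initialized below with "null" as the partition-path
# fallback, following Iceberg's convention rather than Hive's
# __HIVE_DEFAULT_PARTITION__.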
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.typedef import Record as IcebergRecord

super().__init__(root_dir, file_idx, "parquet", partition_values, compression, io_config, "null")

if partition_values is None:
self.part_record = IcebergRecord()
else:
part_vals = Table._from_pytable(partition_values).to_pylist()[0]
iceberg_part_vals = {k: to_partition_representation(v) for k, v in part_vals.items()}
self.part_record = IcebergRecord(**iceberg_part_vals)
self.iceberg_schema = schema
self.file_schema = schema_to_pyarrow(schema)
self.metadata_collector: List[pq.FileMetaData] = []
self.partition_spec = partition_spec
self.properties = properties

def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
return pq.ParquetWriter(
self.full_path,
schema,
compression=self.compression,
use_compliant_nested_type=False,
filesystem=self.fs,
metadata_collector=self.metadata_collector,
)

def write(self, table: MicroPartition):
if self.current_writer is None:
self.current_writer = self._create_writer(self.file_schema)
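# Coerce the arrow data to the schema derived from the Iceberg schema so
# field types (and Iceberg field-id metadata) line up before writing.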
casted = coerce_pyarrow_table_to_schema(table.to_arrow(), self.file_schema)
self.current_writer.write_table(casted)

def close(self) -> PyTable:
import pyiceberg
from packaging.version import parse
from pyiceberg.io.pyarrow import (
compute_statistics_plan,
parquet_path_to_id_mapping,
)
from pyiceberg.manifest import DataFile, DataFileContent
from pyiceberg.manifest import FileFormat as IcebergFileFormat

super().close()
metadata = self.metadata_collector[0]
size = self.fs.get_file_info(self.full_path).size
kwargs = {
"content": DataFileContent.DATA,
"file_path": self.full_path,
"file_format": IcebergFileFormat.PARQUET,
"partition": self.part_record,
"file_size_in_bytes": size,
# After this has been fixed:
# https://github.com/apache/iceberg-python/issues/271
# "sort_order_id": task.sort_order_id,
"sort_order_id": None,
# Just copy these from the table for now
"spec_id": self.partition_spec.spec_id,
"equality_ids": None,
"key_metadata": None,
}

if parse(pyiceberg.__version__) >= parse("0.7.0"):
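# pyiceberg >= 0.7.0 exposes statistics computation as a standalone helper.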
from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata

statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(self.iceberg_schema, self.properties),
parquet_column_mapping=parquet_path_to_id_mapping(self.iceberg_schema),
)
data_file = DataFile(
**{
**kwargs,
**statistics.to_serialized_dict(),
}
)
else:
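# Older pyiceberg fills statistics into the DataFile in place.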
from pyiceberg.io.pyarrow import fill_parquet_file_metadata

data_file = DataFile(**kwargs)

fill_parquet_file_metadata(
data_file=data_file,
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(self.iceberg_schema, self.properties),
parquet_column_mapping=parquet_path_to_id_mapping(self.iceberg_schema),
)
return Table.from_pydict({"data_file": [data_file]})._table

Check warning on line 247 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L247

Added line #L247 was not covered by tests


class DeltalakeFileWriter(FileWriterBase):
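# Writes Parquet data files for a Delta Lake table; close() returns an
# AddAction describing the file for the transaction log.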
def __init__(
self,
root_dir: str,
file_idx: int,
version: int,
large_dtypes: bool,
partition_values: Optional[PyTable] = None,
io_config: Optional[IOConfig] = None,
):
from deltalake.writer import DeltaStorageHandler
from pyarrow.fs import PyFileSystem

from daft.io.object_store_options import io_config_to_storage_options

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_options = io_config_to_storage_options(io_config, root_dir)
self.fs = PyFileSystem(DeltaStorageHandler(root_dir, storage_options))

protocol = get_protocol_from_path(root_dir)
canonicalized_protocol = canonicalize_protocol(protocol)
is_local_fs = canonicalized_protocol == "file"
if is_local_fs:
self.fs.create_dir(root_dir, recursive=True)

self.file_name = f"{version}-{uuid.uuid4()}-{file_idx}.parquet"
if partition_values is not None:
self.partition_values = Table._from_pytable(partition_values)
self.partition_str_mapping = partition_values_to_str_mapping(self.partition_values)
postfix = partition_string_mapping_to_postfix(self.partition_str_mapping, "__HIVE_DEFAULT_PARTITION__")
self.full_path = f"{postfix}/{self.file_name}"

Check warning on line 280 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L275-L280

Added lines #L275 - L280 were not covered by tests
else:
self.partition_values = None
self.partition_str_mapping = {}
self.full_path = self.file_name

self.current_writer: Optional[pq.ParquetWriter] = None
self.large_dtypes = large_dtypes
self.metadata_collector: List[pq.FileMetaData] = []

def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
return pq.ParquetWriter(
self.full_path,
schema,
use_compliant_nested_type=False,
filesystem=self.fs,
metadata_collector=self.metadata_collector,
)

def write(self, table: MicroPartition):
from deltalake.schema import _convert_pa_schema_to_delta

from daft.io._deltalake import large_dtypes_kwargs

arrow_table = table.to_arrow()
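# Partition columns are not stored in the data file itself; Delta Lake
# records partition values in the AddAction's partition mapping instead.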
if self.partition_values is not None:
partition_keys = self.partition_values.column_names()
arrow_table = arrow_table.drop_columns(partition_keys)

converted_schema = _convert_pa_schema_to_delta(arrow_table.schema, **large_dtypes_kwargs(self.large_dtypes))
converted_arrow_table = arrow_table.cast(converted_schema)
if self.current_writer is None:
self.current_writer = self._create_writer(converted_arrow_table.schema)
self.current_writer.write_table(converted_arrow_table)

def close(self) -> PyTable:
import json
from datetime import datetime

import deltalake
from deltalake.writer import (
AddAction,
DeltaJSONEncoder,
get_file_stats_from_metadata,
)
from packaging.version import parse

# added to get_file_stats_from_metadata in deltalake v0.17.4: non-optional "num_indexed_cols" and "columns_to_collect_stats" arguments
# https://github.com/delta-io/delta-rs/blob/353e08be0202c45334dcdceee65a8679f35de710/python/deltalake/writer.py#L725
if parse(deltalake.__version__) < parse("0.17.4"):
file_stats_args = {}
else:
file_stats_args = {"num_indexed_cols": -1, "columns_to_collect_stats": None}

Check warning on line 332 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L332

Added line #L332 was not covered by tests

super().close()
metadata = self.metadata_collector[0]
stats = get_file_stats_from_metadata(metadata, **file_stats_args)
size = self.fs.get_file_info(self.full_path).size
add_action = AddAction(
self.full_path,
size,
self.partition_str_mapping,
int(datetime.now().timestamp() * 1000),
True,
json.dumps(stats, cls=DeltaJSONEncoder),
)

return Table.from_pydict({"add_action": [add_action]})._table

Check warning on line 347 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L347

Added line #L347 was not covered by tests
Loading
Loading
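For orientation, a minimal usage sketch of the writer lifecycle introduced in this file (illustrative only; the directory, data, and loop are made up, and in this PR the writers are driven by the execution layer rather than called by hand):

from daft.io.writer import ParquetFileWriter
from daft.table.micropartition import MicroPartition

# Hypothetical local write: one writer instance per output file.
writer = ParquetFileWriter(root_dir="/tmp/daft_out", file_idx=0)
for i in range(3):
    part = MicroPartition.from_pydict({"a": [i, i + 1], "b": ["x", "y"]})
    writer.write(part)  # lazily opens the pyarrow ParquetWriter on first call
metadata = writer.close()  # single-row table with the written file's path

The Iceberg and Delta Lake subclasses follow the same write()/close() contract but return a pyiceberg DataFile or a deltalake AddAction, respectively, so the catalog commit can be assembled downstream.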