All python repo methods are sync now #218

Merged: 5 commits, Oct 14, 2024
Changes from 1 commit
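
As the title says, the Python repository/store methods no longer return awaitables, so the example scripts can drop asyncio entirely. A minimal sketch of what calling code looks like after this change, using only calls that appear in the diffs below; the bucket and prefix keyword arguments passed to s3_from_env are illustrative placeholders, not taken from this PR:

import icechunk

# No asyncio.run()/await wrappers are needed around store operations anymore.
storage = icechunk.StorageConfig.s3_from_env(
    bucket="example-bucket",   # placeholder; keyword names assumed, not from this PR
    prefix="example/prefix",
)
store = icechunk.IcechunkStore.open(
    storage=storage,
    mode="w",
    config=icechunk.StoreConfig(inline_chunk_threshold_bytes=1),
)
snapshot_id = store.commit("initial commit")  # returns directly, no await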
45 changes: 17 additions & 28 deletions icechunk-python/examples/dask_write.py
@@ -32,7 +32,6 @@
 """
 
 import argparse
-import asyncio
 from dataclasses import dataclass
 from typing import Any, cast
 from urllib.parse import urlparse
@@ -58,7 +57,7 @@ def generate_task_array(task: Task, shape: tuple[int,...]) -> np.typing.ArrayLik
     return np.random.rand(*shape)
 
 
-async def execute_write_task(task: Task) -> icechunk.IcechunkStore:
+def execute_write_task(task: Task) -> icechunk.IcechunkStore:
     """Execute task as a write task.
 
     This will read the time coordinade from `task` and write a "pancake" in that position,
@@ -82,7 +81,7 @@ async def execute_write_task(task: Task) -> icechunk.IcechunkStore:
     return store
 
 
-async def execute_read_task(task: Task) -> None:
+def execute_read_task(task: Task) -> None:
     """Execute task as a read task.
 
     This will read the time coordinade from `task` and read a "pancake" in that position.
@@ -101,16 +100,6 @@ async def execute_read_task(task: Task) -> None:
     dprint(f"t={task.time} verified")
 
 
-def run_write_task(task: Task) -> icechunk.IcechunkStore:
-    """Sync helper for write tasks"""
-    return asyncio.run(execute_write_task(task))
-
-
-def run_read_task(task: Task) -> None:
-    """Sync helper for read tasks"""
-    return asyncio.run(execute_read_task(task))
-
-
 def storage_config(args: argparse.Namespace) -> dict[str, Any]:
     """Return the Icechunk store S3 configuration map"""
     bucket = args.url.netloc
@@ -129,15 +118,15 @@ def store_config(args: argparse.Namespace) -> dict[str, Any]:
     return {"inline_chunk_threshold_bytes": 1}
 
 
-async def create(args: argparse.Namespace) -> None:
+def create(args: argparse.Namespace) -> None:
     """Execute the create subcommand.
 
     Creates an Icechunk store, a root group and an array named "array"
     with the shape passed as arguments.
 
     Commits the Icechunk repository when done.
     """
-    store = await icechunk.IcechunkStore.open(
+    store = icechunk.IcechunkStore.open(
         storage=icechunk.StorageConfig.s3_from_env(**storage_config(args)),
         mode="w",
         config=icechunk.StoreConfig(**store_config(args)),
@@ -158,11 +147,11 @@ async def create(args: argparse.Namespace) -> None:
         dtype="f8",
         fill_value=float("nan"),
     )
-    _first_snapshot = await store.commit("array created")
+    _first_snapshot = store.commit("array created")
     print("Array initialized")
 
 
-async def update(args: argparse.Namespace) -> None:
+def update(args: argparse.Namespace) -> None:
     """Execute the update subcommand.
 
     Uses Dask to write chunks to the Icechunk repository. Currently Icechunk cannot
@@ -177,7 +166,7 @@ async def update(args: argparse.Namespace) -> None:
     storage_conf = storage_config(args)
     store_conf = store_config(args)
 
-    store = await icechunk.IcechunkStore.open(
+    store = icechunk.IcechunkStore.open(
         storage=icechunk.StorageConfig.s3_from_env(**storage_conf),
         mode="r+",
         config=icechunk.StoreConfig(**store_conf),
@@ -198,19 +187,19 @@
 
     client = Client(n_workers=args.workers, threads_per_worker=1)
 
-    map_result = client.map(run_write_task, tasks)
+    map_result = client.map(execute_write_task, tasks)
     worker_stores = client.gather(map_result)
 
     print("Starting distributed commit")
     # we can use the current store as the commit coordinator, because it doesn't have any pending changes,
     # all changes come from the tasks, Icechunk doesn't care about where the changes come from, the only
     # important thing is to not count changes twice
-    commit_res = await store.distributed_commit("distributed commit", [ws.change_set_bytes() for ws in worker_stores])
+    commit_res = store.distributed_commit("distributed commit", [ws.change_set_bytes() for ws in worker_stores])
     assert commit_res
     print("Distributed commit done")
 
 
-async def verify(args: argparse.Namespace) -> None:
+def verify(args: argparse.Namespace) -> None:
     """Execute the verify subcommand.
 
     Uses Dask to read and verify chunks from the Icechunk repository. Currently Icechunk cannot
@@ -223,7 +212,7 @@ async def verify(args: argparse.Namespace) -> None:
     storage_conf = storage_config(args)
     store_conf = store_config(args)
 
-    store = await icechunk.IcechunkStore.open(
+    store = icechunk.IcechunkStore.open(
         storage=icechunk.StorageConfig.s3_from_env(**storage_conf),
         mode="r",
         config=icechunk.StoreConfig(**store_conf),
@@ -244,12 +233,12 @@
 
     client = Client(n_workers=args.workers, threads_per_worker=1)
 
-    map_result = client.map(run_read_task, tasks)
+    map_result = client.map(execute_read_task, tasks)
     client.gather(map_result)
     print("done, all good")
 
 
-async def main() -> None:
+def main() -> None:
     """Main entry point for the script.
 
     Parses arguments and delegates to a subcommand.
@@ -328,12 +317,12 @@ async def main() -> None:
 
     match args.command:
         case "create":
-            await create(args)
+            create(args)
         case "update":
-            await update(args)
+            update(args)
         case "verify":
-            await verify(args)
+            verify(args)
 
 
 if __name__ == "__main__":
-    asyncio.run(main())
+    main()
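
The update() hunks above are the core of the Dask example: every worker writes through its own IcechunkStore, the driver gathers those stores, and a single distributed_commit merges their change sets; the driver store can act as coordinator because it holds no pending changes of its own. Restated as a condensed sketch (store, tasks, and execute_write_task are the objects defined in dask_write.py above; the worker count is arbitrary):

from dask.distributed import Client

client = Client(n_workers=8, threads_per_worker=1)

# Each worker executes one write task and returns its own IcechunkStore.
futures = client.map(execute_write_task, tasks)
worker_stores = client.gather(futures)

# Merge the workers' change sets in one commit; nothing is counted twice
# because the coordinating store itself has no uncommitted changes.
commit_res = store.distributed_commit(
    "distributed commit",
    [ws.change_set_bytes() for ws in worker_stores],
)
assert commit_res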
25 changes: 12 additions & 13 deletions icechunk-python/examples/smoke-test.py
@@ -66,7 +66,7 @@ async def run(store: Store) -> None:
 
     first_commit = None
     if isinstance(store, IcechunkStore):
-        first_commit = await store.commit("initial commit")
+        first_commit = store.commit("initial commit")
 
     expected = {}
     expected["root-foo"] = create_array(
@@ -79,32 +79,32 @@ async def run(store: Store) -> None:
     group["root-foo"].attrs["update"] = "new attr"
 
     if isinstance(store, IcechunkStore):
-        _second_commit = await store.commit("added array, updated attr")
+        _second_commit = store.commit("added array, updated attr")
 
     assert len(group["root-foo"].attrs) == 2
     assert len(group.members()) == 1
 
     if isinstance(store, IcechunkStore) and first_commit is not None:
-        await store.checkout(first_commit)
+        store.checkout(first_commit)
     group.attrs["update"] = "new attr 2"
 
     if isinstance(store, IcechunkStore):
         try:
-            await store.commit("new attr 2")
+            store.commit("new attr 2")
         except ValueError:
             pass
         else:
             raise ValueError("should have conflicted")
 
-        await store.reset() # FIXME: WHY
-        await store.checkout(branch="main")
+        store.reset()
+        store.checkout(branch="main")
 
     group["root-foo"].attrs["update"] = "new attr 2"
     if isinstance(store, IcechunkStore):
-        _third_commit = await store.commit("new attr 2")
+        _third_commit = store.commit("new attr 2")
 
         try:
-            await store.commit("rewrote array")
+            store.commit("rewrote array")
         except ValueError:
             pass
         else:
@@ -134,7 +134,7 @@ async def run(store: Store) -> None:
         fill_value=-1234,
     )
     if isinstance(store, IcechunkStore):
-        _fourth_commit = await store.commit("added groups and arrays")
+        _fourth_commit = store.commit("added groups and arrays")
 
     print(f"Write done in {time.time() - write_start} secs")
 
@@ -155,8 +155,8 @@ async def run(store: Store) -> None:
 
 
 async def create_icechunk_store(*, storage: StorageConfig) -> IcechunkStore:
-    return await IcechunkStore.open(
-        storage=storage, mode="r+", config=StoreConfig(inline_chunk_threshold_bytes=1)
+    return IcechunkStore.open(
+        storage=storage, mode="w", config=StoreConfig(inline_chunk_threshold_bytes=1)
     )
 
 
@@ -173,7 +173,6 @@ async def create_zarr_store(*, store: Literal["memory", "local", "s3"]) -> Store
             "anon": False,
             "key": "minio123",
             "secret": "minio123",
-            "region": "us-east-1",
             "endpoint_url": "http://localhost:9000",
         },
     )
@@ -199,5 +198,5 @@ async def create_zarr_store(*, store: Literal["memory", "local", "s3"]) -> Store
 asyncio.run(run(store))
 
 print("Zarr store")
-zarr_store = asyncio.run(create_zarr_store(store="local"))
+zarr_store = asyncio.run(create_zarr_store(store="s3"))
 asyncio.run(run(zarr_store))
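
The smoke test above also exercises conflict handling with the new sync API: committing after checking out an older snapshot raises ValueError, and the test then resets the store and returns to the main branch before committing again. The same flow in isolation (a sketch; store, first_commit, and group are as defined in smoke-test.py, and the attribute being written is illustrative):

# Check out an earlier snapshot and try to commit new changes on top of it.
store.checkout(first_commit)
group.attrs["update"] = "new attr 2"

try:
    store.commit("new attr 2")  # expected to conflict with the newer history
except ValueError:
    store.reset()                  # discard the uncommitted changes
    store.checkout(branch="main")  # move back to the branch tip
    group.attrs["update"] = "new attr 2"
    store.commit("new attr 2")     # the commit now applies cleanly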