Fill values as BinaryArray #8

Merged · 7 commits · Aug 13, 2024

Changes from all commits

.github/workflows/rust-ci.yaml (4 changes: 2 additions & 2 deletions)

@@ -36,7 +36,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: [ubuntu-latest-m]
+        os: [ubuntu-latest]
 
     steps:
       - name: Checkout repository
@@ -63,7 +63,7 @@
         run: cargo install --locked cargo-deny
 
       - name: Check
-        if: matrix.os == 'ubuntu-latest-m' || github.event_name == 'push'
+        if: matrix.os == 'ubuntu-latest' || github.event_name == 'push'
         env:
           AWS_ACCESS_KEY_ID: minio123
           AWS_SECRET_ACCESS_KEY: minio123

.pre-commit-config.yaml (9 changes: 9 additions & 0 deletions)

@@ -0,0 +1,9 @@
+repos:
+  - repo: local
+    hooks:
+      - id: format-lint
+        name: just
+        description: Run the just pre-commit step
+        entry: just pre-commit
+        language: system
+        pass_filenames: false

src/dataset.rs (91 changes: 55 additions & 36 deletions)

@@ -2,9 +2,8 @@ use std::{collections::HashMap, sync::Arc};

 use crate::{
     AddNodeError, ArrayIndices, AttributesTable, ChunkPayload, Dataset, ManifestRef,
-    ManifestsTable, NodeData, NodeId, NodeStructure, ObjectId, Path, Storage,
-    StructureTable, UpdateNodeError, UserAttributes, UserAttributesStructure,
-    ZarrArrayMetadata,
+    ManifestsTable, NodeData, NodeId, NodeStructure, ObjectId, Path, Storage, StructureTable,
+    UpdateNodeError, UserAttributes, UserAttributesStructure, ZarrArrayMetadata,
 };
 
 /// FIXME: what do we want to do with implicit groups?
@@ -63,7 +62,10 @@ impl Dataset {
     ) -> Result<(), UpdateNodeError> {
         match self.get_node(&path).await {
             None => Err(UpdateNodeError::NotFound),
-            Some(NodeStructure { node_data: NodeData::Array(..), .. }) => {
+            Some(NodeStructure {
+                node_data: NodeData::Array(..),
+                ..
+            }) => {
                 self.updated_arrays.insert(path, metadata);
                 Ok(())
             }
@@ -97,7 +99,10 @@ impl Dataset {
     ) -> Result<(), UpdateNodeError> {
         match self.get_node(&path).await {
             None => Err(UpdateNodeError::NotFound),
-            Some(NodeStructure { node_data: NodeData::Array(..), .. }) => {
+            Some(NodeStructure {
+                node_data: NodeData::Array(..),
+                ..
+            }) => {
                 self.set_chunks.insert((path, coord), data);
                 Ok(())
             }
@@ -109,14 +114,17 @@
         // FIXME: errors
         match self.storage.fetch_structure(&self.structure_id).await.ok() {
             None => 0,
-            Some(structure) => {
-                structure.iter().max_by_key(|s| s.id).map_or(0, |node| node.id)
-            }
+            Some(structure) => structure
+                .iter()
+                .max_by_key(|s| s.id)
+                .map_or(0, |node| node.id),
         }
     }
 
     async fn reserve_node_id(&mut self) -> NodeId {
-        let last = self.last_node_id.unwrap_or(self.compute_last_node_id().await);
+        let last = self
+            .last_node_id
+            .unwrap_or(self.compute_last_node_id().await);
         let new = last + 1;
         self.last_node_id = Some(new);
         new
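
The two methods above form a small memoization: last_node_id caches the highest node id seen in the session, so the structure-table scan in compute_last_node_id only needs to run on the first reservation. A minimal self-contained sketch of that pattern, with illustrative names and a synchronous signature standing in for the crate's async API:

// Sketch of the id-reservation pattern: an Option<u64> caches the highest
// id seen so far, so the expensive scan runs at most once per session.
struct IdAllocator {
    last_id: Option<u64>,
}

impl IdAllocator {
    // Stand-in for compute_last_node_id's scan over the structure table.
    fn scan_for_last_id(&self) -> u64 {
        [1u64, 4, 2].into_iter().max().unwrap_or(0)
    }

    fn reserve_id(&mut self) -> u64 {
        // A match keeps the scan lazy; note that unwrap_or(expr) evaluates
        // (and, in the async original, awaits) its argument unconditionally.
        let last = match self.last_id {
            Some(id) => id,
            None => self.scan_for_last_id(),
        };
        let new = last + 1;
        self.last_id = Some(new);
        new
    }
}

fn main() {
    let mut alloc = IdAllocator { last_id: None };
    assert_eq!(alloc.reserve_id(), 5); // scan found 4, so the next id is 5
    assert_eq!(alloc.reserve_id(), 6); // cache hit, no rescan
}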
@@ -126,11 +134,16 @@
 
     // FIXME: we should have errors here, not only None
     pub async fn get_node(&self, path: &Path) -> Option<NodeStructure> {
-        self.get_new_node(path).or(self.get_existing_node(path).await)
+        self.get_new_node(path)
+            .or(self.get_existing_node(path).await)
     }
 
     async fn get_existing_node(&self, path: &Path) -> Option<NodeStructure> {
-        let structure = self.storage.fetch_structure(&self.structure_id).await.ok()?;
+        let structure = self
+            .storage
+            .fetch_structure(&self.structure_id)
+            .await
+            .ok()?;
         let session_atts = self
             .updated_attributes
             .get(path)
@@ -186,26 +199,23 @@ impl Dataset {
         })
     }
 
-    pub async fn get_chunk(
-        &self,
-        path: &Path,
-        coords: &ArrayIndices,
-    ) -> Option<ChunkPayload> {
+    pub async fn get_chunk(&self, path: &Path, coords: &ArrayIndices) -> Option<ChunkPayload> {
         // FIXME: better error type
         let node = self.get_node(path).await?;
         match node.node_data {
             NodeData::Group => None,
             NodeData::Array(_, manifests) => {
                 // check the chunks modified in this session first
                 // TODO: I hate that Rust forces me to clone to search in a hashmap. How to do better?
-                let session_chunk =
-                    self.set_chunks.get(&(path.clone(), coords.clone())).cloned();
+                let session_chunk = self
+                    .set_chunks
+                    .get(&(path.clone(), coords.clone()))
+                    .cloned();
                 // If session_chunk is not None we have to return it, because it is the update the
                 // user made in the current session
                 // If session_chunk == None, the user hasn't modified the chunk in this session and we
                 // need to fall back to fetching the manifests
                 session_chunk.unwrap_or(self.get_old_chunk(manifests.as_slice(), coords).await)
             }
         }
     }
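
The lookup order in get_chunk is the heart of the session model: chunk writes made in the current session live in an in-memory map keyed by (path, coords), and only on a miss does a read fall through to the committed manifests. A self-contained sketch of that overlay pattern, using plain std types as hypothetical stand-ins for the crate's ChunkPayload and manifest lookup:

use std::collections::HashMap;

// Sketch of the session-overlay read path: uncommitted writes shadow
// committed data, and reads fall back to the committed store on a miss.
struct Session {
    // chunks written in this session, keyed by (array path, chunk coords)
    set_chunks: HashMap<(String, Vec<u64>), Vec<u8>>,
    // previously committed chunks (the "manifest" side of the fallback)
    committed: HashMap<(String, Vec<u64>), Vec<u8>>,
}

impl Session {
    fn get_chunk(&self, path: &str, coords: &[u64]) -> Option<Vec<u8>> {
        let key = (path.to_string(), coords.to_vec());
        // session writes win over committed data...
        self.set_chunks
            .get(&key)
            .cloned()
            // ...and only a miss consults the committed store
            .or_else(|| self.committed.get(&key).cloned())
    }
}

fn main() {
    let mut session = Session { set_chunks: HashMap::new(), committed: HashMap::new() };
    let key = ("/array1".to_string(), vec![0, 0, 0]);
    session.committed.insert(key.clone(), vec![1, 2, 3]);
    assert_eq!(session.get_chunk("/array1", &[0, 0, 0]), Some(vec![1, 2, 3]));
    session.set_chunks.insert(key, vec![9, 9]);
    // the uncommitted write now shadows the committed payload
    assert_eq!(session.get_chunk("/array1", &[0, 0, 0]), Some(vec![9, 9]));
}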
@@ -217,8 +227,11 @@ impl Dataset {
     ) -> Option<ChunkPayload> {
         // FIXME: use manifest extents
         for manifest in manifests {
-            let manifest_structure =
-                self.storage.fetch_manifests(&manifest.object_id).await.ok()?;
+            let manifest_structure = self
+                .storage
+                .fetch_manifests(&manifest.object_id)
+                .await
+                .ok()?;
             if let Some(payload) = manifest_structure
                 .get_chunk_info(coords, &manifest.location)
                 .map(|info| info.payload)
@@ -237,7 +250,11 @@ impl Dataset {
     /// Files that are reused from previous commits are not returned because they don't need saving
     pub async fn consolidate(
         &mut self,
-    ) -> (Arc<StructureTable>, Vec<Arc<AttributesTable>>, Vec<Arc<ManifestsTable>>) {
+    ) -> (
+        Arc<StructureTable>,
+        Vec<Arc<AttributesTable>>,
+        Vec<Arc<ManifestsTable>>,
+    ) {
         todo!()
     }
 }
@@ -247,10 +264,9 @@ mod tests {
     use std::{error::Error, num::NonZeroU64, path::PathBuf};
 
     use crate::{
-        manifest::mk_manifests_table, storage::InMemoryStorage,
-        structure::mk_structure_table, ChunkInfo, ChunkKeyEncoding, ChunkRef, ChunkShape,
-        Codecs, DataType, FillValue, Flags, ManifestExtents, StorageTransformers,
-        TableRegion,
+        manifest::mk_manifests_table, storage::InMemoryStorage, structure::mk_structure_table,
+        ChunkInfo, ChunkKeyEncoding, ChunkRef, ChunkShape, Codecs, DataType, FillValue, Flags,
+        ManifestExtents, StorageTransformers, TableRegion,
     };
 
     use super::*;
@@ -319,9 +335,7 @@ mod tests {
             NodeStructure {
                 path: array1_path.clone(),
                 id: array_id,
-                user_attributes: Some(UserAttributesStructure::Inline(
-                    "{foo:1}".to_string(),
-                )),
+                user_attributes: Some(UserAttributesStructure::Inline("{foo:1}".to_string())),
                 node_data: NodeData::Array(zarr_meta1.clone(), vec![manifest_ref]),
             },
         ];
@@ -380,9 +394,7 @@ mod tests {
             Some(NodeStructure {
                 path: "/group/array2".into(),
                 id: 4,
-                user_attributes: Some(UserAttributesStructure::Inline(
-                    "{n:42}".to_string(),
-                )),
+                user_attributes: Some(UserAttributesStructure::Inline("{n:42}".to_string(),)),
                 node_data: NodeData::Array(zarr_meta2.clone(), vec![]),
             })
         );
@@ -410,11 +422,16 @@ mod tests {
         let node = ds.get_node(&array1_path).await.unwrap();
         assert_eq!(
             node.user_attributes,
-            Some(UserAttributesStructure::Inline("{updated: true}".to_string()))
+            Some(UserAttributesStructure::Inline(
+                "{updated: true}".to_string()
+            ))
         );
 
         // update old array zarr metadata and check it
-        let new_zarr_meta1 = ZarrArrayMetadata { shape: vec![2, 2, 3], ..zarr_meta1 };
+        let new_zarr_meta1 = ZarrArrayMetadata {
+            shape: vec![2, 2, 3],
+            ..zarr_meta1
+        };
         ds.update_array(array1_path.clone(), new_zarr_meta1)
             .await
             .map_err(|err| format!("{err:#?}"))?;
@@ -438,7 +455,9 @@ mod tests {
             .await
             .map_err(|err| format!("{err:#?}"))?;
 
-        let chunk = ds.get_chunk(&array1_path, &ArrayIndices(vec![0, 0, 0])).await;
+        let chunk = ds
+            .get_chunk(&array1_path, &ArrayIndices(vec![0, 0, 0]))
+            .await;
         assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 99])));
 
         Ok(())