From d425111fbf98d901bb4a416fd0a414bbeeddd324 Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Wed, 13 Mar 2024 16:56:03 -0700 Subject: [PATCH] [ENH] Add arrow blockfile provider (#1847) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - / - New functionality - Adds a provider for ArrowBlockfiles ## Test plan *How are these changes tested?* Not tested for now. A future PR will contain tests. - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None --- .../blockstore/arrow_blockfile/provider.rs | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/rust/worker/src/blockstore/arrow_blockfile/provider.rs b/rust/worker/src/blockstore/arrow_blockfile/provider.rs index 59772bb3eaf..6b985b8e256 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/provider.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/provider.rs @@ -1,9 +1,61 @@ -use super::block::Block; -use crate::blockstore::{KeyType, ValueType}; +use super::{block::Block, blockfile::ArrowBlockfile}; +use crate::blockstore::{ + provider::{BlockfileProvider, CreateError, OpenError}, + Blockfile, KeyType, ValueType, +}; use parking_lot::RwLock; use std::{collections::HashMap, sync::Arc}; use uuid::Uuid; +/// A BlockfileProvider that creates ArrowBlockfiles (Arrow-backed blockfiles used for production). +/// For now, it keeps a simple local cache of blockfiles. 
+pub(super) struct ArrowBlockfileProvider { + block_provider: ArrowBlockProvider, + files: HashMap>, +} + +impl BlockfileProvider for ArrowBlockfileProvider { + fn new() -> Self { + Self { + block_provider: ArrowBlockProvider::new(), + files: HashMap::new(), + } + } + + fn open(&self, path: &str) -> Result, Box> { + match self.files.get(path) { + Some(file) => Ok(file.clone()), + None => Err(Box::new(OpenError::NotFound)), + } + } + + fn create( + &mut self, + path: &str, + key_type: KeyType, + value_type: ValueType, + ) -> Result, Box> { + match self.files.get(path) { + Some(_) => Err(Box::new(CreateError::AlreadyExists)), + None => { + let blockfile = Box::new(ArrowBlockfile::new( + key_type, + value_type, + self.block_provider.clone(), + )); + self.files.insert(path.to_string(), blockfile); + Ok(self.files.get(path).unwrap().clone()) + } + } + } +} + +/// A simple local cache of Arrow-backed blocks, the blockfile provider passes this +/// to the ArrowBlockfile when it creates a new blockfile. So that the blockfile can manage and access blocks +/// # Note +/// The implementation is currently very simple and not intended for robust production use. We should +/// introduce a more sophisticated cache that can handle tiered eviction and other features. This interface +/// is a placeholder for that. struct ArrowBlockProviderInner { blocks: HashMap>, }