Skip to content

Commit

Permalink
[FEAT] Native glob functionality (#1450)
Browse files Browse the repository at this point in the history
Adds a new `io_glob` Python function that performs globbing using the
rules provided by the globset crate
(https://docs.rs/globset/latest/globset/)

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Oct 3, 2023
1 parent ca41122 commit 9c32d73
Show file tree
Hide file tree
Showing 7 changed files with 736 additions and 4 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/daft-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ common-error = {path = "../common/error", default-features = false}
common-io-config = {path = "../common/io-config", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
futures = {workspace = true}
globset = "0.4"
google-cloud-storage = {version = "0.13.0", default-features = false, features = ["default-tls", "auth"]}
hyper = "0.14.27"
hyper-tls = "0.5.0"
itertools = "0.11"
lazy_static = {workspace = true}
log = {workspace = true}
openssl-sys = {version = "0.9.93", features = ["vendored"]}
Expand Down
190 changes: 190 additions & 0 deletions src/daft-io/src/glob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use itertools::Itertools;
use std::{collections::HashSet, sync::Arc};

use globset::GlobMatcher;
use lazy_static::lazy_static;

lazy_static! {
    /// The set of characters treated as special (non-literal) glob syntax.
    ///
    /// NOTE: matching is done with the `globset` crate, whose glob syntax is described at
    /// https://docs.rs/globset/latest/globset/index.html#syntax
    static ref GLOB_SPECIAL_CHARACTERS: HashSet<char> = "*?{}[]".chars().collect();
}

/// Byte length of the "://" separator that follows the scheme in a URL.
const SCHEME_SUFFIX_LEN: usize = "://".len();

/// Snapshot of an in-progress glob traversal: a path in the directory tree together with the
/// index of the glob fragment being processed at that path.
#[derive(Clone)]
pub(crate) struct GlobState {
// Current path in the directory tree
pub current_path: String,
// Index into `glob_fragments` of the fragment currently being processed
// (this is the index used by `current_glob_fragment`)
pub current_fragment_idx: usize,

// Whether we have encountered wildcards yet in the process of parsing
pub wildcard_mode: bool,

// Carry along expensive data as Arcs to avoid recomputation: cloning a `GlobState` only
// bumps the reference counts of these two fields
pub glob_fragments: Arc<Vec<GlobFragment>>,
// NOTE(review): presumably a matcher compiled from the complete glob pattern — confirm where
// `GlobState` is constructed
pub full_glob_matcher: Arc<GlobMatcher>,
}

impl GlobState {
    /// Returns the glob fragment at `current_fragment_idx`.
    ///
    /// # Panics
    /// Panics if `current_fragment_idx` is out of bounds for `glob_fragments`.
    pub fn current_glob_fragment(&self) -> &GlobFragment {
        &self.glob_fragments[self.current_fragment_idx]
    }

    /// Consumes this state and returns one positioned at `path` and fragment index `idx`,
    /// carrying every other field over unchanged.
    pub fn advance(self, path: String, idx: usize) -> Self {
        GlobState {
            current_path: path,
            current_fragment_idx: idx,
            // `self` is owned here, so the remaining fields are moved rather than cloned
            ..self
        }
    }

    /// Consumes this state and returns it with `wildcard_mode` switched on.
    pub fn with_wildcard_mode(self) -> Self {
        GlobState {
            wildcard_mode: true,
            ..self
        }
    }
}

/// A piece of a glob pattern, kept both verbatim and with its backslash escapes applied.
#[derive(Debug, Clone)]
pub(crate) struct GlobFragment {
// Fragment text exactly as it was given, backslash escapes included
data: String,
// `data` with backslash escapes applied: `\\` becomes `\`, any other `\` is removed
escaped_data: String,
// Position (counted in chars) of the first special glob character in `data` that is not
// escaped with a backslash; `None` when there is no such character
first_wildcard_idx: Option<usize>,
}

impl GlobFragment {
    /// Builds a fragment from raw glob text.
    ///
    /// Records the position (in chars) of the first special glob character that is not
    /// backslash-escaped, and precomputes the escape-resolved text returned by
    /// [`GlobFragment::escaped_str`].
    pub fn new(data: &str) -> Self {
        let mut first_wildcard_idx = None;
        let mut escaped_data = String::with_capacity(data.len());

        // Walk the text once, consuming escapes as pairs. Only looking at the previous char is
        // not enough to decide whether a special character is escaped: in `\\*` the `*` follows
        // an *escaped* backslash (a literal `\`) and is therefore still a live wildcard.
        let mut chars = data.chars().enumerate();
        while let Some((idx, c)) = chars.next() {
            if c == '\\' {
                // A backslash makes the next char literal: `\\` yields `\`, `\*` yields `*`.
                // A trailing backslash with nothing after it is simply dropped.
                if let Some((_, literal)) = chars.next() {
                    escaped_data.push(literal);
                }
                continue;
            }
            if first_wildcard_idx.is_none() && GLOB_SPECIAL_CHARACTERS.contains(&c) {
                first_wildcard_idx = Some(idx);
            }
            escaped_data.push(c);
        }

        GlobFragment {
            data: data.to_string(),
            first_wildcard_idx,
            escaped_data,
        }
    }

    /// Checks if this GlobFragment has any unescaped special glob characters
    pub fn has_special_character(&self) -> bool {
        self.first_wildcard_idx.is_some()
    }

    /// Joins the raw text of a slice of GlobFragments with a separator and builds a new
    /// GlobFragment from the joined text
    pub fn join(fragments: &[GlobFragment], sep: &str) -> Self {
        GlobFragment::new(
            fragments
                .iter()
                .map(|frag: &GlobFragment| frag.data.as_str())
                .join(sep)
                .as_str(),
        )
    }

    /// Returns the fragment as a string with the backslash (\) escapes applied
    ///   1. \\ is cleaned up to just \
    ///   2. \ followed by anything else is just ignored
    pub fn escaped_str(&self) -> &str {
        self.escaped_data.as_str()
    }

    /// Returns the GlobFragment as a raw unescaped string, suitable for use by the globset crate
    pub fn raw_str(&self) -> &str {
        self.data.as_str()
    }
}

/// Parses a glob URL string into "fragments"
/// Fragments are the glob URL string but:
/// 1. Split by delimiter ("/")
/// 2. Non-wildcard fragments are joined and coalesced by delimiter
/// 3. The first fragment is prefixed by "{scheme}://"
pub(crate) fn to_glob_fragments(glob_str: &str) -> super::Result<Vec<GlobFragment>> {
let delimiter = "/";

// NOTE: We only use the URL parse library to get the scheme, because it will escape some of our glob special characters
// such as ? and {}
let glob_url = url::Url::parse(glob_str).map_err(|e| super::Error::InvalidUrl {
path: glob_str.to_string(),
source: e,
})?;
let url_scheme = glob_url.scheme();

// Parse glob fragments: split by delimiter and join any non-special fragments
let mut coalesced_fragments = vec![];
let mut nonspecial_fragments_so_far = vec![];
for fragment in glob_str[url_scheme.len() + SCHEME_SUFFIX_LEN..]
.split(delimiter)
.map(GlobFragment::new)
{
match fragment {
fragment if fragment.data.is_empty() => (),
fragment if fragment.has_special_character() => {
if !nonspecial_fragments_so_far.is_empty() {
coalesced_fragments.push(GlobFragment::join(
nonspecial_fragments_so_far.drain(..).as_slice(),
delimiter,
));
}
coalesced_fragments.push(fragment);
}
_ => {
nonspecial_fragments_so_far.push(fragment);
}
}
}
if !nonspecial_fragments_so_far.is_empty() {
coalesced_fragments.push(GlobFragment::join(
nonspecial_fragments_so_far.drain(..).as_slice(),
delimiter,
));
}

// Ensure that the first fragment has the scheme prefixed
coalesced_fragments[0] =
GlobFragment::new((format!("{url_scheme}://") + coalesced_fragments[0].raw_str()).as_str());

Ok(coalesced_fragments)
}
1 change: 1 addition & 0 deletions src/daft-io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![feature(async_closure)]
#![feature(let_chains)]
mod azure_blob;
mod glob;
mod google_cloud;
mod http;
mod local;
Expand Down
Loading

0 comments on commit 9c32d73

Please sign in to comment.