Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Native listing of http URLs #1405

Merged
merged 10 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/daft-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ log = {workspace = true}
openssl-sys = {version = "0.9.93", features = ["vendored"]}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true, optional = true}
regex = {version = "1.9.5"}
serde = {workspace = true}
serde_json = {workspace = true}
snafu = {workspace = true}
Expand Down
129 changes: 123 additions & 6 deletions src/daft-io/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@ use std::{num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};

use lazy_static::lazy_static;
use regex::Regex;
use reqwest::header::{CONTENT_LENGTH, RANGE};
use snafu::{IntoError, ResultExt, Snafu};
use url::Position;

use crate::object_io::LSResult;
use crate::object_io::{FileMetadata, FileType, LSResult};

use super::object_io::{GetResult, ObjectSource};

lazy_static! {
// Pattern used to pull link targets out of HTML anchor tags.
//
// It matches an opening `<a` / `<A` tag, optionally skips over other attributes, and then
// requires `href=` / `HREF=` followed by a single- or double-quoted value. The named group
// `url` captures every character after the opening quote up to (not including) the next
// quote character of either kind.
//
// Limitations visible from the pattern itself: mixed-case spellings such as `Href` and
// unquoted attribute values are not matched.
//
// The `unwrap()` panics if the pattern fails to compile.
//
// Taken from: https://stackoverflow.com/a/15926317/3821154
static ref HTML_A_TAG_HREF_RE: Regex =
Regex::new(r#"<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)"#).unwrap();
}

#[derive(Debug, Snafu)]
enum Error {
#[snafu(display("Unable to connect to {}: {}", path, source))]
Expand Down Expand Up @@ -45,14 +54,80 @@ enum Error {
#[snafu(display(
"Unable to parse data as Utf8 while reading header for file: {path}. {source}"
))]
UnableToParseUtf8 { path: String, source: FromUtf8Error },
UnableToParseUtf8Header { path: String, source: FromUtf8Error },

#[snafu(display(
"Unable to parse data as Utf8 while reading body for file: {path}. {source}"
))]
UnableToParseUtf8Body {
path: String,
source: reqwest::Error,
},

#[snafu(display(
"Unable to parse data as Integer while reading header for file: {path}. {source}"
))]
UnableToParseInteger { path: String, source: ParseIntError },
}

/// Extracts [`FileMetadata`] entries from the `<a href=...>` links found in HTML text.
///
/// Every `href` matched by `HTML_A_TAG_HREF_RE` in `text` is resolved to an absolute URL
/// against `path` (the URL the HTML was fetched from). Only links located strictly below
/// `path` are returned: links to `path` itself, links that climb to a parent (`..` / `../...`),
/// and links for which `Url::make_relative` yields nothing (e.g. a different scheme, host or
/// port) are dropped, so that a recursive listing cannot cycle.
///
/// A link whose `href` ends with `/` is reported as [`FileType::Directory`], anything else as
/// [`FileType::File`]. `size` is always `None`.
///
/// # Errors
///
/// Returns an invalid-URL error if `path` cannot be parsed, or if a matched `href` cannot be
/// joined onto it.
fn _get_file_metadata_from_html(path: &str, text: &str) -> super::Result<Vec<FileMetadata>> {
    let path_url = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?;

    // Converts one raw `href` value into metadata, or `None` when the link must be ignored.
    let to_metadata = |matched_url: &str| -> super::Result<Option<FileMetadata>> {
        let absolute_path = if let Ok(parsed_matched_url) = url::Url::parse(matched_url) {
            // matched_url is already an absolute URL
            parsed_matched_url
        } else if matched_url.starts_with('/') {
            // matched_url is a path relative to the origin of `path`
            let base = url::Url::parse(&path_url[..Position::BeforePath])
                .with_context(|_| InvalidUrlSnafu { path })?;
            base.join(matched_url)
                .with_context(|_| InvalidUrlSnafu { path: matched_url })?
        } else {
            // matched_url is a path relative to `path` and needs to be joined
            path_url
                .join(matched_url)
                .with_context(|_| InvalidUrlSnafu { path: matched_url })?
        };

        // Ignore any links that are not descendants of `path` to avoid cycles.
        // Only a leading `..` *segment* means "go up"; a name that merely begins with two dots
        // (e.g. `..hidden`) is still a child of `path` and must be kept.
        let is_descendant = match path_url.make_relative(&absolute_path) {
            Some(relative_path) => {
                !(relative_path.is_empty()
                    || relative_path == ".."
                    || relative_path.starts_with("../"))
            }
            None => false,
        };
        if !is_descendant {
            return Ok(None);
        }

        let filetype = if matched_url.ends_with('/') {
            FileType::Directory
        } else {
            FileType::File
        };
        Ok(Some(FileMetadata {
            filepath: absolute_path.to_string(),
            // NOTE: This is consistent with fsspec behavior, but we may choose to HEAD the files
            // to grab Content-Length for populating `size` if necessary
            size: None,
            filetype,
        }))
    };

    // Single pass: skipped links (`Ok(None)`) vanish, the first error aborts the collection.
    HTML_A_TAG_HREF_RE
        .captures_iter(text)
        .filter_map(|captures| {
            let matched_url = captures.name("url")?.as_str();
            to_metadata(matched_url).transpose()
        })
        .collect()
}

/// Object source that talks to plain HTTP endpoints through a `reqwest::Client`.
pub(crate) struct HttpSource {
// Client used to issue the requests made by this source (e.g. `self.client.get(path)`).
client: reqwest::Client,
}
Expand Down Expand Up @@ -135,8 +210,9 @@ impl ObjectSource for HttpSource {
let headers = response.headers();
match headers.get(CONTENT_LENGTH) {
Some(v) => {
let size_bytes = String::from_utf8(v.as_bytes().to_vec())
.with_context(|_| UnableToParseUtf8Snafu::<String> { path: uri.into() })?;
let size_bytes = String::from_utf8(v.as_bytes().to_vec()).with_context(|_| {
UnableToParseUtf8HeaderSnafu::<String> { path: uri.into() }
})?;

Ok(size_bytes
.parse()
Expand All @@ -148,11 +224,52 @@ impl ObjectSource for HttpSource {

async fn ls(
&self,
_path: &str,
path: &str,
_delimiter: Option<&str>,
_continuation_token: Option<&str>,
) -> super::Result<LSResult> {
unimplemented!("http ls");
let request = self.client.get(path);
let response = request
.send()
.await
.context(UnableToConnectSnafu::<String> { path: path.into() })?
.error_for_status()
.with_context(|_| UnableToOpenFileSnafu { path })?;

// Reconstruct the actual path of the request, which may have been redirected via a 301
// This is important because downstream URL joining logic relies on proper trailing-slashes/index.html
let path = response.url().to_string();
let path = if path.ends_with('/') {
format!("{}/", path.trim_end_matches('/'))
} else {
path
};

match response.headers().get("content-type") {
// If the content-type is text/html, we treat the data on this path as a traversable "directory"
Some(header_value) if header_value.to_str().map_or(false, |v| v == "text/html") => {
let text = response
.text()
.await
.with_context(|_| UnableToParseUtf8BodySnafu {
path: path.to_string(),
})?;
let file_metadatas = _get_file_metadata_from_html(path.as_str(), text.as_str())?;
Ok(LSResult {
files: file_metadatas,
continuation_token: None,
})
}
// All other forms of content-type is treated as a raw file
_ => Ok(LSResult {
files: vec![FileMetadata {
filepath: path.to_string(),
filetype: FileType::File,
size: response.content_length(),
}],
continuation_token: None,
}),
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ http {
listen [::]:8080;

resolver 127.0.0.11;
autoindex off;
autoindex on;

server_name _;
server_tokens off;
Expand Down
41 changes: 26 additions & 15 deletions tests/integration/io/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,32 @@ def mount_data_nginx(nginx_config: tuple[str, pathlib.Path], folder: pathlib.Pat
"""
server_url, static_assets_tmpdir = nginx_config

# Copy data
for root, dirs, files in os.walk(folder, topdown=False):
for file in files:
shutil.copy2(os.path.join(root, file), os.path.join(static_assets_tmpdir, file))
for dir in dirs:
shutil.copytree(os.path.join(root, dir), os.path.join(static_assets_tmpdir, dir))

yield [f"{server_url}/{p.relative_to(folder)}" for p in folder.glob("**/*") if p.is_file()]

# Delete data
for root, dirs, files in os.walk(static_assets_tmpdir, topdown=False):
for file in files:
os.remove(os.path.join(root, file))
for dir in dirs:
os.rmdir(os.path.join(root, dir))
# Cleanup any old stuff in mount folder
for item in os.listdir(static_assets_tmpdir):
path = static_assets_tmpdir / item
if path.is_dir():
shutil.rmtree(path)
else:
os.remove(path)

# Copy data to mount folder
for item in os.listdir(folder):
src = folder / item
dest = static_assets_tmpdir / item
if src.is_dir():
shutil.copytree(str(src), str(dest))
else:
shutil.copy2(src, dest)

try:
yield [f"{server_url}/{p.relative_to(folder)}" for p in folder.glob("**/*") if p.is_file()]
finally:
for item in os.listdir(static_assets_tmpdir):
path = static_assets_tmpdir / item
if path.is_dir():
shutil.rmtree(static_assets_tmpdir / item)
else:
os.remove(static_assets_tmpdir / item)


###
Expand Down
Loading
Loading