diff --git a/Cargo.lock b/Cargo.lock index f56c399227..5ec5a3e035 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1088,6 +1088,7 @@ dependencies = [ "openssl-sys", "pyo3", "pyo3-log", + "regex", "reqwest", "serde", "serde_json", @@ -2066,9 +2067,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "memchr" -version = "2.5.0" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "memoffset" @@ -2834,9 +2835,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ "aho-corasick", "memchr", @@ -2846,9 +2847,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.2" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83d3daa6976cffb758ec878f108ba0e062a45b2d6ca3a2cca965338855476caf" +checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ "aho-corasick", "memchr", @@ -2857,9 +2858,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.7.3" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab07dc67230e4a4718e70fd5c20055a4334b121f1f9db8fe63ef39ce9b8c846" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "reqwest" diff --git a/src/daft-io/Cargo.toml b/src/daft-io/Cargo.toml index e18cd06c00..e36ae93a19 100644 --- a/src/daft-io/Cargo.toml +++ b/src/daft-io/Cargo.toml @@ -24,6 +24,7 @@ log = {workspace = true} openssl-sys = {version = "0.9.93", features = ["vendored"]} pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true, optional = true} +regex = {version = "1.9.5"} serde = {workspace = true} serde_json = {workspace = true} snafu = {workspace = true} diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index a98728d91b..5c2f3b252f 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -3,13 +3,22 @@ use std::{num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc}; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; +use lazy_static::lazy_static; +use regex::Regex; use reqwest::header::{CONTENT_LENGTH, RANGE}; use snafu::{IntoError, ResultExt, Snafu}; +use url::Position; -use crate::object_io::LSResult; +use crate::object_io::{FileMetadata, FileType, LSResult}; use super::object_io::{GetResult, ObjectSource}; +lazy_static! { + // Taken from: https://stackoverflow.com/a/15926317/3821154 + static ref HTML_A_TAG_HREF_RE: Regex = + Regex::new(r#"<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P[^"']+)"#).unwrap(); +} + #[derive(Debug, Snafu)] enum Error { #[snafu(display("Unable to connect to {}: {}", path, source))] @@ -45,7 +54,15 @@ enum Error { #[snafu(display( "Unable to parse data as Utf8 while reading header for file: {path}. {source}" ))] - UnableToParseUtf8 { path: String, source: FromUtf8Error }, + UnableToParseUtf8Header { path: String, source: FromUtf8Error }, + + #[snafu(display( + "Unable to parse data as Utf8 while reading body for file: {path}. {source}" + ))] + UnableToParseUtf8Body { + path: String, + source: reqwest::Error, + }, #[snafu(display( "Unable to parse data as Integer while reading header for file: {path}. {source}" @@ -53,6 +70,64 @@ enum Error { UnableToParseInteger { path: String, source: ParseIntError }, } +/// Finds and retrieves FileMetadata from HTML text +/// +/// This function will look for `` tags and return all the links that it finds as +/// absolute URLs +fn _get_file_metadata_from_html(path: &str, text: &str) -> super::Result> { + let path_url = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; + let metas = HTML_A_TAG_HREF_RE + .captures_iter(text) + .map(|captures| { + // Parse the matched URL into an absolute URL + let matched_url = captures.name("url").unwrap().as_str(); + let absolute_path = if let Ok(parsed_matched_url) = url::Url::parse(matched_url) { + // matched_url is already an absolute path + parsed_matched_url + } else if matched_url.starts_with('/') { + // matched_url is a path relative to the origin of `path` + let base = url::Url::parse(&path_url[..Position::BeforePath]).unwrap(); + base.join(matched_url) + .with_context(|_| InvalidUrlSnafu { path: matched_url })? + } else { + // matched_url is a path relative to `path` and needs to be joined + path_url + .join(matched_url) + .with_context(|_| InvalidUrlSnafu { path: matched_url })? + }; + + // Ignore any links that are not descendants of `path` to avoid cycles + let relative = path_url.make_relative(&absolute_path); + match relative { + None => { + return Ok(None); + } + Some(relative_path) + if relative_path.is_empty() || relative_path.starts_with("..") => + { + return Ok(None); + } + _ => (), + }; + + let filetype = if matched_url.ends_with('/') { + FileType::Directory + } else { + FileType::File + }; + Ok(Some(FileMetadata { + filepath: absolute_path.to_string(), + // NOTE: This is consistent with fsspec behavior, but we may choose to HEAD the files to grab Content-Length + // for populating `size` if necessary + size: None, + filetype, + })) + }) + .collect::>>()?; + + Ok(metas.into_iter().flatten().collect()) +} + pub(crate) struct HttpSource { client: reqwest::Client, } @@ -135,8 +210,9 @@ impl ObjectSource for HttpSource { let headers = response.headers(); match headers.get(CONTENT_LENGTH) { Some(v) => { - let size_bytes = String::from_utf8(v.as_bytes().to_vec()) - .with_context(|_| UnableToParseUtf8Snafu:: { path: uri.into() })?; + let size_bytes = String::from_utf8(v.as_bytes().to_vec()).with_context(|_| { + UnableToParseUtf8HeaderSnafu:: { path: uri.into() } + })?; Ok(size_bytes .parse() @@ -148,11 +224,52 @@ impl ObjectSource for HttpSource { async fn ls( &self, - _path: &str, + path: &str, _delimiter: Option<&str>, _continuation_token: Option<&str>, ) -> super::Result { - unimplemented!("http ls"); + let request = self.client.get(path); + let response = request + .send() + .await + .context(UnableToConnectSnafu:: { path: path.into() })? + .error_for_status() + .with_context(|_| UnableToOpenFileSnafu { path })?; + + // Reconstruct the actual path of the request, which may have been redirected via a 301 + // This is important because downstream URL joining logic relies on proper trailing-slashes/index.html + let path = response.url().to_string(); + let path = if path.ends_with('/') { + format!("{}/", path.trim_end_matches('/')) + } else { + path + }; + + match response.headers().get("content-type") { + // If the content-type is text/html, we treat the data on this path as a traversable "directory" + Some(header_value) if header_value.to_str().map_or(false, |v| v == "text/html") => { + let text = response + .text() + .await + .with_context(|_| UnableToParseUtf8BodySnafu { + path: path.to_string(), + })?; + let file_metadatas = _get_file_metadata_from_html(path.as_str(), text.as_str())?; + Ok(LSResult { + files: file_metadatas, + continuation_token: None, + }) + } + // All other forms of content-type is treated as a raw file + _ => Ok(LSResult { + files: vec![FileMetadata { + filepath: path.to_string(), + filetype: FileType::File, + size: response.content_length(), + }], + continuation_token: None, + }), + } } } diff --git a/tests/integration/docker-compose/nginx-serve-static-files.conf b/tests/integration/docker-compose/nginx-serve-static-files.conf index 0c097a5096..9673ecd43b 100644 --- a/tests/integration/docker-compose/nginx-serve-static-files.conf +++ b/tests/integration/docker-compose/nginx-serve-static-files.conf @@ -11,7 +11,7 @@ http { listen [::]:8080; resolver 127.0.0.11; - autoindex off; + autoindex on; server_name _; server_tokens off; diff --git a/tests/integration/io/conftest.py b/tests/integration/io/conftest.py index 13076bca96..e950cafda1 100644 --- a/tests/integration/io/conftest.py +++ b/tests/integration/io/conftest.py @@ -129,21 +129,32 @@ def mount_data_nginx(nginx_config: tuple[str, pathlib.Path], folder: pathlib.Pat """ server_url, static_assets_tmpdir = nginx_config - # Copy data - for root, dirs, files in os.walk(folder, topdown=False): - for file in files: - shutil.copy2(os.path.join(root, file), os.path.join(static_assets_tmpdir, file)) - for dir in dirs: - shutil.copytree(os.path.join(root, dir), os.path.join(static_assets_tmpdir, dir)) - - yield [f"{server_url}/{p.relative_to(folder)}" for p in folder.glob("**/*") if p.is_file()] - - # Delete data - for root, dirs, files in os.walk(static_assets_tmpdir, topdown=False): - for file in files: - os.remove(os.path.join(root, file)) - for dir in dirs: - os.rmdir(os.path.join(root, dir)) + # Cleanup any old stuff in mount folder + for item in os.listdir(static_assets_tmpdir): + path = static_assets_tmpdir / item + if path.is_dir(): + shutil.rmtree(path) + else: + os.remove(path) + + # Copy data to mount folder + for item in os.listdir(folder): + src = folder / item + dest = static_assets_tmpdir / item + if src.is_dir(): + shutil.copytree(str(src), str(dest)) + else: + shutil.copy2(src, dest) + + try: + yield [f"{server_url}/{p.relative_to(folder)}" for p in folder.glob("**/*") if p.is_file()] + finally: + for item in os.listdir(static_assets_tmpdir): + path = static_assets_tmpdir / item + if path.is_dir(): + shutil.rmtree(static_assets_tmpdir / item) + else: + os.remove(static_assets_tmpdir / item) ### diff --git a/tests/integration/io/test_list_files_http.py b/tests/integration/io/test_list_files_http.py new file mode 100644 index 0000000000..2471133c3f --- /dev/null +++ b/tests/integration/io/test_list_files_http.py @@ -0,0 +1,146 @@ +from __future__ import annotations + +from pathlib import Path + +import pytest +from fsspec.implementations.http import HTTPFileSystem + +from daft.daft import io_list +from tests.integration.io.conftest import mount_data_nginx + + +def compare_http_result(daft_ls_result: list, fsspec_result: list): + daft_files = [(f["path"], f["type"].lower(), f["size"]) for f in daft_ls_result] + httpfs_files = [(f["name"], f["type"], f["size"]) for f in fsspec_result] + assert len(daft_files) == len(httpfs_files) + assert sorted(daft_files) == sorted(httpfs_files) + + +@pytest.fixture(scope="module") +def nginx_http_url(nginx_config, tmpdir_factory): + tmpdir = tmpdir_factory.mktemp("test-list-http") + data_path = Path(tmpdir) + (Path(data_path) / "file.txt").touch() + (Path(data_path) / "test_ls").mkdir() + (Path(data_path) / "test_ls" / "file.txt").touch() + (Path(data_path) / "test_ls" / "paginated-10-files").mkdir() + for i in range(10): + (Path(data_path) / "test_ls" / "paginated-10-files" / f"file.{i}.txt").touch() + + with mount_data_nginx(nginx_config, data_path): + yield nginx_config[0] + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path", + [ + f"", + f"/", + f"/test_ls", + f"/test_ls/", + f"/test_ls//", + f"/test_ls/paginated-10-files/", + ], +) +def test_http_flat_directory_listing(path, nginx_http_url): + http_path = f"{nginx_http_url}{path}" + fs = HTTPFileSystem() + fsspec_result = fs.ls(http_path, detail=True) + daft_ls_result = io_list(http_path) + compare_http_result(daft_ls_result, fsspec_result) + + +@pytest.mark.integration() +def test_gs_single_file_listing(nginx_http_url): + path = f"{nginx_http_url}/test_ls/file.txt" + daft_ls_result = io_list(path) + + # NOTE: FSSpec will return size 0 list for this case, but we want to return 1 element to be + # consistent with behavior of our other file listing utilities + # fs = HTTPFileSystem() + # fsspec_result = fs.ls(path, detail=True) + + assert len(daft_ls_result) == 1 + assert daft_ls_result[0] == {"path": path, "size": 0, "type": "File"} + + +@pytest.mark.integration() +def test_http_notfound(nginx_http_url): + path = f"{nginx_http_url}/test_ls/MISSING" + fs = HTTPFileSystem() + with pytest.raises(FileNotFoundError, match=path): + fs.ls(path, detail=True) + + with pytest.raises(FileNotFoundError, match=path): + io_list(path) + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path", + [ + f"", + f"/", + ], +) +def test_http_flat_directory_listing_recursive(path, nginx_http_url): + http_path = f"{nginx_http_url}/{path}" + fs = HTTPFileSystem() + fsspec_result = list(fs.glob(http_path.rstrip("/") + "/**", detail=True).values()) + daft_ls_result = io_list(http_path, recursive=True) + compare_http_result(daft_ls_result, fsspec_result) + + +@pytest.mark.integration() +def test_http_listing_absolute_urls(nginx_config, tmpdir): + nginx_http_url, _ = nginx_config + + tmpdir = Path(tmpdir) + test_manifest_file = tmpdir / "index.html" + test_manifest_file.write_text( + f""" + this is an absolute path to a file + this is an absolute path to a dir + """ + ) + + with mount_data_nginx(nginx_config, tmpdir): + http_path = f"{nginx_http_url}/index.html" + daft_ls_result = io_list(http_path, recursive=False) + + # NOTE: Cannot use fsspec here because they do not correctly find the links + # fsspec_result = fs.ls(http_path, detail=True) + # compare_http_result(daft_ls_result, fsspec_result) + + assert daft_ls_result == [ + {"type": "File", "path": f"{nginx_http_url}/other.html", "size": None}, + {"type": "Directory", "path": f"{nginx_http_url}/dir/", "size": None}, + ] + + +@pytest.mark.integration() +def test_http_listing_absolute_base_urls(nginx_config, tmpdir): + nginx_http_url, _ = nginx_config + + tmpdir = Path(tmpdir) + test_manifest_file = tmpdir / "index.html" + test_manifest_file.write_text( + f""" + this is an absolute base path to a file + this is an absolute base path to a dir + """ + ) + + with mount_data_nginx(nginx_config, tmpdir): + http_path = f"{nginx_http_url}/index.html" + daft_ls_result = io_list(http_path, recursive=False) + + # NOTE: Cannot use fsspec here because they do not correctly find the links + # fsspec_result = fs.ls(http_path, detail=True) + # compare_http_result(daft_ls_result, fsspec_result) + + assert daft_ls_result == [ + {"type": "File", "path": f"{nginx_http_url}/other.html", "size": None}, + {"type": "Directory", "path": f"{nginx_http_url}/dir/", "size": None}, + ]