From bfae9882bcb9451485c72a3d484187c174afa54c Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Thu, 21 Sep 2023 18:55:50 -0700 Subject: [PATCH 01/10] init tests --- .../nginx-serve-static-files.conf | 2 +- tests/integration/io/test_list_files_http.py | 98 +++++++++++++++++++ 2 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 tests/integration/io/test_list_files_http.py diff --git a/tests/integration/docker-compose/nginx-serve-static-files.conf b/tests/integration/docker-compose/nginx-serve-static-files.conf index 0c097a5096..9673ecd43b 100644 --- a/tests/integration/docker-compose/nginx-serve-static-files.conf +++ b/tests/integration/docker-compose/nginx-serve-static-files.conf @@ -11,7 +11,7 @@ http { listen [::]:8080; resolver 127.0.0.11; - autoindex off; + autoindex on; server_name _; server_tokens off; diff --git a/tests/integration/io/test_list_files_http.py b/tests/integration/io/test_list_files_http.py new file mode 100644 index 0000000000..47cd7644ec --- /dev/null +++ b/tests/integration/io/test_list_files_http.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from pathlib import Path + +import pytest +from fsspec.implementations.http import HTTPFileSystem + +from tests.integration.io.conftest import mount_data_nginx + + +def compare_http_result(daft_ls_result: list, fsspec_result: list): + daft_files = [(f["path"], f["type"].lower()) for f in daft_ls_result] + gcsfs_files = [(f"http://{f['name']}", f["type"]) for f in fsspec_result] + + # Perform necessary post-processing of fsspec results to match expected behavior from Daft: + + # # NOTE: gcsfs sometimes does not return the trailing / for directories, so we have to ensure it + # gcsfs_files = [ + # (f"{path.rstrip('/')}/", type_) if type_ == "directory" else (path, type_) for path, type_ in gcsfs_files + # ] + + # # NOTE: gcsfs will sometimes return 0-sized marker files for manually-created folders, which we ignore here + # # Be careful here because this will end up pruning any truly size-0 files that are actually files and not folders! 
+ # size_0_files = {f"gs://{f['name']}" for f in fsspec_result if f["size"] == 0 and f["type"] == "file"} + # gcsfs_files = [(path, type_) for path, type_ in gcsfs_files if path not in size_0_files] + + assert len(daft_files) == len(gcsfs_files) + assert sorted(daft_files) == sorted(gcsfs_files) + + +@pytest.fixture(scope="module") +def nginx_http_url(nginx_config, tmpdir_factory): + tmpdir = tmpdir_factory.mktemp("test-list-http") + data_path = Path(tmpdir) + (Path(data_path) / "file.txt").touch() + (Path(data_path) / "test_ls").mkdir() + (Path(data_path) / "test_ls" / "file.txt").touch() + (Path(data_path) / "test_ls" / "paginated-10-files").mkdir() + for i in range(10): + (Path(data_path) / "test_ls" / "paginated-10-files" / f"file.{i}.txt").touch() + + with mount_data_nginx(nginx_config, data_path): + yield nginx_config[0] + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path", + [ + f"", + f"/", + f"test_ls", + f"test_ls/", + f"test_ls/paginated-10-files/", + ], +) +def test_http_flat_directory_listing(path, nginx_http_url): + http_path = f"{nginx_http_url}/{path}" + fs = HTTPFileSystem() + fsspec_result = fs.ls(http_path, detail=True) + # compare_http_result(daft_ls_result, fsspec_result) + + +# @pytest.mark.integration() +# def test_gs_single_file_listing(): +# path = f"gs://{BUCKET}/test_ls/file.txt" +# fs = gcsfs.GCSFileSystem() +# daft_ls_result = io_list(path) +# fsspec_result = fs.ls(path, detail=True) +# compare_http_result(daft_ls_result, fsspec_result) + + +# @pytest.mark.integration() +# def test_gs_notfound(): +# path = f"gs://{BUCKET}/test_ls/MISSING" +# fs = gcsfs.GCSFileSystem() +# with pytest.raises(FileNotFoundError): +# fs.ls(path, detail=True) + +# # NOTE: Google Cloud does not return a 404 to indicate anything missing, but just returns empty results +# # Thus Daft is unable to differentiate between "missing" folders and "empty" folders +# daft_ls_result = io_list(path) +# assert daft_ls_result == [] + + +# @pytest.mark.integration() +# @pytest.mark.parametrize( +# "path", +# [ +# f"gs://{BUCKET}/test_ls", +# f"gs://{BUCKET}/test_ls/", +# ], +# ) +# def test_gs_flat_directory_listing_recursive(path): +# fs = gcsfs.GCSFileSystem() +# daft_ls_result = io_list(path, recursive=True) +# fsspec_result = list(fs.glob(path.rstrip("/") + "/**", detail=True).values()) +# compare_gcs_result(daft_ls_result, fsspec_result) From 8d38a1abb2721f3b0fd681e6dc1aebe4ecfd7697 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Thu, 21 Sep 2023 19:49:54 -0700 Subject: [PATCH 02/10] Initial impl, needs proper handling of appending URLs --- Cargo.lock | 17 ++--- src/daft-io/Cargo.toml | 1 + src/daft-io/src/http.rs | 71 ++++++++++++++++++-- tests/integration/io/test_list_files_http.py | 4 +- 4 files changed, 78 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f56c399227..5ec5a3e035 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1088,6 +1088,7 @@ dependencies = [ "openssl-sys", "pyo3", "pyo3-log", + "regex", "reqwest", "serde", "serde_json", @@ -2066,9 +2067,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "memchr" -version = "2.5.0" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "memoffset" @@ -2834,9 +2835,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.5" source 
= "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ "aho-corasick", "memchr", @@ -2846,9 +2847,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.2" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83d3daa6976cffb758ec878f108ba0e062a45b2d6ca3a2cca965338855476caf" +checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ "aho-corasick", "memchr", @@ -2857,9 +2858,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.7.3" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab07dc67230e4a4718e70fd5c20055a4334b121f1f9db8fe63ef39ce9b8c846" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "reqwest" diff --git a/src/daft-io/Cargo.toml b/src/daft-io/Cargo.toml index e18cd06c00..e36ae93a19 100644 --- a/src/daft-io/Cargo.toml +++ b/src/daft-io/Cargo.toml @@ -24,6 +24,7 @@ log = {workspace = true} openssl-sys = {version = "0.9.93", features = ["vendored"]} pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true, optional = true} +regex = {version = "1.9.5"} serde = {workspace = true} serde_json = {workspace = true} snafu = {workspace = true} diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index a98728d91b..9a46e43892 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -3,13 +3,20 @@ use std::{num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc}; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; +use lazy_static::lazy_static; +use regex::Regex; use reqwest::header::{CONTENT_LENGTH, RANGE}; use snafu::{IntoError, ResultExt, Snafu}; -use crate::object_io::LSResult; +use crate::object_io::{FileMetadata, FileType, LSResult}; use super::object_io::{GetResult, ObjectSource}; +lazy_static! { + static ref HTML_A_TAG_HREF_RE: Regex = + Regex::new(r#"<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P[^"']+)"#).unwrap(); +} + #[derive(Debug, Snafu)] enum Error { #[snafu(display("Unable to connect to {}: {}", path, source))] @@ -45,7 +52,15 @@ enum Error { #[snafu(display( "Unable to parse data as Utf8 while reading header for file: {path}. {source}" ))] - UnableToParseUtf8 { path: String, source: FromUtf8Error }, + UnableToParseUtf8Header { path: String, source: FromUtf8Error }, + + #[snafu(display( + "Unable to parse data as Utf8 while reading body for file: {path}. {source}" + ))] + UnableToParseUtf8Body { + path: String, + source: reqwest::Error, + }, #[snafu(display( "Unable to parse data as Integer while reading header for file: {path}. 
     ))]
     UnableToParseInteger { path: String, source: ParseIntError },
 }
 
@@ -135,8 +150,9 @@ impl ObjectSource for HttpSource {
         let headers = response.headers();
         match headers.get(CONTENT_LENGTH) {
             Some(v) => {
-                let size_bytes = String::from_utf8(v.as_bytes().to_vec())
-                    .with_context(|_| UnableToParseUtf8Snafu::<String> { path: uri.into() })?;
+                let size_bytes = String::from_utf8(v.as_bytes().to_vec()).with_context(|_| {
+                    UnableToParseUtf8HeaderSnafu::<String> { path: uri.into() }
+                })?;
 
                 Ok(size_bytes
                     .parse()
@@ -148,11 +164,54 @@ impl ObjectSource for HttpSource {
 
     async fn ls(
         &self,
-        _path: &str,
+        path: &str,
         _delimiter: Option<&str>,
         _continuation_token: Option<&str>,
     ) -> super::Result<LSResult> {
-        unimplemented!("http ls");
+        let request = self.client.get(path);
+        let response = request
+            .send()
+            .await
+            .context(UnableToConnectSnafu::<String> { path: path.into() })?;
+        match response.headers().get("content-type") {
+            // If the content-type is text/html, we treat the data on this path as a traversable "directory"
+            Some(header_value) if header_value.to_str().map_or(false, |v| v == "text/html") => {
+                let text = response
+                    .text()
+                    .await
+                    .with_context(|_| UnableToParseUtf8BodySnafu {
+                        path: path.to_string(),
+                    })?;
+                let metas = HTML_A_TAG_HREF_RE
+                    .captures_iter(text.as_str())
+                    .map(|captures| {
+                        let matched_url = captures.name("url").unwrap().as_str();
+                        let filetype = if matched_url.ends_with('/') {
+                            FileType::Directory
+                        } else {
+                            FileType::File
+                        };
+                        FileMetadata {
+                            filepath: matched_url.to_string(),
+                            size: None, // TODO: fire HEAD requests to grab the content-length headers
+                            filetype,
+                        }
+                    });
+                Ok(LSResult {
+                    files: metas.collect(),
+                    continuation_token: None,
+                })
+            }
+            // All other forms of content-type are treated as a raw file
+            _ => Ok(LSResult {
+                files: vec![FileMetadata {
+                    filepath: path.to_string(),
+                    filetype: FileType::File,
+                    size: response.content_length(),
+                }],
+                continuation_token: None,
+            }),
+        }
+    }
 }
diff --git a/tests/integration/io/test_list_files_http.py b/tests/integration/io/test_list_files_http.py
index 47cd7644ec..4797af7daa 100644
--- a/tests/integration/io/test_list_files_http.py
+++ b/tests/integration/io/test_list_files_http.py
@@ -5,6 +5,7 @@
 import pytest
 from fsspec.implementations.http import HTTPFileSystem
 
+from daft.daft import io_list
 from tests.integration.io.conftest import mount_data_nginx
 
 
@@ -58,7 +59,8 @@ def test_http_flat_directory_listing(path, nginx_http_url):
     http_path = f"{nginx_http_url}/{path}"
     fs = HTTPFileSystem()
     fsspec_result = fs.ls(http_path, detail=True)
-    # compare_http_result(daft_ls_result, fsspec_result)
+    daft_ls_result = io_list(http_path)
+    compare_http_result(daft_ls_result, fsspec_result)

From 6eea1a6cb5fb545f309ffe69b3d658c4e1666a5f Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Fri, 22 Sep 2023 11:27:16 -0700
Subject: [PATCH 03/10] Add URL parsing to always present absolute URLs

---
 src/daft-io/src/http.rs                      | 71 +++++++++++++-----
 tests/integration/io/test_list_files_http.py | 21 +++---
 2 files changed, 66 insertions(+), 26 deletions(-)

diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs
index 9a46e43892..8761245ba0 100644
--- a/src/daft-io/src/http.rs
+++ b/src/daft-io/src/http.rs
@@ -7,6 +7,7 @@ use lazy_static::lazy_static;
 use regex::Regex;
 use reqwest::header::{CONTENT_LENGTH, RANGE};
 use snafu::{IntoError, ResultExt, Snafu};
+use url::Position;
 
 use crate::object_io::{FileMetadata, FileType, LSResult};
 
@@ -68,6 +69,58 @@ enum Error {
     UnableToParseInteger { path: String, source: ParseIntError },
 }
 
+/// Finds and retrieves FileMetadata from HTML text
+///
+/// This function will look for `<a href=...>` tags and return all the links that it finds as
+/// absolute URLs
+fn _get_file_metadata_from_html(path: &str, text: &str) -> super::Result<Vec<FileMetadata>> {
+    let metas = HTML_A_TAG_HREF_RE
+        .captures_iter(text)
+        .map(|captures| {
+            let matched_url = captures.name("url").unwrap().as_str();
+
+            // Ignore "FTP-like" links to parent folder
+            if matched_url == "../" {
+                return Ok(None);
+            }
+
+            let absolute_path = if let Ok(parsed_matched_url) = url::Url::parse(matched_url) {
+                // matched_url is already an absolute path
+                parsed_matched_url.to_string()
+            } else if matched_url.starts_with('/') {
+                // matched_url is a path relative to the origin of `path`
+                let path_url = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?;
+                let base = url::Url::parse(&path_url[..Position::BeforePath]).unwrap();
+                base.join(matched_url)
+                    .with_context(|_| InvalidUrlSnafu { path: matched_url })?
+                    .to_string()
+            } else {
+                // matched_url is a path relative to `path`
+                let path = format!("{}/", path.trim_end_matches('/')); // Ensure suffix is a single '/' so that it properly works with Url::join
+                let path_url =
+                    url::Url::parse(path.as_str()).with_context(|_| InvalidUrlSnafu { path })?;
+                path_url
+                    .join(matched_url)
+                    .with_context(|_| InvalidUrlSnafu { path: matched_url })?
+                    .to_string()
+            };
+
+            let filetype = if matched_url.ends_with('/') {
+                FileType::Directory
+            } else {
+                FileType::File
+            };
+            Ok(Some(FileMetadata {
+                filepath: absolute_path,
+                size: None, // TODO: fire HEAD requests to grab the content-length headers
+                filetype,
+            }))
+        })
+        .collect::<super::Result<Vec<_>>>()?;
+
+    Ok(metas.into_iter().flatten().collect())
+}
+
 pub(crate) struct HttpSource {
     client: reqwest::Client,
 }
@@ -182,23 +235,9 @@ impl ObjectSource for HttpSource {
                     .with_context(|_| UnableToParseUtf8BodySnafu {
                         path: path.to_string(),
                     })?;
-                let metas = HTML_A_TAG_HREF_RE
-                    .captures_iter(text.as_str())
-                    .map(|captures| {
-                        let matched_url = captures.name("url").unwrap().as_str();
-                        let filetype = if matched_url.ends_with('/') {
-                            FileType::Directory
-                        } else {
-                            FileType::File
-                        };
-                        FileMetadata {
-                            filepath: matched_url.to_string(),
-                            size: None, // TODO: fire HEAD requests to grab the content-length headers
-                            filetype,
-                        }
-                    });
+                let file_metadatas = _get_file_metadata_from_html(path, text.as_str())?;
                 Ok(LSResult {
-                    files: metas.collect(),
+                    files: file_metadatas,
                     continuation_token: None,
                 })
             }
diff --git a/tests/integration/io/test_list_files_http.py b/tests/integration/io/test_list_files_http.py
index 4797af7daa..9f31b9c93d 100644
--- a/tests/integration/io/test_list_files_http.py
+++ b/tests/integration/io/test_list_files_http.py
@@ -11,7 +11,7 @@ def compare_http_result(daft_ls_result: list, fsspec_result: list):
     daft_files = [(f["path"], f["type"].lower()) for f in daft_ls_result]
-    gcsfs_files = [(f"http://{f['name']}", f["type"]) for f in fsspec_result]
+    httpfs_files = [(f["name"], f["type"]) for f in fsspec_result]
 
     # Perform necessary post-processing of fsspec results to match expected behavior from Daft:
 
@@ -25,8 +25,8 @@ def compare_http_result(daft_ls_result: list, fsspec_result: list):
     #     size_0_files = {f"gs://{f['name']}" for f in fsspec_result if f["size"] == 0 and f["type"] == "file"}
     #     gcsfs_files = [(path, type_) for path, type_ in gcsfs_files if path not in size_0_files]
 
-    assert len(daft_files) == len(gcsfs_files)
-    assert sorted(daft_files) == sorted(gcsfs_files)
+    assert len(daft_files) == len(httpfs_files)
+    assert sorted(daft_files) == sorted(httpfs_files)
 @pytest.fixture(scope="module")
@@ -89,12 +89,13 @@ def test_http_flat_directory_listing(path, nginx_http_url):
 # @pytest.mark.parametrize(
 #     "path",
 #     [
-#         f"gs://{BUCKET}/test_ls",
-#         f"gs://{BUCKET}/test_ls/",
+#         f"",
+#         f"/",
 #     ],
 # )
-# def test_gs_flat_directory_listing_recursive(path):
-#     fs = gcsfs.GCSFileSystem()
-#     daft_ls_result = io_list(path, recursive=True)
-#     fsspec_result = list(fs.glob(path.rstrip("/") + "/**", detail=True).values())
-#     compare_gcs_result(daft_ls_result, fsspec_result)
+# def test_http_flat_directory_listing_recursive(path, nginx_http_url):
+#     http_path = f"{nginx_http_url}/{path}"
+#     fs = HTTPFileSystem()
+#     fsspec_result = list(fs.glob(http_path.rstrip("/") + "/**", detail=True).values())
+#     # daft_ls_result = io_list(http_path, recursive=True)
+#     # compare_http_result(daft_ls_result, fsspec_result)

From 699dc9ca5aa86336ae62fd3afc2b02cacdc00d24 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Fri, 22 Sep 2023 11:43:22 -0700
Subject: [PATCH 04/10] Add proper error handling for response

---
 src/daft-io/src/http.rs                      |  5 +-
 tests/integration/io/test_list_files_http.py | 88 +++++++++-----------
 2 files changed, 43 insertions(+), 50 deletions(-)

diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs
index 8761245ba0..b1c59e6621 100644
--- a/src/daft-io/src/http.rs
+++ b/src/daft-io/src/http.rs
@@ -225,7 +225,10 @@ impl ObjectSource for HttpSource {
         let response = request
             .send()
             .await
-            .context(UnableToConnectSnafu::<String> { path: path.into() })?;
+            .context(UnableToConnectSnafu::<String> { path: path.into() })?
+            .error_for_status()
+            .with_context(|_| UnableToOpenFileSnafu { path })?;
+
         match response.headers().get("content-type") {
             // If the content-type is text/html, we treat the data on this path as a traversable "directory"
             Some(header_value) if header_value.to_str().map_or(false, |v| v == "text/html") => {
                 let text = response
diff --git a/tests/integration/io/test_list_files_http.py b/tests/integration/io/test_list_files_http.py
index 9f31b9c93d..6243ae6385 100644
--- a/tests/integration/io/test_list_files_http.py
+++ b/tests/integration/io/test_list_files_http.py
@@ -12,19 +12,6 @@ def compare_http_result(daft_ls_result: list, fsspec_result: list):
     daft_files = [(f["path"], f["type"].lower()) for f in daft_ls_result]
     httpfs_files = [(f["name"], f["type"]) for f in fsspec_result]
-
-    # Perform necessary post-processing of fsspec results to match expected behavior from Daft:
-
-    # # NOTE: gcsfs sometimes does not return the trailing / for directories, so we have to ensure it
-    # gcsfs_files = [
-    #     (f"{path.rstrip('/')}/", type_) if type_ == "directory" else (path, type_) for path, type_ in gcsfs_files
-    # ]
-
-    # # NOTE: gcsfs will sometimes return 0-sized marker files for manually-created folders, which we ignore here
-    # # Be careful here because this will end up pruning any truly size-0 files that are actually files and not folders!
- # size_0_files = {f"gs://{f['name']}" for f in fsspec_result if f["size"] == 0 and f["type"] == "file"} - # gcsfs_files = [(path, type_) for path, type_ in gcsfs_files if path not in size_0_files] - assert len(daft_files) == len(httpfs_files) assert sorted(daft_files) == sorted(httpfs_files) @@ -63,39 +50,42 @@ def test_http_flat_directory_listing(path, nginx_http_url): compare_http_result(daft_ls_result, fsspec_result) -# @pytest.mark.integration() -# def test_gs_single_file_listing(): -# path = f"gs://{BUCKET}/test_ls/file.txt" -# fs = gcsfs.GCSFileSystem() -# daft_ls_result = io_list(path) -# fsspec_result = fs.ls(path, detail=True) -# compare_http_result(daft_ls_result, fsspec_result) - - -# @pytest.mark.integration() -# def test_gs_notfound(): -# path = f"gs://{BUCKET}/test_ls/MISSING" -# fs = gcsfs.GCSFileSystem() -# with pytest.raises(FileNotFoundError): -# fs.ls(path, detail=True) - -# # NOTE: Google Cloud does not return a 404 to indicate anything missing, but just returns empty results -# # Thus Daft is unable to differentiate between "missing" folders and "empty" folders -# daft_ls_result = io_list(path) -# assert daft_ls_result == [] - - -# @pytest.mark.integration() -# @pytest.mark.parametrize( -# "path", -# [ -# f"", -# f"/", -# ], -# ) -# def test_http_flat_directory_listing_recursive(path, nginx_http_url): -# http_path = f"{nginx_http_url}/{path}" -# fs = HTTPFileSystem() -# fsspec_result = list(fs.glob(http_path.rstrip("/") + "/**", detail=True).values()) -# # daft_ls_result = io_list(http_path, recursive=True) -# # compare_http_result(daft_ls_result, fsspec_result) +@pytest.mark.integration() +def test_gs_single_file_listing(nginx_http_url): + path = f"{nginx_http_url}/test_ls/file.txt" + daft_ls_result = io_list(path) + + # NOTE: FSSpec will return size 0 list for this case, but we want to return 1 element to be + # consistent with behavior of our other file listing utilities + # fs = HTTPFileSystem() + # fsspec_result = fs.ls(path, detail=True) + + assert len(daft_ls_result) == 1 + assert daft_ls_result[0] == {"path": path, "size": 24, "type": "File"} + + +@pytest.mark.integration() +def test_http_notfound(nginx_http_url): + path = f"{nginx_http_url}/test_ls/MISSING" + fs = HTTPFileSystem() + with pytest.raises(FileNotFoundError, match=path): + fs.ls(path, detail=True) + + with pytest.raises(FileNotFoundError, match=path): + io_list(path) + + +@pytest.mark.integration() +@pytest.mark.parametrize( + "path", + [ + f"", + f"/", + ], +) +def test_http_flat_directory_listing_recursive(path, nginx_http_url): + http_path = f"{nginx_http_url}/{path}" + fs = HTTPFileSystem() + fsspec_result = list(fs.glob(http_path.rstrip("/") + "/**", detail=True).values()) + daft_ls_result = io_list(http_path, recursive=True) + compare_http_result(daft_ls_result, fsspec_result) From bfa7f392bbfe3318e2048e9ab53c53a9c7fb613f Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 22 Sep 2023 13:13:37 -0700 Subject: [PATCH 05/10] size=None for http implementation --- src/daft-io/src/http.rs | 4 +++- tests/integration/io/test_list_files_http.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index b1c59e6621..82299a8e22 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -112,7 +112,9 @@ fn _get_file_metadata_from_html(path: &str, text: &str) -> super::Result Date: Fri, 22 Sep 2023 13:30:17 -0700 Subject: [PATCH 06/10] Single file listing actually does return the content length --- 
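Reviewer note (not part of the patch): the expected size below flips from None to 0 because nginx does send a Content-Length header for a plain, non-HTML file, which reqwest exposes via `response.content_length()` on the Rust side; the fixture files are created with `touch()`, so the header reads 0. A minimal Python sketch of the same check, assuming the nginx fixture from the tests above is serving — the helper name `content_length_of` is invented here for illustration:

    import urllib.request

    def content_length_of(url: str) -> int | None:
        # Issue a HEAD request and read the Content-Length header without
        # downloading the body, mirroring what reqwest's content_length() surfaces
        request = urllib.request.Request(url, method="HEAD")
        with urllib.request.urlopen(request) as response:
            value = response.headers.get("Content-Length")
            return int(value) if value is not None else None

    # For an empty file created by the fixtures, e.g.:
    # content_length_of(f"{nginx_http_url}/test_ls/file.txt") == 0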
 tests/integration/io/test_list_files_http.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/integration/io/test_list_files_http.py b/tests/integration/io/test_list_files_http.py
index 886d32413b..9831ec7238 100644
--- a/tests/integration/io/test_list_files_http.py
+++ b/tests/integration/io/test_list_files_http.py
@@ -10,8 +10,8 @@
 
 def compare_http_result(daft_ls_result: list, fsspec_result: list):
-    daft_files = [(f["path"], f["type"].lower()) for f in daft_ls_result]
-    httpfs_files = [(f["name"], f["type"]) for f in fsspec_result]
+    daft_files = [(f["path"], f["type"].lower(), f["size"]) for f in daft_ls_result]
+    httpfs_files = [(f["name"], f["type"], f["size"]) for f in fsspec_result]
 
     assert len(daft_files) == len(httpfs_files)
     assert sorted(daft_files) == sorted(httpfs_files)
@@ -61,7 +61,7 @@ def test_gs_single_file_listing(nginx_http_url):
     # fsspec_result = fs.ls(path, detail=True)
 
     assert len(daft_ls_result) == 1
-    assert daft_ls_result[0] == {"path": path, "size": None, "type": "File"}
+    assert daft_ls_result[0] == {"path": path, "size": 0, "type": "File"}

From f1e29a33f2418e3d0f1f4005a1e76e4392f48db2 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Fri, 22 Sep 2023 16:21:43 -0700
Subject: [PATCH 07/10] Only return paths that are strictly children of the base path

---
 src/daft-io/src/http.rs | 31 ++++++++++++++++++-------------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs
index 82299a8e22..3bf83a8444 100644
--- a/src/daft-io/src/http.rs
+++ b/src/daft-io/src/http.rs
@@ -74,35 +74,40 @@ enum Error {
 /// This function will look for `<a href=...>` tags and return all the links that it finds as
 /// absolute URLs
 fn _get_file_metadata_from_html(path: &str, text: &str) -> super::Result<Vec<FileMetadata>> {
+    let path = format!("{}/", path.trim_end_matches('/')); // Ensure suffix is a single '/' so that it properly works with Url::join
+    let path_url = url::Url::parse(path.as_str()).with_context(|_| InvalidUrlSnafu { path })?;
     let metas = HTML_A_TAG_HREF_RE
         .captures_iter(text)
         .map(|captures| {
             let matched_url = captures.name("url").unwrap().as_str();
 
-            // Ignore "FTP-like" links to parent folder
-            if matched_url == "../" {
-                return Ok(None);
-            }
-
             let absolute_path = if let Ok(parsed_matched_url) = url::Url::parse(matched_url) {
                 // matched_url is already an absolute path
-                parsed_matched_url.to_string()
+                parsed_matched_url
             } else if matched_url.starts_with('/') {
                 // matched_url is a path relative to the origin of `path`
-                let path_url = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?;
                 let base = url::Url::parse(&path_url[..Position::BeforePath]).unwrap();
                 base.join(matched_url)
                     .with_context(|_| InvalidUrlSnafu { path: matched_url })?
-                    .to_string()
             } else {
                 // matched_url is a path relative to `path`
-                let path = format!("{}/", path.trim_end_matches('/')); // Ensure suffix is a single '/' so that it properly works with Url::join
-                let path_url =
-                    url::Url::parse(path.as_str()).with_context(|_| InvalidUrlSnafu { path })?;
                 path_url
                     .join(matched_url)
                     .with_context(|_| InvalidUrlSnafu { path: matched_url })?
- .to_string() + }; + + // Ignore any links that are not descendants of `path` to avoid cycles + let relative = path_url.make_relative(&absolute_path); + match relative { + None => { + return Ok(None); + } + Some(relative_path) + if relative_path.is_empty() || relative_path.starts_with("..") => + { + return Ok(None); + } + _ => (), }; let filetype = if matched_url.ends_with('/') { @@ -111,7 +116,7 @@ fn _get_file_metadata_from_html(path: &str, text: &str) -> super::Result Date: Fri, 22 Sep 2023 16:22:26 -0700 Subject: [PATCH 08/10] Attribution of regex to stackoverflow --- src/daft-io/src/http.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index 3bf83a8444..e9540fae89 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -14,6 +14,7 @@ use crate::object_io::{FileMetadata, FileType, LSResult}; use super::object_io::{GetResult, ObjectSource}; lazy_static! { + // Taken from: https://stackoverflow.com/a/15926317/3821154 static ref HTML_A_TAG_HREF_RE: Regex = Regex::new(r#"<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P[^"']+)"#).unwrap(); } From 966cbeef4daf1193c7969301eb85aad3685bcc2e Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 22 Sep 2023 17:58:46 -0700 Subject: [PATCH 09/10] Refactor conftest --- tests/integration/io/conftest.py | 41 +++++++++++++------- tests/integration/io/test_list_files_http.py | 1 + 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/tests/integration/io/conftest.py b/tests/integration/io/conftest.py index 13076bca96..e950cafda1 100644 --- a/tests/integration/io/conftest.py +++ b/tests/integration/io/conftest.py @@ -129,21 +129,32 @@ def mount_data_nginx(nginx_config: tuple[str, pathlib.Path], folder: pathlib.Pat """ server_url, static_assets_tmpdir = nginx_config - # Copy data - for root, dirs, files in os.walk(folder, topdown=False): - for file in files: - shutil.copy2(os.path.join(root, file), os.path.join(static_assets_tmpdir, file)) - for dir in dirs: - shutil.copytree(os.path.join(root, dir), os.path.join(static_assets_tmpdir, dir)) - - yield [f"{server_url}/{p.relative_to(folder)}" for p in folder.glob("**/*") if p.is_file()] - - # Delete data - for root, dirs, files in os.walk(static_assets_tmpdir, topdown=False): - for file in files: - os.remove(os.path.join(root, file)) - for dir in dirs: - os.rmdir(os.path.join(root, dir)) + # Cleanup any old stuff in mount folder + for item in os.listdir(static_assets_tmpdir): + path = static_assets_tmpdir / item + if path.is_dir(): + shutil.rmtree(path) + else: + os.remove(path) + + # Copy data to mount folder + for item in os.listdir(folder): + src = folder / item + dest = static_assets_tmpdir / item + if src.is_dir(): + shutil.copytree(str(src), str(dest)) + else: + shutil.copy2(src, dest) + + try: + yield [f"{server_url}/{p.relative_to(folder)}" for p in folder.glob("**/*") if p.is_file()] + finally: + for item in os.listdir(static_assets_tmpdir): + path = static_assets_tmpdir / item + if path.is_dir(): + shutil.rmtree(static_assets_tmpdir / item) + else: + os.remove(static_assets_tmpdir / item) ### diff --git a/tests/integration/io/test_list_files_http.py b/tests/integration/io/test_list_files_http.py index 9831ec7238..c9a74579e7 100644 --- a/tests/integration/io/test_list_files_http.py +++ b/tests/integration/io/test_list_files_http.py @@ -39,6 +39,7 @@ def nginx_http_url(nginx_config, tmpdir_factory): f"/", f"test_ls", f"test_ls/", + f"test_ls//", f"test_ls/paginated-10-files/", ], ) From c459e94dede1a3990e02b3c30970745312a509bf Mon 
Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 25 Sep 2023 18:04:40 -0700 Subject: [PATCH 10/10] Add tests for absolute and absolute-base URL cases --- src/daft-io/src/http.rs | 18 ++++-- tests/integration/io/test_list_files_http.py | 64 ++++++++++++++++++-- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index e9540fae89..5c2f3b252f 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -75,13 +75,12 @@ enum Error { /// This function will look for `` tags and return all the links that it finds as /// absolute URLs fn _get_file_metadata_from_html(path: &str, text: &str) -> super::Result> { - let path = format!("{}/", path.trim_end_matches('/')); // Ensure suffix is a single '/' so that it properly works with Url::join - let path_url = url::Url::parse(path.as_str()).with_context(|_| InvalidUrlSnafu { path })?; + let path_url = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; let metas = HTML_A_TAG_HREF_RE .captures_iter(text) .map(|captures| { + // Parse the matched URL into an absolute URL let matched_url = captures.name("url").unwrap().as_str(); - let absolute_path = if let Ok(parsed_matched_url) = url::Url::parse(matched_url) { // matched_url is already an absolute path parsed_matched_url @@ -91,7 +90,7 @@ fn _get_file_metadata_from_html(path: &str, text: &str) -> super::Result { @@ -246,7 +254,7 @@ impl ObjectSource for HttpSource { .with_context(|_| UnableToParseUtf8BodySnafu { path: path.to_string(), })?; - let file_metadatas = _get_file_metadata_from_html(path, text.as_str())?; + let file_metadatas = _get_file_metadata_from_html(path.as_str(), text.as_str())?; Ok(LSResult { files: file_metadatas, continuation_token: None, diff --git a/tests/integration/io/test_list_files_http.py b/tests/integration/io/test_list_files_http.py index c9a74579e7..2471133c3f 100644 --- a/tests/integration/io/test_list_files_http.py +++ b/tests/integration/io/test_list_files_http.py @@ -37,14 +37,14 @@ def nginx_http_url(nginx_config, tmpdir_factory): [ f"", f"/", - f"test_ls", - f"test_ls/", - f"test_ls//", - f"test_ls/paginated-10-files/", + f"/test_ls", + f"/test_ls/", + f"/test_ls//", + f"/test_ls/paginated-10-files/", ], ) def test_http_flat_directory_listing(path, nginx_http_url): - http_path = f"{nginx_http_url}/{path}" + http_path = f"{nginx_http_url}{path}" fs = HTTPFileSystem() fsspec_result = fs.ls(http_path, detail=True) daft_ls_result = io_list(http_path) @@ -90,3 +90,57 @@ def test_http_flat_directory_listing_recursive(path, nginx_http_url): fsspec_result = list(fs.glob(http_path.rstrip("/") + "/**", detail=True).values()) daft_ls_result = io_list(http_path, recursive=True) compare_http_result(daft_ls_result, fsspec_result) + + +@pytest.mark.integration() +def test_http_listing_absolute_urls(nginx_config, tmpdir): + nginx_http_url, _ = nginx_config + + tmpdir = Path(tmpdir) + test_manifest_file = tmpdir / "index.html" + test_manifest_file.write_text( + f""" + this is an absolute path to a file + this is an absolute path to a dir + """ + ) + + with mount_data_nginx(nginx_config, tmpdir): + http_path = f"{nginx_http_url}/index.html" + daft_ls_result = io_list(http_path, recursive=False) + + # NOTE: Cannot use fsspec here because they do not correctly find the links + # fsspec_result = fs.ls(http_path, detail=True) + # compare_http_result(daft_ls_result, fsspec_result) + + assert daft_ls_result == [ + {"type": "File", "path": f"{nginx_http_url}/other.html", "size": None}, + {"type": "Directory", 
"path": f"{nginx_http_url}/dir/", "size": None}, + ] + + +@pytest.mark.integration() +def test_http_listing_absolute_base_urls(nginx_config, tmpdir): + nginx_http_url, _ = nginx_config + + tmpdir = Path(tmpdir) + test_manifest_file = tmpdir / "index.html" + test_manifest_file.write_text( + f""" + this is an absolute base path to a file + this is an absolute base path to a dir + """ + ) + + with mount_data_nginx(nginx_config, tmpdir): + http_path = f"{nginx_http_url}/index.html" + daft_ls_result = io_list(http_path, recursive=False) + + # NOTE: Cannot use fsspec here because they do not correctly find the links + # fsspec_result = fs.ls(http_path, detail=True) + # compare_http_result(daft_ls_result, fsspec_result) + + assert daft_ls_result == [ + {"type": "File", "path": f"{nginx_http_url}/other.html", "size": None}, + {"type": "Directory", "path": f"{nginx_http_url}/dir/", "size": None}, + ]