diff --git a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py index 9230ecb0d..e65f91989 100644 --- a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py +++ b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py @@ -9,9 +9,9 @@ from typing import Any, Dict, List, Literal, Optional, Union from haystack import Document, component, default_from_dict, default_to_dict -from haystack.components.converters.utils import normalize_metadata +from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata +from haystack.dataclasses.byte_stream import ByteStream from haystack.utils import Secret, deserialize_secrets_inplace -from tqdm import tqdm from unstructured.documents.elements import Element from unstructured.partition.api import partition_via_api @@ -123,64 +123,79 @@ def from_dict(cls, data: Dict[str, Any]) -> "UnstructuredFileConverter": @component.output_types(documents=List[Document]) def run( self, - paths: Union[List[str], List[os.PathLike]], + sources: Union[List[Union[str, os.PathLike, ByteStream]]], meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None, ): """ - Convert files to Haystack Documents using the Unstructured API. + Convert files or byte streams to Haystack Documents using the Unstructured API. - :param paths: List of paths to convert. Paths can be files or directories. - If a path is a directory, all files in the directory are converted. Subdirectories are ignored. + :param sources: List of file paths or byte streams to convert. + Paths can be files or directories. ByteStream is also supported. :param meta: Optional metadata to attach to the Documents. - This value can be either a list of dictionaries or a single dictionary. - If it's a single dictionary, its content is added to the metadata of all produced Documents. - If it's a list, the length of the list must match the number of paths, because the two lists will be zipped. - Please note that if the paths contain directories, `meta` can only be a single dictionary - (same metadata for all files). - + This value can be a single dictionary or a list of dictionaries, matching the number of sources. :returns: A dictionary with the following key: - `documents`: List of Haystack Documents. - - :raises ValueError: If `meta` is a list and `paths` contains directories. + :raises ValueError: If `meta` is a list and `sources` contains directories. """ - paths_obj = [Path(path) for path in paths] - filepaths = [path for path in paths_obj if path.is_file()] - filepaths_in_directories = [ - filepath for path in paths_obj if path.is_dir() for filepath in path.glob("*.*") if filepath.is_file() - ] - if filepaths_in_directories and isinstance(meta, list): - error = """"If providing directories in the `paths` parameter, - `meta` can only be a dictionary (metadata applied to every file), - and not a list. To specify different metadata for each file, - provide an explicit list of direct paths instead.""" - raise ValueError(error) - - all_filepaths = filepaths + filepaths_in_directories - # currently, the files are converted sequentially to gently handle API failures + documents = [] - meta_list = normalize_metadata(meta, sources_count=len(all_filepaths)) - - for filepath, metadata in tqdm( - zip(all_filepaths, meta_list), desc="Converting files to Haystack Documents", disable=not self.progress_bar - ): - elements = self._partition_file_into_elements(filepath=filepath) - docs_for_file = self._create_documents( - filepath=filepath, + all_sources = [] + meta_sources = normalize_metadata(meta, len(sources)) + + # Iterate over the sources + for source in sources: + if isinstance(source, (str, os.PathLike)): + path_obj = Path(source) + + if path_obj.is_file(): + # Add individual file + all_sources.append(path_obj) + + elif path_obj.is_dir(): + # Ensure meta is a dict when directories are provided + if not isinstance(meta, dict): + error = """"If providing directories in the `paths` parameter, + `meta` can only be a dictionary (metadata applied to every file), + and not a list. To specify different metadata for each file, + provide an explicit list of direct paths instead.""" + raise ValueError(error) + + # If the source is a directory, add all files in the directory + for file in path_obj.glob("*.*"): + if file.is_file(): + all_sources.append(file) # Add each file in the directory + + elif isinstance(source, ByteStream): + # Handle ByteStream + all_sources.append(source) + + for source, metadata in zip(all_sources, meta_sources): + try: + bytestream = get_bytestream_from_source(source=source) + except Exception as e: + logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e) + continue + + elements = self._partition_source_into_elements(source=source) + merged_metadata = {**bytestream.meta, **metadata} + + docs_for_stream = self._create_documents( elements=elements, document_creation_mode=self.document_creation_mode, separator=self.separator, - meta=metadata, + meta=merged_metadata, ) - documents.extend(docs_for_file) + documents.extend(docs_for_stream) + return {"documents": documents} @staticmethod def _create_documents( - filepath: Path, elements: List[Element], document_creation_mode: Literal["one-doc-per-file", "one-doc-per-page", "one-doc-per-element"], separator: str, meta: Dict[str, Any], + filepath: Optional[Path] = None, ) -> List[Document]: """ Create Haystack Documents from the elements returned by Unstructured. @@ -190,7 +205,8 @@ def _create_documents( if document_creation_mode == "one-doc-per-file": text = separator.join([str(el) for el in elements]) metadata = copy.deepcopy(meta) - metadata["file_path"] = str(filepath) + if filepath: + metadata["file_path"] = str(filepath) # Only include file path if provided docs = [Document(content=text, meta=metadata)] elif document_creation_mode == "one-doc-per-page": @@ -198,7 +214,8 @@ def _create_documents( meta_per_page: defaultdict[int, dict] = defaultdict(dict) for el in elements: metadata = copy.deepcopy(meta) - metadata["file_path"] = str(filepath) + if filepath: + metadata["file_path"] = str(filepath) if hasattr(el, "metadata"): metadata.update(el.metadata.to_dict()) page_number = int(metadata.get("page_number", 1)) @@ -211,7 +228,8 @@ def _create_documents( elif document_creation_mode == "one-doc-per-element": for index, el in enumerate(elements): metadata = copy.deepcopy(meta) - metadata["file_path"] = str(filepath) + if filepath: + metadata["file_path"] = str(filepath) metadata["element_index"] = index if hasattr(el, "metadata"): metadata.update(el.metadata.to_dict()) @@ -219,20 +237,30 @@ def _create_documents( metadata["category"] = el.category doc = Document(content=str(el), meta=metadata) docs.append(doc) + return docs - def _partition_file_into_elements(self, filepath: Path) -> List[Element]: + def _partition_source_into_elements(self, source: Union[Path, ByteStream]) -> List[Element]: """ Partition a file into elements using the Unstructured API. """ elements = [] try: - elements = partition_via_api( - filename=str(filepath), - api_url=self.api_url, - api_key=self.api_key.resolve_value() if self.api_key else None, - **self.unstructured_kwargs, - ) + if isinstance(source, Path): + elements = partition_via_api( + filename=str(source), + api_url=self.api_url, + api_key=self.api_key.resolve_value() if self.api_key else None, + **self.unstructured_kwargs, + ) + else: + elements = partition_via_api( + file=source.data, + metadata_filename=str(source.meta), + api_url=self.api_url, + api_key=self.api_key.resolve_value() if self.api_key else None, + **self.unstructured_kwargs, + ) except Exception as e: - logger.warning(f"Unstructured could not process file {filepath}. Error: {e}") + logger.warning(f"Unstructured could not process source {source}. Error: {e}") return elements diff --git a/integrations/unstructured/tests/test_converter.py b/integrations/unstructured/tests/test_converter.py index 5d1a6c091..7dea5cbd8 100644 --- a/integrations/unstructured/tests/test_converter.py +++ b/integrations/unstructured/tests/test_converter.py @@ -1,7 +1,11 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 +from pathlib import Path + import pytest +from haystack.dataclasses.byte_stream import ByteStream + from haystack_integrations.components.converters.unstructured import UnstructuredFileConverter @@ -86,6 +90,34 @@ def test_run_one_doc_per_file(self, samples_path): assert len(documents) == 1 assert documents[0].meta == {"file_path": str(pdf_path)} + @pytest.mark.integration + def test_run_one_doc_per_file_bytestream(self, samples_path): + pdf_path = samples_path / "sample_pdf.pdf" + pdf_stream = ByteStream.from_file_path(pdf_path) + + local_converter = UnstructuredFileConverter( + api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-file" + ) + + documents = local_converter.run([pdf_stream])["documents"] + + assert len(documents) == 1 + + @pytest.mark.integration + def test_run_one_doc_per_page_bytestream(self, samples_path): + pdf_path = samples_path / "sample_pdf.pdf" + pdf_stream = ByteStream.from_file_path(pdf_path) + + local_converter = UnstructuredFileConverter( + api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page" + ) + + documents = local_converter.run([pdf_stream])["documents"] + + assert len(documents) == 4 + for i, doc in enumerate(documents, start=1): + assert doc.meta["page_number"] == i + @pytest.mark.integration def test_run_one_doc_per_page(self, samples_path): pdf_path = samples_path / "sample_pdf.pdf" @@ -127,7 +159,7 @@ def test_run_one_doc_per_file_with_meta(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-file" ) - documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"] + documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"] assert len(documents) == 1 assert documents[0].meta["file_path"] == str(pdf_path) @@ -143,7 +175,7 @@ def test_run_one_doc_per_page_with_meta(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page" ) - documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"] + documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"] assert len(documents) == 4 for i, doc in enumerate(documents, start=1): assert doc.meta["file_path"] == str(pdf_path) @@ -159,7 +191,7 @@ def test_run_one_doc_per_element_with_meta(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element" ) - documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"] + documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"] assert len(documents) > 4 first_element_index = 0 @@ -185,7 +217,7 @@ def test_run_one_doc_per_element_with_meta_list_two_files(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element" ) - documents = local_converter.run(paths=pdf_path, meta=meta)["documents"] + documents = local_converter.run(sources=pdf_path, meta=meta)["documents"] assert len(documents) > 4 for doc in documents: @@ -205,7 +237,7 @@ def test_run_one_doc_per_element_with_meta_list_folder_fail(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element" ) with pytest.raises(ValueError): - local_converter.run(paths=pdf_path, meta=meta)["documents"] + local_converter.run(sources=pdf_path, meta=meta)["documents"] @pytest.mark.integration def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path): @@ -216,7 +248,7 @@ def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element" ) - documents = local_converter.run(paths=pdf_path, meta=meta)["documents"] + documents = local_converter.run(sources=pdf_path, meta=meta)["documents"] assert len(documents) > 4 for doc in documents: @@ -226,3 +258,59 @@ def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path): assert "category" in doc.meta assert "common_meta" in doc.meta assert doc.meta["common_meta"] == "common" + + @pytest.mark.integration + def test_run_one_doc_per_element_with_meta_list_multiple_sources(self, samples_path): + sources = [ + ByteStream(data=b"content", meta={"file_path": "some_file.md"}), + "README.md", + ByteStream(data=b"content", meta={"file_path": "yet_another_file.md"}), + Path(__file__), + ByteStream(data=b"content", meta={"file_path": "my_file.md"}), + ] + + meta = [ + {"type": "ByteStream"}, + {"type": "str"}, + {"type": "ByteStream"}, + {"type": "Path"}, + {"type": "ByteStream"}, + ] + + local_converter = UnstructuredFileConverter( + api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page" + ) + + documents = local_converter.run(sources=sources, meta=meta)["documents"] + + assert len(documents) == 5 + assert documents[0].meta["type"] == "ByteStream" + assert documents[0].meta["file_path"] == "some_file.md" + assert documents[1].meta["type"] == "str" + assert documents[1].meta["file_path"] == "README.md" + assert documents[2].meta["type"] == "ByteStream" + assert documents[2].meta["file_path"] == "yet_another_file.md" + assert documents[3].meta["type"] == "Path" + assert documents[4].meta["type"] == "ByteStream" + assert documents[4].meta["file_path"] == "my_file.md" + + @pytest.mark.integration + def test_run_one_doc_per_element_with_meta_list_multiple_sources_directory(self, samples_path): + sources = [ + "README.md", + ByteStream(data=b"Some content", meta={"file_path": "some_file.md"}), + samples_path, + Path(__file__), + ] + + meta = {"common_meta": "applies_to_all"} + + + local_converter = UnstructuredFileConverter( + api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page" + ) + + documents = local_converter.run(sources=sources, meta=meta)["documents"] + + for doc in documents: + assert doc.meta["common_meta"] == "applies_to_all"