Commit
chore:[security]upgrade hashlib algo
GangGreenTemperTatum committed Jun 24, 2024
1 parent 696bf1f commit 2d858c3
Showing 1 changed file with 283 additions and 19 deletions.
302 changes: 283 additions & 19 deletions compass_sdk/utils.py
@@ -1,30 +1,65 @@
import glob
import hashlib
import json
import os
-from collections import deque
-from concurrent.futures import Executor, Future
-from typing import Callable, Iterable, Iterator, List, Optional, TypeVar
from collections import Counter
from io import BytesIO
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union

import fsspec
import pandas as pd
import pyarrow.parquet as pq
from compass_parser import logger
from compass_parser.constants import FILETYPES_TO_CONTENT_TYPES
from compass_parser.types import CompassDocument, CompassDocumentMetadata, CompassFileType, CompassSdkStage
from fsspec import AbstractFileSystem
from unstructured.documents.elements import Element
from unstructured.file_utils.filetype import FileType, detect_filetype

-from compass_sdk import CompassDocument, CompassDocumentMetadata, CompassSdkStage

SUPPORTED_DATASET_FILETYPES = {
    CompassFileType.Parquet,
    CompassFileType.Csv,
    CompassFileType.Jsonl,
    CompassFileType.Tsv,
    CompassFileType.Dat,
    CompassFileType.Xls,
    CompassFileType.Xlsx,
}

-T = TypeVar("T")
-U = TypeVar("U")
-
-
-def imap_queued(executor: Executor, f: Callable[[T], U], it: Iterable[T], max_queued: int) -> Iterator[U]:
-    assert max_queued >= 1
-    tasks = deque[Future[U]]()
-
-    for x in it:
-        tasks.append(executor.submit(f, x))
-
-        while len(tasks) > max_queued:
-            yield tasks.popleft().result()
-
-    while tasks:
-        yield tasks.popleft().result()


def has_valid_dimensions(element: Element) -> bool:
    """Check if an element has valid dimensions, i.e., height and width are greater than 0"""
    coords = element.metadata.coordinates
    if not coords or not coords.points:
        return True

    height = abs(coords.points[1][1] - coords.points[0][1])
    width = abs(coords.points[2][0] - coords.points[0][0])
    return height > 0 and width > 0


def is_horizontal_text(element: Element, threshold=0.6) -> bool:
    """Check if a text element is in horizontal orientation."""
    coords = element.metadata.coordinates
    if not coords or not coords.points:
        return True

    height = abs(coords.points[1][1] - coords.points[0][1])
    width = abs(coords.points[2][0] - coords.points[0][0])
    return width / (height + 1e-10) > threshold

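Illustrative usage (editor's sketch, not part of the commit): both checks read the element's coordinate points in the order assumed above, with points[0] and points[1] spanning the height and points[0] and points[2] spanning the width. A small stand-in object avoids depending on unstructured's constructors:

from types import SimpleNamespace

# Fake Element: 80 units wide, 12 units tall, points given as (x, y) pairs.
fake_element = SimpleNamespace(
    metadata=SimpleNamespace(
        coordinates=SimpleNamespace(points=((0, 0), (0, 12), (80, 0), (80, 12)))
    )
)

print(has_valid_dimensions(fake_element))  # True: height 12 > 0 and width 80 > 0
print(is_horizontal_text(fake_element))    # True: 80 / 12 is well above the 0.6 threshold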

def are_elements_unique(elements: List[Any]) -> bool:
    """
    Check if all elements in a list are unique (ignoring None)
    :param elements: the list of elements
    :return: True if all elements are unique, False otherwise
    """

    count = Counter(elements)
    count.pop(None, None)  # default avoids a KeyError when the list contains no None
    return all(v == 1 for v in count.values())

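Quick usage sketch (editor's illustration, not in the commit):

print(are_elements_unique(["a", "b", None, None]))  # True: duplicate Nones are ignored
print(are_elements_unique(["a", "a", None]))        # False: "a" appears twice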

def get_fs(document_path: str) -> AbstractFileSystem:
@@ -47,12 +82,13 @@ def open_document(document_path) -> CompassDocument:
    :param document_path: the path to the document
    :return: a file-like object
    """
-    doc = CompassDocument(metadata=CompassDocumentMetadata(filename=document_path))
    doc = CompassDocument(
        metadata=CompassDocumentMetadata(filename=document_path))
    try:
        fs = get_fs(document_path)
        with fs.open(document_path, "rb") as f:
            val = f.read()
-            if isinstance(val, bytes):
            if val is not None and isinstance(val, bytes):
                doc.filebytes = val
            else:
                raise Exception(f"Expected bytes, got {type(val)}")
@@ -71,16 +107,244 @@ def scan_folder(folder_path: str, allowed_extensions: Optional[List[str]] = None
"""
fs = get_fs(folder_path)
all_files = []
path_prepend = f"{folder_path.split('://')[0]}://" if folder_path.find("://") >= 0 else ""
path_prepend = f"{folder_path.split(
'://')[0]}://" if folder_path.find("://") >= 0 else ""

if allowed_extensions is None:
allowed_extensions = [""]
else:
allowed_extensions = [f".{ext}" if not ext.startswith(".") else ext for ext in allowed_extensions]
allowed_extensions = [f".{ext}" if not ext.startswith(
".") else ext for ext in allowed_extensions]

for ext in allowed_extensions:
rec_glob = "**/" if recursive else ""
pattern = os.path.join(glob.escape(folder_path), f"{rec_glob}*{ext}")
scanned_files = fs.glob(pattern, recursive=recursive)
all_files.extend([f"{path_prepend}{f}" for f in scanned_files])
return all_files

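Hypothetical call (editor's sketch; the folder and extensions are made up, and the truncated signature is assumed to include the recursive flag its body uses): extensions are normalized to a leading dot, and any scheme prefix such as s3:// is re-attached to the paths returned by fs.glob:

files = scan_folder("/tmp/docs", allowed_extensions=["pdf", ".txt"], recursive=True)
# allowed_extensions becomes [".pdf", ".txt"]; for "s3://bucket/docs" each result
# would come back prefixed with "s3://".
print(files)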

def is_binary(document_path: str) -> bool:
    """
    Returns True if the file is binary (e.g., PDF).
    :param document_path: the path to the document
    :return: True if the file is binary, False otherwise
    """
    fs: fsspec.AbstractFileSystem = get_fs(document_path)
    with fs.open(document_path, "rb") as f:
        byte_sample = f.read(1024)
        if isinstance(byte_sample, bytes):
            return b"\x00" in byte_sample
    return False

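The check relies on the common heuristic that binary formats contain NUL bytes near the start of the file while plain text rarely does. A self-contained sketch of the same idea (not part of the commit):

def looks_binary(sample: bytes) -> bool:
    # Same heuristic as is_binary, applied to an in-memory sample.
    return b"\x00" in sample[:1024]

print(looks_binary(b"%PDF-1.7\x00\x01\x02"))       # True: NUL bytes present
print(looks_binary(b"plain,text,csv\nrow,1,2\n"))  # False: no NUL byte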

def get_dataset_type(doc_bytes: BytesIO, doc_path: str) -> CompassFileType:
    """
    Get the type of the dataset file
    :param doc_bytes: the file-like object
    :param doc_path: the path to the document
    :return: the type of the dataset file
    """
    filetype = CompassFileType(Path(doc_path).suffix)
    # JSONL is a special case of JSON
    if filetype == CompassFileType.Json and is_jsonl(doc_bytes):
        return CompassFileType.Jsonl
    elif filetype in SUPPORTED_DATASET_FILETYPES:
        return filetype

    return CompassFileType.Unsupported

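For example (editor's sketch; it follows from the code above that CompassFileType values are file suffixes such as ".csv"), a .csv path maps directly to the CSV type, while a .json payload whose lines parse individually is promoted to JSONL:

from io import BytesIO

print(get_dataset_type(BytesIO(b"a,b\n1,2\n"), "table.csv"))            # CompassFileType.Csv
print(get_dataset_type(BytesIO(b'{"a": 1}\n{"a": 2}\n'), "rows.json"))  # CompassFileType.Jsonl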

def is_jsonl(filebytes: BytesIO) -> bool:
    """
    Returns True if the file is a JSONL file.
    :param filebytes: the file-like object
    :return: True if the file is a JSONL file, False otherwise
    """
    try:
        pd.read_json(filebytes, lines=True, nrows=2)
        return True
    except ValueError:
        return False

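A positive and a negative case (editor's sketch): pandas raises ValueError for bytes that are not line-delimited JSON, which is what the except branch catches:

from io import BytesIO

print(is_jsonl(BytesIO(b'{"a": 1}\n{"a": 2}\n')))  # True: two JSON lines
print(is_jsonl(BytesIO(b"col1,col2\n1,2\n")))      # False: CSV is not JSON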

def read_excel_batches(filepath: str, chunksize: Optional[int] = None) -> Iterator[pd.DataFrame]:
    """
    Read an Excel file using the correct engine
    :param filepath: the path to the Excel file
    :param chunksize: the chunk size
    """
    skip = 0
    while True:
        df = pd.read_excel(filepath, skiprows=skip, nrows=chunksize)
        skip += chunksize if chunksize else len(df)
        if df.empty:
            break
        yield df

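Hypothetical usage (editor's sketch; the path is made up and an Excel engine such as openpyxl must be installed): the generator re-reads the workbook with a growing skiprows offset, yielding one DataFrame per chunk:

for i, batch in enumerate(read_excel_batches("reports/large_sheet.xlsx", chunksize=1000)):
    print(f"batch {i}: {len(batch)} rows")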

DATASET_LOAD_FUNCTIONS = {
    CompassFileType.Jsonl: pd.read_json,
    CompassFileType.Csv: pd.read_csv,
    CompassFileType.Tsv: pd.read_csv,
    CompassFileType.Dat: pd.read_csv,
    CompassFileType.Parquet: pq.ParquetFile,
    CompassFileType.Xls: pd.read_excel,
    CompassFileType.Xlsx: pd.read_excel,
}


# Functions that allow loading datasets in batches. Note that some dataset formats,
# like JSON, can be loaded with the same Pandas function to return either the whole dataset or batches.
BATCHED_DATASET_LOAD_FUNCTIONS = {
    CompassFileType.Jsonl: pd.read_json,
    CompassFileType.Csv: pd.read_csv,
    CompassFileType.Tsv: pd.read_csv,
    CompassFileType.Dat: pd.read_csv,
    CompassFileType.Parquet: pq.ParquetFile,
    CompassFileType.Xls: read_excel_batches,
    CompassFileType.Xlsx: read_excel_batches,
}

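The batching behaviour these tables rely on can be seen directly in pandas (illustrative sketch, not part of the commit): passing chunksize makes read_csv return an iterator of DataFrames instead of a single frame, and read_json behaves the same way when lines=True:

import pandas as pd
from io import BytesIO

data = BytesIO(b"a,b\n1,2\n3,4\n5,6\n")
for chunk in pd.read_csv(data, chunksize=2):
    print(len(chunk))  # 2, then 1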

def load_dataset(filebytes: BytesIO, filepath: str, dataset_type: CompassFileType) -> Tuple[Iterator, List[str]]:
    """
    Load a dataset from a file using the appropriate function
    :param filebytes: the file-like object
    :param filepath: the path to the dataset
    :param dataset_type: the type of the dataset
    :return: a tuple with the dataset and the column names (if any)
    """
    load_func = DATASET_LOAD_FUNCTIONS.get(dataset_type, None)
    batched_load_func = BATCHED_DATASET_LOAD_FUNCTIONS.get(dataset_type, None)
    if load_func is None or batched_load_func is None:
        logger.error(f"Unknown dataset type for {filepath}")
        return iter([]), []

    load_args = {}
    if dataset_type == CompassFileType.Jsonl:
        load_args["lines"] = True
    elif dataset_type == CompassFileType.Tsv:
        load_args["sep"] = "\t"

    header, load_args = get_dataset_header(
        filebytes, dataset_type, load_func, load_args)

    # Parquet files are loaded with Pyarrow and need a different iterator. The rest are loaded with Pandas.
    if dataset_type == CompassFileType.Parquet:
        batches_iterator = batched_load_func(filebytes).iter_batches(10000)
    else:
        batches_iterator = batched_load_func(
            filebytes, **(load_args | {"chunksize": 10000}))
    return batches_iterator, header

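Putting the pieces together (editor's sketch; the CSV content is made up): load_dataset yields 10,000-row pandas chunks (or pyarrow record batches for Parquet), and iter_batch_rows defined below flattens each chunk into rows:

from io import BytesIO

filebytes = BytesIO(b"name,score\nalice,10\nbob,7\n")
batches, header = load_dataset(filebytes, "scores.csv", CompassFileType.Csv)
print(header)  # ['name', 'score'], detected by the dtype check in get_dataset_header

for batch in batches:
    for idx, row in iter_batch_rows(batch, CompassFileType.Csv):
        print(idx, row.to_dict())  # 0 {'name': 'alice', 'score': 10} ...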

def get_dataset_header(
    filebytes: BytesIO, dataset_type: CompassFileType, load_func: Callable, load_args: Dict
) -> Tuple[List[str], Dict[str, Any]]:
    """
    Extract the dataset header if it exists (only tabular data, i.e., not JSON). Otherwise, create a default one
    :param filebytes: the file-like object
    :param dataset_type: the type of the dataset
    :param load_func: the function to use to load the dataset
    :param load_args: the arguments to pass to the load function
    :return: the dataset header
    """
    # JSON files do not have headers, and Parquet files always have headers
    header = []
    if dataset_type in {CompassFileType.Json, CompassFileType.Jsonl, CompassFileType.Parquet}:
        return header, load_args

    df = load_func(filebytes, **(load_args | {"header": None, "nrows": 10}))
    filebytes.seek(0)
    df_header = load_func(filebytes, **(load_args | {"nrows": 10}))
    filebytes.seek(0)
    if tuple(df.dtypes) != tuple(df_header.dtypes):
        header = df_header.columns.tolist()

    if not header:
        header = [f"Column {i}" for i in range(len(df.columns))]

    return header, load_args

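The heuristic above compares column dtypes from a small sample read with and without a header row: if treating the first row as data changes the dtypes (numeric columns turn into strings), that first row is taken to be a header. A standalone sketch of the same idea (not part of the commit):

import pandas as pd
from io import BytesIO

raw = b"city,population\nparis,2100000\nmadrid,3300000\n"

no_header = pd.read_csv(BytesIO(raw), header=None, nrows=10)
with_header = pd.read_csv(BytesIO(raw), nrows=10)

# With header=None the "population" label pollutes the numeric column, so the
# dtypes differ and the first row is treated as a header.
print(tuple(no_header.dtypes) != tuple(with_header.dtypes))  # True
print(with_header.columns.tolist())                          # ['city', 'population']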

def iter_batch_rows(batch_iterator: Any, dataset_type: CompassFileType) -> Iterator:
    """
    Iterate over the rows of a dataset
    :param batch_iterator: the batch iterator
    :param dataset_type: the type of the dataset
    :return: an iterator over the rows
    """
    if dataset_type == CompassFileType.Parquet:
        batch_iterator = batch_iterator.to_pandas()
    return batch_iterator.iterrows()


def load_nested_json(json_object: Union[Dict[str, Any], List, str]) -> Any:
    """
    Load a nested JSON and decode and navigate its nested JSON strings
    :param json_object: the JSON string
    :return: the loaded JSON
    """
    if isinstance(json_object, dict):
        return {k: load_nested_json(v) for k, v in json_object.items()}
    elif isinstance(json_object, list):
        return [load_nested_json(v) for v in json_object]
    elif isinstance(json_object, str):
        try:
            decoded_str = json.loads(json_object)
            return load_nested_json(decoded_str)
        except json.JSONDecodeError:
            pass
    return json_object

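For example (editor's sketch), a field whose value is itself a JSON-encoded string is decoded in place, while ordinary strings and scalars pass through untouched:

record = {
    "id": 7,
    "payload": '{"tags": ["a", "b"]}',
    "note": "not json",
}
print(load_nested_json(record))
# {'id': 7, 'payload': {'tags': ['a', 'b']}, 'note': 'not json'}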

def get_content_type(file: BytesIO, filepath: str) -> str:
    """
    Wrapper function for Unstructured's detect_filetype to get a file's
    type and fix the bug when determining the content type for LibreOffice files.
    This function first attempts to determine the file type using the file-like object.
    If the file type is MSG but the file extension says otherwise, then it will attempt to
    determine the file type using the file's extension. Finally, it will return the content type
    based on the file type, which is eventually passed to Unstructured to override its buggy content type logic.
    :param file: the file-like object
    :param filepath: the path to the file
    """
    filetype = detect_filetype(file=file) or FileType.EMPTY
    ext = Path(filepath).suffix.lower()

    # If the file type is unknown, then try to detect the file type using the file's extension.
    # If the file type is MSG but the file extension says otherwise, then use the file's extension.
    # This is a workaround for the bug in Unstructured's content type detection for LibreOffice files.
    if filetype == FileType.UNK or filetype == FileType.MSG and ext and not ext == ".msg":
        filetype = detect_filetype(filepath)

    # If the file is a valid JSON, we treat it as text because Unstructured does not support JSON content type
    filetype = FileType.TXT if filetype == FileType.JSON else filetype

    if filetype not in FILETYPES_TO_CONTENT_TYPES:
        return "application/octet-stream"

    return FILETYPES_TO_CONTENT_TYPES[filetype]

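A hedged usage sketch (the bytes and path are made up; the exact MIME type returned depends on compass_parser's FILETYPES_TO_CONTENT_TYPES mapping):

from io import BytesIO

sample = BytesIO(b'{"a": 1}')
# A detected JSON file is remapped to TXT before the MIME lookup; any file type
# missing from the mapping falls back to "application/octet-stream".
print(get_content_type(sample, "record.json"))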

def generate_file_id(doc_text: str, is_dataset: bool, filepath: Optional[str] = None) -> str:
    """
    Generate a unique document id. If a document id is provided, then it is used right away.
    Otherwise, a document id is generated based on the document's text or file path. If a file path exists,
    a document id is generated based on:
    - the file path and text if it's a dataset
    - the file path if it's a regular document
    This prevents different documents coming out of the same dataset from ending up with the same id.
    If a file path does not exist, a document id is generated based on the document's text.
    :param filepath: the path of the document
    :param doc_text: the text of the document
    :param is_dataset: whether the document is a dataset
    :return: a unique document id
    """
    if filepath:
        text = f"{filepath}_{doc_text}" if is_dataset else filepath
    else:
        text = doc_text
    return hashlib.sha256(text.encode()).hexdigest()

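Finally, a small sketch of the id scheme (editor's illustration; the paths are hypothetical). Rows from the same dataset get distinct ids because the row text is mixed into the hash, while a regular document's id depends only on its path; the hashlib.sha256 call is presumably the algorithm upgrade referenced in the commit message:

row_a = generate_file_id("row-a contents", is_dataset=True, filepath="data/users.parquet")
row_b = generate_file_id("row-b contents", is_dataset=True, filepath="data/users.parquet")
print(row_a != row_b)  # True: path and text are hashed together for datasets

doc_1 = generate_file_id("first parse", is_dataset=False, filepath="docs/report.pdf")
doc_2 = generate_file_id("second parse", is_dataset=False, filepath="docs/report.pdf")
print(doc_1 == doc_2)  # True: only the path is hashed for regular documents
print(len(doc_1))      # 64 hex characters from hashlib.sha256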