diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml
index 6fbcf65..34b322a 100644
--- a/.github/workflows/pytest.yml
+++ b/.github/workflows/pytest.yml
@@ -29,6 +29,9 @@ jobs:
echo "$CA_CERT" > ca-cert.pem
echo "$CA_KEY" > ca-key.pem
echo "$CA_SIGNING_KEY" > ca-signing-key.pem
+ - name: Start the proxy
+ run: |
+ poetry run python ontologytimemachine/custom_proxy.py &
- name: Test with pytest
run: |
- poetry run pytest
+ poetry run pytest -v
diff --git a/Dockerfile b/Dockerfile
index bc75693..21f939a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -21,6 +21,4 @@ RUN pip install poetry==$POETRY_VERSION
RUN poetry config virtualenvs.create false
RUN poetry install --no-dev && rm pyproject.toml
-
-CMD python3 -m proxy --ca-key-file ca-key.pem --ca-cert-file ca-cert.pem --ca-signing-key-file ca-signing-key.pem --hostname 0.0.0.0 --port $PORT --plugins ontologytimemachine.custom_proxy.OntologyTimeMachinePlugin
-
+ENTRYPOINT ["python3", "ontologytimemachine/custom_proxy.py"]
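+# The container accepts the same CLI options as custom_proxy.py; a usage sketch (the image
+# name "ontology-time-machine" is illustrative):
+#   docker run -p 8899:8899 ontology-time-machine --ontoVersion originalFailoverLiveLatest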
\ No newline at end of file
diff --git a/README.md b/README.md
index 413afd3..39e9ecb 100644
--- a/README.md
+++ b/README.md
@@ -35,3 +35,15 @@ cp ca-signing-key.pem ~/ontology-time-machine/ca-signing-key.pem
### Not working:
- curl -x http://0.0.0.0:8899 -H "Accept: text/turtle" --cacert ca-cert.pem http://ontologi.es/days#
+
+### Install the Poetry virtual environment
+```
+poetry install
+```
+
+### Activate the Poetry environment
+```
+poetry shell
+```
+
+### Start the proxy
+```
+python3 ontologytimemachine/custom_proxy.py --ontoFormat ntriples --ontoVersion originalFailoverLiveLatest --ontoPrecedence enforcedPriority
+```
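+
+### Test the proxy
+Once the proxy is running, requests can be routed through it in the same way as the curl examples above (a minimal sketch; the target IRI is illustrative and the default 0.0.0.0:8899 host/port from custom_proxy.py is assumed):
+```
+curl -x http://0.0.0.0:8899 -H "Accept: text/turtle" --cacert ca-cert.pem http://xmlns.com/foaf/0.1/
+```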
\ No newline at end of file
diff --git a/ontologytimemachine/custom_proxy.py b/ontologytimemachine/custom_proxy.py
index 95cf79f..f147b60 100644
--- a/ontologytimemachine/custom_proxy.py
+++ b/ontologytimemachine/custom_proxy.py
@@ -1,107 +1,132 @@
from proxy.http.proxy import HttpProxyBasePlugin
-from proxy.http.parser import HttpParser, httpParserTypes
+from proxy.http.parser import HttpParser
from proxy.common.utils import build_http_response
-from proxy.http.methods import HttpMethods
-from ontologytimemachine.utils.utils import proxy_logic, parse_arguments
-from ontologytimemachine.utils.utils import check_if_archivo_ontology_requested
from ontologytimemachine.utils.mock_responses import mock_response_403
-from requests.exceptions import SSLError, Timeout, ConnectionError, RequestException
+from ontologytimemachine.proxy_wrapper import HttpRequestWrapper
+from ontologytimemachine.utils.proxy_logic import (
+ get_response_from_request,
+ if_not_block_host,
+ is_archivo_ontology_request,
+)
+from ontologytimemachine.utils.config import Config, parse_arguments
from http.client import responses
import proxy
import sys
import logging
-IP = '0.0.0.0'
-PORT = '8899'
+IP = "0.0.0.0"
+PORT = "8899"
+config = None
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
logger = logging.getLogger(__name__)
+
class OntologyTimeMachinePlugin(HttpProxyBasePlugin):
def __init__(self, *args, **kwargs):
+ logger.info("Init")
super().__init__(*args, **kwargs)
- (self.ontoFormat, self.ontoVersion, self.only_ontologies,
- self.https_intercept, self.inspect_redirects, self.forward_headers,
- self.subject_binary_search_threshold) = parse_arguments()
+ self.config = config
+ def before_upstream_connection(self, request: HttpParser) -> HttpParser | None:
+        logger.debug(config)
+ logger.info("Before upstream connection hook")
+ logger.info(
+ f"Request method: {request.method} - Request host: {request.host} - Request path: {request.path} - Request headers: {request.headers}"
+ )
+ wrapped_request = HttpRequestWrapper(request)
- def before_upstream_connection(self, request: HttpParser):
- logger.info('Before upstream connection hook')
- logger.info(f'Request method: {request.method} - Request host: {request.host} - Request path: {request.path} - Request headers: {request.headers}')
+ if wrapped_request.is_connect_request():
+ logger.info(f"HTTPS interception mode: {self.config.httpsInterception}")
- if request.method == b'CONNECT':
- logger.info(f'HTTPS interception mode: {self.https_intercept}')
# Only intercept if interception is enabled
- if self.https_intercept in ['all', 'archivo']:
+ if if_not_block_host(self.config):
+                logger.info("HTTPS interception is on, forwarding the request")
return request
else:
+ logger.info("HTTPS interception is blocked")
return None
-
- ontology_request = check_if_archivo_ontology_requested(request)
- # If only ontology mode, return None in all other cases
- if self.only_ontologies and not ontology_request:
- logger.warning('Request denied: not an ontology request and only ontologies mode is enabled')
- self.queue_response(mock_response_403)
- return None
-
- if ontology_request:
- logger.debug('The request is for an ontology')
- response = proxy_logic(request, self.ontoFormat, self.ontoVersion)
+        # The restricted-access ("only ontologies") check is handled inside get_response_from_request
+ logger.info(f"Config: {self.config}")
+ response = get_response_from_request(wrapped_request, self.config)
+ if response:
self.queue_response(response)
return None
- return request
-
- def handle_client_request(self, request: HttpParser):
- logger.info('Handle client request hook')
- logger.info(f'Request method: {request.method} - Request host: {request.host} - Request path: {request.path} - Request headers: {request.headers}')
-
- logger.debug(request.method)
- if request.method == b'CONNECT':
- return request
+ return request
- ontology_request = check_if_archivo_ontology_requested(request)
- if not ontology_request:
- logger.info('The requested IRI is not part of DBpedia Archivo')
- return request
+ def do_intercept(self, _request: HttpParser) -> bool:
+ wrapped_request = HttpRequestWrapper(_request)
+ if self.config.httpsInterception in ["all"]:
+ return True
+ elif self.config.httpsInterception in ["none"]:
+ return False
+ elif self.config.httpsInterception in ["archivo"]:
+ if is_archivo_ontology_request(wrapped_request):
+ return True
+ return False
+ else:
+ logger.info(
+ f"httpsInterception: {self.config.httpsInterception} option is not allowed."
+ )
+ return False
- response = proxy_logic(request, self.ontoFormat, self.ontoVersion)
- self.queue_response(response)
+ def handle_client_request(self, request: HttpParser) -> HttpParser:
+ logger.info("Handle client request hook")
+ logger.info(
+ f"Request method: {request.method} - Request host: {request.host} - Request path: {request.path} - Request headers: {request.headers}"
+ )
- return None
-
+ return request
def handle_upstream_chunk(self, chunk: memoryview):
return chunk
-
def queue_response(self, response):
self.client.queue(
build_http_response(
- response.status_code,
- reason=bytes(responses[response.status_code], 'utf-8'),
+ response.status_code,
+ reason=bytes(responses[response.status_code], "utf-8"),
headers={
- b'Content-Type': bytes(response.headers.get('Content-Type'), 'utf-8')
- },
- body=response.content
+ b"Content-Type": bytes(
+ response.headers.get("Content-Type"), "utf-8"
+ )
+ },
+ body=response.content,
)
)
-if __name__ == '__main__':
+if __name__ == "__main__":
+
+ config = parse_arguments()
+
+ sys.argv = [sys.argv[0]]
+
+    # Check if HTTPS interception is enabled
+ if config.httpsInterception != "none":
+ sys.argv += [
+ "--ca-key-file",
+ "ca-key.pem",
+ "--ca-cert-file",
+ "ca-cert.pem",
+ "--ca-signing-key-file",
+ "ca-signing-key.pem",
+ ]
sys.argv += [
- '--ca-key-file', 'ca-key.pem',
- '--ca-cert-file', 'ca-cert.pem',
- '--ca-signing-key-file', 'ca-signing-key.pem',
- ]
- sys.argv += [
- '--hostname', IP,
- '--port', PORT,
- '--plugins', __name__ + '.OntologyTimeMachinePlugin'
+ "--hostname",
+ IP,
+ "--port",
+ PORT,
+ "--plugins",
+ __name__ + ".OntologyTimeMachinePlugin",
]
+
logger.info("Starting OntologyTimeMachineProxy server...")
- proxy.main()
\ No newline at end of file
+ proxy.main()
diff --git a/ontologytimemachine/proxy_wrapper.py b/ontologytimemachine/proxy_wrapper.py
new file mode 100644
index 0000000..6829154
--- /dev/null
+++ b/ontologytimemachine/proxy_wrapper.py
@@ -0,0 +1,110 @@
+from abc import ABC, abstractmethod
+from proxy.http.parser import HttpParser
+import logging
+from typing import Tuple, Dict, Any
+
+# Configure logger
+logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+
+class AbstractRequestWrapper(ABC):
+ def __init__(self, request: Any) -> None:
+ self.request = request
+
+ @abstractmethod
+ def is_get_request(self) -> bool:
+ pass
+
+ @abstractmethod
+ def is_connect_request(self) -> bool:
+ pass
+
+ @abstractmethod
+ def is_head_request(self) -> bool:
+ pass
+
+ @abstractmethod
+ def is_https_request(self) -> bool:
+ pass
+
+ @abstractmethod
+ def get_request_host(self) -> str:
+ pass
+
+ @abstractmethod
+ def get_request_path(self) -> str:
+ pass
+
+ @abstractmethod
+ def get_request_headers(self) -> Dict[str, str]:
+ pass
+
+ @abstractmethod
+ def get_request_accept_header(self) -> str:
+ pass
+
+ @abstractmethod
+ def set_request_accept_header(self, mime_type: str) -> None:
+ pass
+
+ @abstractmethod
+ def get_request_url_host_path(self) -> Tuple[str, str, str]:
+ pass
+
+
+class HttpRequestWrapper(AbstractRequestWrapper):
+ def __init__(self, request: HttpParser) -> None:
+ super().__init__(request)
+
+ def is_get_request(self) -> bool:
+ return self.request.method == b"GET"
+
+ def is_connect_request(self) -> bool:
+ return self.request.method == b"CONNECT"
+
+ def is_head_request(self) -> bool:
+ return self.request.method == b"HEAD"
+
+ def is_https_request(self) -> bool:
+ return self.request.method == b"CONNECT" or self.request.headers.get(
+ b"Host", b""
+ ).startswith(b"https")
+
+ def get_request_host(self) -> str:
+ return self.request.host.decode("utf-8")
+
+ def get_request_path(self) -> str:
+ return self.request.path.decode("utf-8")
+
+ def get_request_headers(self) -> Dict[str, str]:
+ headers: Dict[str, str] = {}
+ for k, v in self.request.headers.items():
+ headers[v[0].decode("utf-8")] = v[1].decode("utf-8")
+ return headers
+
+ def get_request_accept_header(self) -> str:
+ logger.info("Wrapper - get_request_accept_header")
+ return self.request.headers[b"accept"][1].decode("utf-8")
+
+ def set_request_accept_header(self, mime_type: str) -> None:
+ self.request.headers[b"accept"] = (b"Accept", mime_type.encode("utf-8"))
+ logger.info(f'Accept header set to: {self.request.headers[b"accept"][1]}')
+
+ def get_request_url_host_path(self) -> Tuple[str, str, str]:
+ logger.info("Get ontology from request")
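+        # Illustrative example: a GET with no absolute URL but a "Host: example.org" header
+        # and path "/onto" yields ("https://example.org/onto", "example.org", "/onto");
+        # otherwise the host, path and absolute URL already parsed by proxy.py are returned.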
+ if (self.request.method in {b"GET", b"HEAD"}) and not self.request.host:
+ for k, v in self.request.headers.items():
+ if v[0].decode("utf-8") == "Host":
+ host = v[1].decode("utf-8")
+ path = self.request.path.decode("utf-8")
+ url = f"https://{host}{path}"
+ else:
+ host = self.request.host.decode("utf-8")
+ path = self.request.path.decode("utf-8")
+ url = str(self.request._url)
+
+ logger.info(f"Ontology: {url}")
+ return url, host, path
diff --git a/ontologytimemachine/utils/config.py b/ontologytimemachine/utils/config.py
new file mode 100644
index 0000000..48093d1
--- /dev/null
+++ b/ontologytimemachine/utils/config.py
@@ -0,0 +1,169 @@
+import argparse
+from dataclasses import dataclass
+from enum import Enum
+from typing import Dict, Any
+
+
+class LogLevel(Enum):
+ DEBUG = "debug"
+ INFO = "info"
+ WARNING = "warning"
+ ERROR = "error"
+
+
+class OntoFormat(Enum):
+ TURTLE = "turtle"
+ NTRIPLES = "ntriples"
+ RDFXML = "rdfxml"
+ HTMLDOCU = "htmldocu"
+
+
+class OntoPrecedence(Enum):
+ DEFAULT = "default"
+ ENFORCED_PRIORITY = "enforcedPriority"
+ ALWAYS = "always"
+
+
+class OntoVersion(Enum):
+ ORIGINAL = "original"
+ ORIGINAL_FAILOVER_LIVE_LATEST = "originalFailoverLiveLatest"
+ LATEST_ARCHIVED = "latestArchived"
+ TIMESTAMP_ARCHIVED = "timestampArchived"
+ DEPENDENCY_MANIFEST = "dependencyManifest"
+
+
+class HttpsInterception(Enum):
+ NONE = "none"
+ ALL = "all"
+ BLOCK = "block"
+ ARCHIVO = "archivo"
+
+
+@dataclass
+class Config:
+ logLevel: LogLevel = LogLevel.INFO
+ ontoFormat: Dict[str, Any] = None
+    ontoVersion: OntoVersion = OntoVersion.ORIGINAL_FAILOVER_LIVE_LATEST
+ restrictedAccess: bool = False
+    httpsInterception: HttpsInterception = HttpsInterception.ALL
+ disableRemovingRedirects: bool = False
+ timestamp: str = ""
+ # manifest: Dict[str, Any] = None
+
+
+def enum_parser(enum_class, value):
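+    # Illustrative usage: enum_parser(OntoFormat, "Turtle") returns "turtle" (matching is
+    # case-insensitive and yields the enum's string value); an unknown value raises a
+    # ValueError listing the valid options.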
+ value_lower = value.lower()
+ try:
+ return next(e.value for e in enum_class if e.value.lower() == value_lower)
+ except StopIteration:
+ valid_options = ", ".join([e.value for e in enum_class])
+ raise ValueError(
+ f"Invalid value '{value}'. Available options are: {valid_options}"
+ )
+
+
+def parse_arguments() -> Config:
+ parser = argparse.ArgumentParser(description="Process ontology format and version.")
+
+ # Defining ontoFormat argument with nested options
+ parser.add_argument(
+ "--ontoFormat",
+ type=lambda s: enum_parser(OntoFormat, s),
+ default=OntoFormat.TURTLE.value,
+ help="Format of the ontology: turtle, ntriples, rdfxml, htmldocu",
+ )
+
+ parser.add_argument(
+ "--ontoPrecedence",
+ type=lambda s: enum_parser(OntoPrecedence, s),
+ default=OntoPrecedence.ENFORCED_PRIORITY.value,
+ help="Precedence of the ontology: default, enforcedPriority, always",
+ )
+
+    parser.add_argument(
+        "--patchAcceptUpstream",
+        action="store_true",
+        default=False,
+        help="If set, patch the Accept header upstream in original mode.",
+    )
+
+ # Defining ontoVersion argument
+ parser.add_argument(
+ "--ontoVersion",
+ type=lambda s: enum_parser(OntoVersion, s),
+ default=OntoVersion.ORIGINAL_FAILOVER_LIVE_LATEST.value,
+        help="Version of the ontology: original, originalFailoverLiveLatest, latestArchived, timestampArchived, dependencyManifest",
+ )
+
+ # Enable/disable mode to only proxy requests to ontologies
+    parser.add_argument(
+        "--restrictedAccess",
+        action="store_true",
+        default=False,
+        help="If set, only proxy requests to ontologies stored in Archivo.",
+    )
+
+ # Enable HTTPS interception for specific domains
+ parser.add_argument(
+ "--httpsInterception",
+ type=lambda s: enum_parser(HttpsInterception, s),
+ default=HttpsInterception.ALL.value,
+        help="Enable HTTPS interception for specific domains: none, archivo, all, block.",
+ )
+
+ # Enable/disable inspecting or removing redirects
+    parser.add_argument(
+        "--disableRemovingRedirects",
+        action="store_true",
+        default=False,
+        help="If set, do not follow redirects when fetching ontologies.",
+    )
+
+ # Log level
+ parser.add_argument(
+ "--logLevel",
+ type=lambda s: enum_parser(LogLevel, s),
+ default=LogLevel.INFO.value,
+ help="Level of the logging: debug, info, warning, error.",
+ )
+
+ args = parser.parse_args()
+
+ # Check the value of --ontoVersion and prompt for additional arguments if needed
+ if args.ontoVersion == "timestampArchived":
+ args.timestamp = input("Please provide the timestamp (e.g., YYYY-MM-DD): ")
+ # Commenting manifest related code as it is not supported in the current version
+ # elif args.ontoVersion == 'dependencyManifest':
+ # args.manifest = input('Please provide the manifest file path: ')
+
+ # Accessing the arguments
+ if hasattr(args, "timestamp"):
+ timestamp = args.timestamp
+ else:
+ timestamp = None
+
+ # if hasattr(args, 'manifest'):
+ # logger.info(f"Manifest File Path: {args.manifest}")
+ # manifest = args.manifest
+ # else:
+ # manifest = None
+
+ # Create ontoFormat dictionary
+ ontoFormat = {
+ "format": args.ontoFormat,
+ "precedence": args.ontoPrecedence,
+ "patchAcceptUpstream": args.patchAcceptUpstream,
+ }
+
+ # Initialize the Config class with parsed arguments
+ config = Config(
+ logLevel=args.logLevel,
+ ontoFormat=ontoFormat,
+ ontoVersion=args.ontoVersion,
+ restrictedAccess=args.restrictedAccess,
+ httpsInterception=args.httpsInterception,
+ disableRemovingRedirects=args.disableRemovingRedirects,
+ timestamp=args.timestamp if hasattr(args, "timestamp") else "",
+ )
+
+ return config
diff --git a/ontologytimemachine/utils/download_archivo_urls.py b/ontologytimemachine/utils/download_archivo_urls.py
new file mode 100644
index 0000000..030fff5
--- /dev/null
+++ b/ontologytimemachine/utils/download_archivo_urls.py
@@ -0,0 +1,138 @@
+import os
+import hashlib
+import logging
+import requests
+import schedule
+import time
+import csv
+from datetime import datetime, timedelta
+from urllib.parse import urlparse
+from typing import Set, Tuple
+
+
+ARCHIVO_PARSED_URLS: Set[Tuple[str, str]] = set()
+
+
+ARCHIVO_FILE_PATH = "ontologytimemachine/utils/archivo_ontologies_download.txt"
+ARCHIVO_URL = "https://databus.dbpedia.org/ontologies/archivo-indices/ontologies/2024.07.26-220000/ontologies_type=official.csv"
+HASH_FILE_PATH = "ontologytimemachine/utils/archivo_ontologies_hash.txt"
+
+
+LAST_DOWNLOAD_TIMESTAMP = None
+DOWNLOAD_INTERVAL = timedelta(days=1) # 1 day interval for checking the download
+
+
+logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+
+def schedule_daily_download():
+ """Schedule the download to run at 3 AM every day."""
+ schedule.every().day.at("03:00").do(download_archivo_urls)
+
+ while True:
+ schedule.run_pending()
+ time.sleep(60) # Check every minute if there’s a scheduled task
+
+
+# Start the scheduler in the background
+def start_scheduler():
+ logger.info("Starting the scheduler for daily archivo ontology download.")
+ schedule_daily_download()
+
+
+# Function to calculate hash of the downloaded file
+def calculate_file_hash(file_path):
+ sha256_hash = hashlib.sha256()
+ with open(file_path, "rb") as f:
+ for byte_block in iter(lambda: f.read(4096), b""):
+ sha256_hash.update(byte_block)
+ return sha256_hash.hexdigest()
+
+
+# Function to download and update archivo URLs file
+def download_archivo_urls():
+ """Download the archivo ontologies file, extract the first column, and save to a text file if a new version is available."""
+ try:
+ logger.info("Checking for new version of archivo ontologies")
+
+ # Download the latest archivo ontologies CSV
+ response = requests.get(ARCHIVO_URL)
+ response.raise_for_status() # Ensure the request was successful
+
+ # Save the file temporarily to calculate the hash
+ temp_file_path = "temp_ontology_indices.csv"
+ with open(temp_file_path, "wb") as temp_file:
+ temp_file.write(response.content)
+
+ # Calculate the hash of the new file
+ new_file_hash = calculate_file_hash(temp_file_path)
+
+ # Compare with the existing hash if available
+ if os.path.exists(HASH_FILE_PATH):
+ with open(HASH_FILE_PATH, "r") as hash_file:
+ old_file_hash = hash_file.read().strip()
+ else:
+ old_file_hash = None
+
+ if new_file_hash != old_file_hash:
+ # New version detected, extract the first column and save to the text file
+ with open(temp_file_path, "r", newline="", encoding="utf-8") as csv_file:
+ csv_reader = csv.reader(csv_file, delimiter=",")
+ with open(ARCHIVO_FILE_PATH, "w") as txt_file:
+ for row in csv_reader:
+ if row:
+ txt_file.write(
+ row[0].strip() + "\n"
+ ) # Write only the first column (URL) to the text file
+
+ # Save the new hash
+ with open(HASH_FILE_PATH, "w") as hash_file:
+ hash_file.write(new_file_hash)
+
+ logger.info("New version of archivo ontologies downloaded and saved.")
+ else:
+ # No new version, remove the temporary file
+ os.remove(temp_file_path)
+ logger.info("No new version of archivo ontologies detected.")
+
+ # Update the last download timestamp
+ global LAST_DOWNLOAD_TIMESTAMP
+ LAST_DOWNLOAD_TIMESTAMP = datetime.now()
+
+ except requests.RequestException as e:
+ logger.error(f"Failed to download archivo ontologies: {e}")
+
+
+def load_archivo_urls():
+ """Load the archivo URLs into the global variable if not already loaded or if a day has passed since the last download."""
+ global ARCHIVO_PARSED_URLS
+ global LAST_DOWNLOAD_TIMESTAMP
+
+ # Check if ARCHIVO_PARSED_URLS is empty or the last download was over a day ago
+ if not ARCHIVO_PARSED_URLS or (
+ LAST_DOWNLOAD_TIMESTAMP is None
+ or datetime.now() - LAST_DOWNLOAD_TIMESTAMP > DOWNLOAD_INTERVAL
+ ):
+ logger.info(
+ "ARCHIVO_PARSED_URLS is empty or more than a day has passed since the last download."
+ )
+ download_archivo_urls()
+
+ # Load archivo URLs after downloading or if already present
+ if not ARCHIVO_PARSED_URLS: # Load only if the set is empty
+ logger.info("Loading archivo ontologies from file")
+ try:
+ with open(ARCHIVO_FILE_PATH, "r") as file:
+ ARCHIVO_PARSED_URLS = {
+ (urlparse(line.strip()).netloc, urlparse(line.strip()).path)
+ for line in file
+ }
+ logger.info(f"Loaded {len(ARCHIVO_PARSED_URLS)} ontology URLs.")
+
+ except FileNotFoundError:
+ logger.error("Archivo ontology file not found.")
+ except Exception as e:
+ logger.error(f"Error loading archivo ontology URLs: {e}")
diff --git a/ontologytimemachine/utils/proxy_logic.py b/ontologytimemachine/utils/proxy_logic.py
new file mode 100644
index 0000000..77655d5
--- /dev/null
+++ b/ontologytimemachine/utils/proxy_logic.py
@@ -0,0 +1,234 @@
+import logging
+import requests
+from ontologytimemachine.utils.utils import (
+ set_onto_format_headers,
+ get_format_from_accept_header,
+)
+from ontologytimemachine.utils.download_archivo_urls import load_archivo_urls
+from ontologytimemachine.utils.utils import (
+ parse_accept_header_with_priority,
+ archivo_api,
+ passthrough_status_codes,
+)
+from ontologytimemachine.utils.mock_responses import (
+ mock_response_403,
+ mock_response_404,
+ mock_response_500,
+)
+from typing import Set, Tuple
+
+
+logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+
+def if_not_block_host(config):
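+    # Note: with the mapping below, "none" and "all" let the CONNECT request pass through,
+    # while "block" (and any other value, including "archivo") falls through to False.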
+ if config.httpsInterception in ["none", "all"]:
+ return True
+ elif config.httpsInterception in ["block"]:
+ return False
+ return False
+
+
+def do_deny_request_due_non_archivo_ontology_uri(wrapped_request, only_ontologies):
+ if only_ontologies:
+ is_archivo_ontology = is_archivo_ontology_request(wrapped_request)
+ if not is_archivo_ontology:
+ return True
+ return False
+
+
+def get_response_from_request(wrapped_request, config):
+ do_deny = do_deny_request_due_non_archivo_ontology_uri(
+ wrapped_request, config.restrictedAccess
+ )
+ if do_deny:
+ logger.warning(
+ "Request denied: not an ontology request and only ontologies mode is enabled"
+ )
+ return mock_response_403
+
+ response = proxy_logic(wrapped_request, config)
+ return response
+
+
+def is_archivo_ontology_request(wrapped_request):
+ """Check if the requested ontology is in the archivo."""
+ logger.info("Check if the requested ontology is in archivo")
+
+ # Ensure the archivo URLs are loaded
+ load_archivo_urls()
+ from ontologytimemachine.utils.download_archivo_urls import ARCHIVO_PARSED_URLS
+
+ # Extract the request's host and path
+ request_host = wrapped_request.get_request_host()
+ request_path = wrapped_request.get_request_path()
+
+ if (request_host, request_path) in ARCHIVO_PARSED_URLS:
+ logger.info(f"Requested URL: {request_host+request_path} is in Archivo")
+ return True
+
+    # Strip a trailing slash and check again
+ if request_path.endswith("/"):
+ request_path = request_path.rstrip("/")
+ if (request_host, request_path) in ARCHIVO_PARSED_URLS:
+ logger.info(f"Requested URL: {request_host+request_path} is in Archivo")
+ return True
+
+ # Cut the last part of the path
+
+ path_parts = request_path.split("/")
+ new_path = "/".join(path_parts[:-1])
+
+ if (request_host, new_path) in ARCHIVO_PARSED_URLS:
+ logger.info(f"Requested URL: {request_host+request_path} is in Archivo")
+ return True
+
+ new_path = "/".join(path_parts[:-2])
+ if (request_host, new_path) in ARCHIVO_PARSED_URLS:
+ logger.info(f"Requested URL: {request_host+request_path} is in Archivo")
+ return True
+
+ logger.info(f"Requested URL: {request_host+request_path} is NOT in Archivo")
+ return False
+
+
+def request_ontology(url, headers, disableRemovingRedirects=False, timeout=5):
+ allow_redirects = not disableRemovingRedirects
+ try:
+ response = requests.get(
+            url=url, headers=headers, allow_redirects=allow_redirects, timeout=timeout
+ )
+        logger.info(f"Successfully fetched ontology from {url}")
+ return response
+ except Exception as e:
+        logger.error(f"Error fetching ontology from {url}: {e}")
+ return mock_response_404()
+
+
+# TODO: change the function definition to pass only the config
+def proxy_logic(wrapped_request, config):
+ logger.info("Proxy has to intervene")
+
+ set_onto_format_headers(wrapped_request, config)
+
+ headers = wrapped_request.get_request_headers()
+ ontology, _, _ = wrapped_request.get_request_url_host_path()
+
+ # if the requested format is not in Archivo and the ontoVersion is not original
+ # we can stop because the archivo request will not go through
+ format = get_format_from_accept_header(headers)
+ if not format and config.ontoVersion != "original":
+ logger.info(f"No format can be used from Archivo")
+ return mock_response_500
+
+ if config.ontoVersion == "original":
+        response = fetch_original(ontology, headers, config.disableRemovingRedirects)
+ elif config.ontoVersion == "originalFailoverLiveLatest":
+ response = fetch_failover(
+ wrapped_request, ontology, headers, config.disableRemovingRedirects
+ )
+ elif config.ontoVersion == "latestArchived":
+ response = fetch_latest_archived(wrapped_request, ontology, headers)
+ elif config.ontoVersion == "timestampArchived":
+ response = fetch_timestamp_archived(wrapped_request, ontology, headers, config)
+ # Commenting the manifest related part because it is not supported in the current version
+ # elif ontoVersion == 'dependencyManifest':
+ # response = fetch_dependency_manifest(ontology, headers, manifest)
+
+ return response
+
+
+# Fetch from the original source, no matter what
+def fetch_original(ontology, headers, disableRemovingRedirects):
+ logger.info(f"Fetching original ontology from URL: {ontology}")
+ return request_ontology(ontology, headers, disableRemovingRedirects)
+
+
+# Failover mode
+def fetch_failover(wrapped_request, ontology, headers, disableRemovingRedirects):
+ logger.info(f"Fetching original ontology with failover from URL: {ontology}")
+ original_response = request_ontology(ontology, headers, disableRemovingRedirects)
+ if original_response.status_code in passthrough_status_codes:
+ requested_mimetypes_with_priority = parse_accept_header_with_priority(
+ headers["Accept"]
+ )
+ requested_mimetypes = [x[0] for x in requested_mimetypes_with_priority]
+ response_mime_type = original_response.headers.get("Content-Type", ";").split(
+ ";"
+ )[0]
+ logger.info(f"Requested mimetypes: {requested_mimetypes}")
+ logger.info(f"Response mimetype: {response_mime_type}")
+ if response_mime_type in requested_mimetypes:
+ return original_response
+ else:
+            logger.info("The returned mimetype does not match any of the requested ones")
+ return fetch_latest_archived(wrapped_request, ontology, headers)
+ else:
+ logger.info(
+            f"The returned status code is not accepted: {original_response.status_code}"
+ )
+ return fetch_latest_archived(wrapped_request, ontology, headers)
+
+
+# Fetch the latest version from Archivo (no timestamp defined)
+def fetch_latest_archived(wrapped_request, ontology, headers):
+ if not is_archivo_ontology_request(wrapped_request):
+ logger.info(
+ "Data needs to be fetched from Archivo, but ontology is not available on Archivo."
+ )
+ return mock_response_404()
+ logger.info("Fetch latest archived")
+ format = get_format_from_accept_header(headers)
+ dbpedia_url = f"{archivo_api}?o={ontology}&f={format}"
+ logger.info(f"Fetching from DBpedia Archivo API: {dbpedia_url}")
+ return request_ontology(dbpedia_url, headers)
+
+
+def fetch_timestamp_archived(wrapped_request, ontology, headers, config):
+ if not is_archivo_ontology_request(wrapped_request):
+ logger.info(
+ "Data needs to be fetched from Archivo, but ontology is not available on Archivo."
+ )
+ return mock_response_404()
+ logger.info("Fetch archivo timestamp")
+ format = get_format_from_accept_header(headers)
+ dbpedia_url = f"{archivo_api}?o={ontology}&f={format}&v={config.timestamp}"
+ logger.info(f"Fetching from DBpedia Archivo API: {dbpedia_url}")
+ return request_ontology(dbpedia_url, headers)
+
+
+def fetch_dependency_manifest(ontology, headers, manifest):
+ logger.info(f"The dependency manifest is currently not supported")
+ return mock_response_500
+ # # Parse RDF data from the dependencies file
+ # manifest_g = rdflib.Graph()
+ # manifest_g.parse(manifest, format="turtle")
+
+ # version_namespace = rdflib.Namespace(ontology)
+
+ # # Extract dependencies related to the ontology link
+ # ontology = rdflib.URIRef(ontology)
+
+ # dependencies = manifest_g.subjects(predicate=version_namespace.dependency, object=ontology)
+
+ # for dependency in dependencies:
+ # dep_snapshot = g.value(subject=dependency, predicate=version_namespace.snapshot)
+ # dep_file = g.value(subject=dependency, predicate=version_namespace.file)
+
+ # # Make request to DBpedia archive API
+ # if dep_file:
+ # version_param = dep_file.split('v=')[1]
+ # api_url = f"{archivo_api}?o={ontology}&v={version_param}"
+ # else:
+ # api_url = f"{archivo_api}?o={ontology}"
+
+ # response = requests.get(api_url)
+ # if response.status_code == 200:
+ # logger.info(f"Successfully fetched {api_url}")
+ # return response
+ # else:
+ # logger.error(f"Failed to fetch {api_url}, status code: {response.status_code}")
+ # return mock_response_404
diff --git a/ontologytimemachine/utils/utils.py b/ontologytimemachine/utils/utils.py
index 227b8eb..36075c7 100644
--- a/ontologytimemachine/utils/utils.py
+++ b/ontologytimemachine/utils/utils.py
@@ -1,301 +1,158 @@
-from proxy.http.parser import HttpParser, httpParserTypes
-from requests.exceptions import SSLError, Timeout, ConnectionError, RequestException
-from ontologytimemachine.utils.mock_responses import mock_response_403, mock_response_404, mock_response_500, mock_response_200
-from http.client import responses
-from urllib.parse import urlparse
import logging
-import requests
import argparse
-import mimetypes
+from werkzeug.http import parse_accept_header
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
logger = logging.getLogger(__name__)
-dbpedia_api = 'https://archivo.dbpedia.org/download'
-
+archivo_api = "https://archivo.dbpedia.org/download"
+archivo_mimetypes = [
+ "application/rdf+xml",
+ "application/owl+xml",
+ "text/turtle",
+ "application/n-triples",
+]
-passthrough_status_codes_http = [
- 100, 101, 102, 103,
+passthrough_status_codes = [
+ 100,
+ 101,
+ 102,
+ 103,
200,
- 300, 301, 302, 303, 304, 307, 308,
+ 300,
+ 301,
+ 302,
+ 303,
+ 304,
+ 307,
+ 308,
451,
]
-def parse_arguments():
- parser = argparse.ArgumentParser(description='Process ontology format and version.')
-
- # Defining ontoFormat argument with nested options
- parser.add_argument('--ontoFormat', type=str, choices=['turtle', 'ntriples', 'rdfxml', 'htmldocu'],
- default='turtle', help='Format of the ontology: turtle, ntriples, rdfxml, htmldocu')
-
- parser.add_argument('--ontoPrecedence', type=str, choices=['default', 'enforcedPriority', 'always'],
- default='enforcedPriority', help='Precedence of the ontology: default, enforcedPriority, always')
-
- parser.add_argument('--patchAcceptUpstream', type=bool, default=False,
- help='Defines if the Accept Header is patched upstream in original mode.')
-
- # Defining ontoVersion argument
- parser.add_argument('--ontoVersion', type=str, choices=['original', 'originalFailoverLive', 'originalFailoverArchivoMonitor',
- 'latestArchive', 'timestampArchive', 'dependencyManifest'],
- default='originalFailoverLive', help='Version of the ontology: original, originalFailoverLive, originalFailoverArchivoMonitor, latestArchive, timestampArchive, dependencyManifest')
- # Enable/disable mode to only proxy requests to ontologies
- parser.add_argument('--onlyOntologies', type=bool, default=False,
- help='Enable/disable mode to only proxy requests to ontologies.')
-
- # Enable HTTPS interception for specific domains
- parser.add_argument('--httpsIntercept', type=str, choices=['none', 'archivo', 'all', 'listfilename'],
- default='all', help='Enable HTTPS interception for specific domains: none, archivo, all, listfilename.')
-
- # Enable/disable inspecting or removing redirects
- parser.add_argument('--inspectRedirects', type=bool, default=True,
- help='Enable/disable inspecting or removing redirects.')
-
- # Enable/disable proxy forward headers
- parser.add_argument('--forwardHeaders', type=bool, default=True,
- help='Enable/disable proxy forward headers.')
-
- # SubjectBinarySearchThreshold
- parser.add_argument('--subjectBinarySearchThreshold', type=int, default=100,
- help='SubjectBinarySearchThreshold value.')
-
- # Proxy native parameters
- parser.add_argument('--ca-key-file', type=str, required=True,
- help='Path to the CA key file.')
+def get_mime_type(format="turtle"):
+ # Define a mapping of formats to MIME types
+ format_to_mime = {
+ "turtle": "text/turtle",
+ "ntriples": "application/n-triples",
+ "rdfxml": "application/rdf+xml",
+ "htmldocu": "text/html",
+ }
- parser.add_argument('--ca-cert-file', type=str, required=True,
- help='Path to the CA certificate file.')
+ # Return the MIME type based on the format or use a generic default
+ return format_to_mime.get(format, "text/turtle")
- parser.add_argument('--ca-signing-key-file', type=str, required=True,
- help='Path to the CA signing key file.')
- parser.add_argument('--hostname', type=str, required=True,
- help='Hostname for the proxy server.')
+def map_mime_to_format(mime_type):
+    # Map MIME types to Archivo format identifiers
+ mime_to_format = {
+ "application/rdf+xml": "owl", # Common MIME type for OWL files
+ "application/owl+xml": "owl", # Specific MIME type for OWL
+ "text/turtle": "ttl", # MIME type for Turtle format
+ "application/n-triples": "nt", # MIME type for N-Triples format
+ }
- parser.add_argument('--port', type=int, required=True,
- help='Port for the proxy server.')
+ return mime_to_format.get(mime_type, None)
- parser.add_argument('--plugins', type=str, required=True,
- help='Plugins for the proxy server.')
- args = parser.parse_args()
-
- ontoFormat = {
- 'format': args.ontoFormat,
- 'precedence': args.ontoPrecedence,
- 'patchAcceptUpstream': args.patchAcceptUpstream
- }
+def set_onto_format_headers(wrapped_request, config):
+ logger.info(
+ f"Setting headers based on ontoFormat: {config.ontoFormat} and ontoVersion: {config.ontoVersion}"
+ )
- logger.info(f'Ontology Format: {ontoFormat}')
- logger.info(f'Ontology Version: {args.ontoVersion}')
- #logger.info(f'Only Ontologies Mode: {args.onlyOntologies}')
- #logger.info(f'HTTPS Interception: {args.httpsIntercept}')
- #logger.info(f'Inspect Redirects: {args.inspectRedirects}')
- #logger.info(f'Forward Headers: {args.forwardHeaders}')
- #logger.info(f'Subject Binary Search Threshold: {args.subjectBinarySearchThreshold}')
- return ontoFormat, args.ontoVersion, args.onlyOntologies, args.httpsIntercept, args.inspectRedirects, args.forwardHeaders, args.subjectBinarySearchThreshold
-
-
-def check_if_archivo_ontology_requested(request):
- with open('ontologytimemachine/utils/archivo_ontologies.txt', 'r') as file:
- urls = [line.strip() for line in file]
- parsed_urls = [(urlparse(url).netloc, urlparse(url).path) for url in urls]
-
- _, request_host, request_path = get_ontology_from_request(request)
- for host, path in parsed_urls:
- if request_host == host and request_path.startswith(path):
- return True
- return False
-
-
-def get_headers(request):
- headers = {}
- for k, v in request.headers.items():
- headers[v[0].decode('utf-8')] = v[1].decode('utf-8')
- return headers
-
-
-def get_ontology_from_request(request):
- logger.info('Get ontology from request')
- if (request.method == b'GET' or request.method == b'HEAD') and not request.host:
- for k, v in request.headers.items():
- if v[0].decode('utf-8') == 'Host':
- host = v[1].decode('utf-8')
- path = request.path.decode('utf-8')
- ontology = 'https://' + host + request.path.decode('utf-8')
- else:
- host = request.host.decode('utf-8')
- path = request.path.decode('utf-8')
- ontology = str(request._url)
- logger.info(f'Ontology: {ontology}')
- return ontology, host, path
-
-
-def get_mime_type(format):
- # Guess the MIME type based on the format
- mime_type, _ = mimetypes.guess_type(f'file.{format}')
- # Return the guessed MIME type or a generic default if guessing fails
- return mime_type or 'text/turtle'
-
-
-def set_onto_format_headers(request, ontoFormat, ontoVersion):
- logger.info(f'Setting headers based on ontoFormat: {ontoFormat}')
+    # If ontoVersion is 'original' and patchAcceptUpstream is False, there is nothing to do here
+ if (
+ config.ontoVersion == "original"
+ and not config.ontoFormat["patchAcceptUpstream"]
+ ):
+ return
# Determine the correct MIME type for the format
- mime_type = get_mime_type(ontoFormat['format'])
-
- # Check the precedence and update the 'Accept' header if necessary
- if ontoFormat['precedence'] in ['always', 'enforcedPriority'] or \
- (ontoFormat['precedence'] == 'default' and b'accept' not in request.headers):
- request.headers[b'accept'] = (b'Accept', mime_type.encode('utf-8'))
- logger.info(f'Accept header set to: {request.headers[b"accept"][1]}')
-
- # Check if patchAcceptUpstream is true and ontoVersion is 'original'
- if ontoFormat['patchAcceptUpstream'] and ontoVersion == 'original':
- request.headers[b'accept'] = (b'Accept', mime_type.encode('utf-8'))
- logger.info(f'Accept header patched upstream: {request.headers[b"accept"][1]}')
-
-
-def proxy_logic(request: HttpParser, ontoFormat, ontoVersion):
- logger.info('Proxy has to intervene')
- set_onto_format_headers(request, ontoFormat, ontoVersion)
- headers = get_headers(request)
- logger.info(f'Updated headers: {request.headers}')
- ontology, _, _ = get_ontology_from_request(request)
- if ontoVersion == 'original':
- response = fetch_original(ontology, headers)
- elif ontoVersion == 'originalFailoverLive':
- response = fetch_failover(ontology, headers, live=True)
- elif ontoVersion == 'originalFailoverMonitor':
- response = fetch_failover(ontology, headers, monitor=True)
- elif ontoVersion == 'latestArchive':
- response = fetch_latest_archive(ontology, headers)
- elif ontoVersion == 'timestampArchive':
- response = fetch_timestamp_archive(ontology, headers)
- elif ontoVersion == 'dependencyManifest':
- response = fetch_dependency_manifest(ontology, headers)
-
- return response
-
-
-# Fetch from the original source, no matter what
-def fetch_original(ontology, headers):
- logger.info(f'Fetching original ontology from URL: {ontology}')
- try:
- response = requests.get(url=ontology, headers=headers, timeout=5)
- logger.info('Successfully fetched original ontology')
- return response
- except Exception as e:
- logger.error(f'Error fetching original ontology: {e}')
- return mock_response_500()
-
-
-# Failover mode
-def fetch_failover(ontology, headers, live=False, monitor=False):
- try:
- logger.info(f'Fetching original ontology with failover from URL: {ontology}')
- response = requests.get(url=ontology, headers=headers, timeout=5)
- logger.info('Successfully fetched original ontology')
- if response.status_code in passthrough_status_codes_http:
- return response
- else:
- logging.info(f'Status code: {response.status_code}')
- return fetch_from_dbpedia_archivo_api(ontology, headers)
- except Exception as e:
- logger.error(f'Error fetching original ontology: {e}')
- if live:
- logger.info('Attempting to fetch live version due to failover')
- return fetch_from_dbpedia_archivo_api(ontology, headers)
- elif monitor:
- logger.info('Attempting to fetch archive monitor version due to failover')
- # TODO
- return mock_response_404
- else:
- return mock_response_500
-
-
-# Fetch the lates version from archivo (no timestamp defined)
-def fetch_latest_archive(ontology, headers):
- logger.info(f'Fetching latest archive ontology from URL: {ontology}/latest')
- try:
- response = requests.get(url=ontology, headers=headers, timeout=5)
- logger.info('Successfully fetched latest archive ontology')
- return response
- except Exception as e:
- logger.error(f'Error fetching latest archive ontology: {e}')
- return mock_response_500
-
-
-def fetch_timestamp_archive(ontology, headers):
- return mock_response_404
-
-
-def fetch_dependency_manifest(ontology, headers):
- return mock_response_404
-
-
-def failover_mode(request):
- headers = get_headers(request)
- logger.info('Failover mode')
-
- ontology, _, _ = get_ontology_from_request(request)
- try:
- response = requests.get(url=ontology, headers=headers, timeout=5)
- if response.history:
- logger.debug("Request was redirected")
- for resp in response.history:
- logger.debug(f"{resp.status_code}, {resp.url}")
- logger.debug(f"Final destination: {response.status_code}, {response.url}")
- else:
- logger.debug("Request was not redirected")
- content_type = response.headers.get('Content-Type')
- logger.debug(content_type)
- if response.status_code in passthrough_status_codes_http:
- return response
- else:
- logging.info(f'Status code: {response.status_code}')
- return fetch_from_dbpedia_archivo_api(ontology, headers)
- except (SSLError, Timeout, ConnectionError, RequestException) as e:
- return fetch_from_dbpedia_archivo_api(ontology, headers)
-
-
-def fetch_from_dbpedia_archivo_api(ontology, headers):
- format, version, versionMatching = get_parameters_from_headers(headers)
- dbpedia_url = f'{dbpedia_api}?o={ontology}&f={format}'
- try:
- logger.info(f'Fetching from DBpedia Archivo API: {dbpedia_url}')
- response = requests.get(dbpedia_url, timeout=5)
- return response
- except requests.exceptions.RequestException as e:
- logging.error(f'Exception occurred while fetching from DBpedia Archivo API: {e}')
- return mock_response_404()
-
+ mime_type = get_mime_type(config.ontoFormat["format"])
+ logger.info(f"Requested mimetype by proxy: {mime_type}")
+
+ # Define conditions for modifying the accept header
+ request_accept_header = wrapped_request.get_request_accept_header()
+ logger.info(f"Accept header by request: {request_accept_header}")
+ req_headers_with_priority = parse_accept_header_with_priority(request_accept_header)
+ req_headers = [x[0] for x in req_headers_with_priority]
+ if not req_headers and config.ontoFormat["precedence"] in [
+ "default",
+        "enforcedPriority",
+ ]:
+ wrapped_request.set_request_accept_header(mime_type)
+ elif (
+ len(req_headers) == 1
+ and req_headers[0] == "*/*"
+ and config.ontoFormat["precedence"] in ["default", "enforcedPriority"]
+ ):
+ wrapped_request.set_request_accept_header(mime_type)
+ elif (
+ len(req_headers) > 1
+ and mime_type in req_headers
+ and config.ontoFormat["precedence"] == "enforcedPriority"
+ ):
+ wrapped_request.set_request_accept_header(mime_type)
+ elif config.ontoFormat["precedence"] == "always":
+ wrapped_request.set_request_accept_header(mime_type)
+
+
+def select_highest_priority_mime_from_archivo(mime_list):
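+    # Illustrative example: for [("text/html", 1), ("text/turtle", 1), ("application/json", 0.5)]
+    # the highest q-value is 1; "text/html" is not in archivo_mimetypes, so "text/turtle" is returned.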
+ # Sort the MIME types by their priority in descending order
+ sorted_mime_list = sorted(mime_list, key=lambda x: x[1], reverse=True)
+
+ # Track the highest priority value
+ highest_priority = sorted_mime_list[0][1]
+
+ # Filter MIME types that match the highest priority
+ highest_priority_mimes = [
+ mime for mime, priority in sorted_mime_list if priority == highest_priority
+ ]
+
+ # Check if any of the highest priority MIME types are in the archivo list
+ for mime in highest_priority_mimes:
+ if mime in archivo_mimetypes:
+ return mime
+
+ # If none of the preferred MIME types are present, return nothing
+ return None
+
+
+def parse_accept_header_with_priority(accept_header):
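+    # Illustrative example: "text/turtle;q=0.9, application/rdf+xml" is parsed (via werkzeug)
+    # into pairs such as ("application/rdf+xml", 1) and ("text/turtle", 0.9).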
+ logger.info("Parse accept header")
+ # Parse the Accept header to extract MIME types and their priority (q values)
+ parsed = parse_accept_header(accept_header)
+
+ # Create a list of tuples with MIME types and their corresponding q values
+ mime_types_with_priority = [(item[0], item[1]) for item in parsed]
+ logger.info(f"Accept headers with priority: {mime_types_with_priority}")
+
+ return mime_types_with_priority
+
+
+def get_format_from_accept_header(headers):
+ if not headers:
+ return None
-def map_mime_to_format(mime_type):
- # Use the mimetypes library to get the file extension
- extension = mimetypes.guess_extension(mime_type)
- if not extension:
+    # Extract the Accept header
+ accept_header = headers.get("Accept", None)
+ logger.info(f"Accept header: {accept_header}")
+ if not accept_header:
return None
-
- # Map file extensions to formats
- ext_to_format = {
- '.rdf': 'owl',
- '.xml': 'owl',
- '.ttl': 'ttl',
- '.nt': 'nt',
- # Add more mappings if needed
- }
-
- return ext_to_format.get(extension, None)
+ accept_header_with_priority = parse_accept_header_with_priority(accept_header)
-def get_parameters_from_headers(headers):
- # Map MIME types to formats
- mime_type = headers.get('Accept', None)
- format = map_mime_to_format(mime_type)
+ selected_mimetype = select_highest_priority_mime_from_archivo(
+ accept_header_with_priority
+ )
+
+ if not selected_mimetype:
+ logger.info(f"The requested mimetype is not supported by DBpedia Archivo")
+ return None
- version = headers.get('Version', None)
- versionMatching = headers.get('VersionMatching', None)
- return format, version, versionMatching
\ No newline at end of file
+ format = map_mime_to_format(selected_mimetype)
+ return format
diff --git a/poetry.lock b/poetry.lock
index ce05dc1..1e34442 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2,13 +2,13 @@
[[package]]
name = "certifi"
-version = "2024.6.2"
+version = "2024.8.30"
description = "Python package for providing Mozilla's CA Bundle."
optional = false
python-versions = ">=3.6"
files = [
- {file = "certifi-2024.6.2-py3-none-any.whl", hash = "sha256:ddc6c8ce995e6987e7faf5e3f1b02b302836a0e5d98ece18392cb1a36c72ad56"},
- {file = "certifi-2024.6.2.tar.gz", hash = "sha256:3cd43f1c6fa7dedc5899d69d3ad0398fd018ad1a17fba83ddaf78aa46c747516"},
+ {file = "certifi-2024.8.30-py3-none-any.whl", hash = "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8"},
+ {file = "certifi-2024.8.30.tar.gz", hash = "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9"},
]
[[package]]
@@ -123,13 +123,13 @@ files = [
[[package]]
name = "exceptiongroup"
-version = "1.2.1"
+version = "1.2.2"
description = "Backport of PEP 654 (exception groups)"
optional = false
python-versions = ">=3.7"
files = [
- {file = "exceptiongroup-1.2.1-py3-none-any.whl", hash = "sha256:5258b9ed329c5bbdd31a309f53cbfb0b155341807f6ff7606a1e801a891b29ad"},
- {file = "exceptiongroup-1.2.1.tar.gz", hash = "sha256:a4785e48b045528f5bfe627b6ad554ff32def154f42372786903b7abcfe1aa16"},
+ {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"},
+ {file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"},
]
[package.extras]
@@ -137,15 +137,18 @@ test = ["pytest (>=6)"]
[[package]]
name = "idna"
-version = "3.7"
+version = "3.10"
description = "Internationalized Domain Names in Applications (IDNA)"
optional = false
-python-versions = ">=3.5"
+python-versions = ">=3.6"
files = [
- {file = "idna-3.7-py3-none-any.whl", hash = "sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0"},
- {file = "idna-3.7.tar.gz", hash = "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"},
+ {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"},
+ {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"},
]
+[package.extras]
+all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"]
+
[[package]]
name = "iniconfig"
version = "2.0.0"
@@ -157,15 +160,98 @@ files = [
{file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
]
+[[package]]
+name = "isodate"
+version = "0.6.1"
+description = "An ISO 8601 date/time/duration parser and formatter"
+optional = false
+python-versions = "*"
+files = [
+ {file = "isodate-0.6.1-py2.py3-none-any.whl", hash = "sha256:0751eece944162659049d35f4f549ed815792b38793f07cf73381c1c87cbed96"},
+ {file = "isodate-0.6.1.tar.gz", hash = "sha256:48c5881de7e8b0a0d648cb024c8062dc84e7b840ed81e864c7614fd3c127bde9"},
+]
+
+[package.dependencies]
+six = "*"
+
+[[package]]
+name = "markupsafe"
+version = "2.1.5"
+description = "Safely add untrusted strings to HTML/XML markup."
+optional = false
+python-versions = ">=3.7"
+files = [
+ {file = "MarkupSafe-2.1.5-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:a17a92de5231666cfbe003f0e4b9b3a7ae3afb1ec2845aadc2bacc93ff85febc"},
+ {file = "MarkupSafe-2.1.5-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:72b6be590cc35924b02c78ef34b467da4ba07e4e0f0454a2c5907f473fc50ce5"},
+ {file = "MarkupSafe-2.1.5-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e61659ba32cf2cf1481e575d0462554625196a1f2fc06a1c777d3f48e8865d46"},
+ {file = "MarkupSafe-2.1.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2174c595a0d73a3080ca3257b40096db99799265e1c27cc5a610743acd86d62f"},
+ {file = "MarkupSafe-2.1.5-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ae2ad8ae6ebee9d2d94b17fb62763125f3f374c25618198f40cbb8b525411900"},
+ {file = "MarkupSafe-2.1.5-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:075202fa5b72c86ad32dc7d0b56024ebdbcf2048c0ba09f1cde31bfdd57bcfff"},
+ {file = "MarkupSafe-2.1.5-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:598e3276b64aff0e7b3451b72e94fa3c238d452e7ddcd893c3ab324717456bad"},
+ {file = "MarkupSafe-2.1.5-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:fce659a462a1be54d2ffcacea5e3ba2d74daa74f30f5f143fe0c58636e355fdd"},
+ {file = "MarkupSafe-2.1.5-cp310-cp310-win32.whl", hash = "sha256:d9fad5155d72433c921b782e58892377c44bd6252b5af2f67f16b194987338a4"},
+ {file = "MarkupSafe-2.1.5-cp310-cp310-win_amd64.whl", hash = "sha256:bf50cd79a75d181c9181df03572cdce0fbb75cc353bc350712073108cba98de5"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:629ddd2ca402ae6dbedfceeba9c46d5f7b2a61d9749597d4307f943ef198fc1f"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:5b7b716f97b52c5a14bffdf688f971b2d5ef4029127f1ad7a513973cfd818df2"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6ec585f69cec0aa07d945b20805be741395e28ac1627333b1c5b0105962ffced"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b91c037585eba9095565a3556f611e3cbfaa42ca1e865f7b8015fe5c7336d5a5"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7502934a33b54030eaf1194c21c692a534196063db72176b0c4028e140f8f32c"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:0e397ac966fdf721b2c528cf028494e86172b4feba51d65f81ffd65c63798f3f"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:c061bb86a71b42465156a3ee7bd58c8c2ceacdbeb95d05a99893e08b8467359a"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:3a57fdd7ce31c7ff06cdfbf31dafa96cc533c21e443d57f5b1ecc6cdc668ec7f"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-win32.whl", hash = "sha256:397081c1a0bfb5124355710fe79478cdbeb39626492b15d399526ae53422b906"},
+ {file = "MarkupSafe-2.1.5-cp311-cp311-win_amd64.whl", hash = "sha256:2b7c57a4dfc4f16f7142221afe5ba4e093e09e728ca65c51f5620c9aaeb9a617"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:8dec4936e9c3100156f8a2dc89c4b88d5c435175ff03413b443469c7c8c5f4d1"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:3c6b973f22eb18a789b1460b4b91bf04ae3f0c4234a0a6aa6b0a92f6f7b951d4"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ac07bad82163452a6884fe8fa0963fb98c2346ba78d779ec06bd7a6262132aee"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f5dfb42c4604dddc8e4305050aa6deb084540643ed5804d7455b5df8fe16f5e5"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ea3d8a3d18833cf4304cd2fc9cbb1efe188ca9b5efef2bdac7adc20594a0e46b"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:d050b3361367a06d752db6ead6e7edeb0009be66bc3bae0ee9d97fb326badc2a"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:bec0a414d016ac1a18862a519e54b2fd0fc8bbfd6890376898a6c0891dd82e9f"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:58c98fee265677f63a4385256a6d7683ab1832f3ddd1e66fe948d5880c21a169"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-win32.whl", hash = "sha256:8590b4ae07a35970728874632fed7bd57b26b0102df2d2b233b6d9d82f6c62ad"},
+ {file = "MarkupSafe-2.1.5-cp312-cp312-win_amd64.whl", hash = "sha256:823b65d8706e32ad2df51ed89496147a42a2a6e01c13cfb6ffb8b1e92bc910bb"},
+ {file = "MarkupSafe-2.1.5-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:c8b29db45f8fe46ad280a7294f5c3ec36dbac9491f2d1c17345be8e69cc5928f"},
+ {file = "MarkupSafe-2.1.5-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ec6a563cff360b50eed26f13adc43e61bc0c04d94b8be985e6fb24b81f6dcfdf"},
+ {file = "MarkupSafe-2.1.5-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a549b9c31bec33820e885335b451286e2969a2d9e24879f83fe904a5ce59d70a"},
+ {file = "MarkupSafe-2.1.5-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4f11aa001c540f62c6166c7726f71f7573b52c68c31f014c25cc7901deea0b52"},
+ {file = "MarkupSafe-2.1.5-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:7b2e5a267c855eea6b4283940daa6e88a285f5f2a67f2220203786dfa59b37e9"},
+ {file = "MarkupSafe-2.1.5-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:2d2d793e36e230fd32babe143b04cec8a8b3eb8a3122d2aceb4a371e6b09b8df"},
+ {file = "MarkupSafe-2.1.5-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:ce409136744f6521e39fd8e2a24c53fa18ad67aa5bc7c2cf83645cce5b5c4e50"},
+ {file = "MarkupSafe-2.1.5-cp37-cp37m-win32.whl", hash = "sha256:4096e9de5c6fdf43fb4f04c26fb114f61ef0bf2e5604b6ee3019d51b69e8c371"},
+ {file = "MarkupSafe-2.1.5-cp37-cp37m-win_amd64.whl", hash = "sha256:4275d846e41ecefa46e2015117a9f491e57a71ddd59bbead77e904dc02b1bed2"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:656f7526c69fac7f600bd1f400991cc282b417d17539a1b228617081106feb4a"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:97cafb1f3cbcd3fd2b6fbfb99ae11cdb14deea0736fc2b0952ee177f2b813a46"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1f3fbcb7ef1f16e48246f704ab79d79da8a46891e2da03f8783a5b6fa41a9532"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fa9db3f79de01457b03d4f01b34cf91bc0048eb2c3846ff26f66687c2f6d16ab"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ffee1f21e5ef0d712f9033568f8344d5da8cc2869dbd08d87c84656e6a2d2f68"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:5dedb4db619ba5a2787a94d877bc8ffc0566f92a01c0ef214865e54ecc9ee5e0"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:30b600cf0a7ac9234b2638fbc0fb6158ba5bdcdf46aeb631ead21248b9affbc4"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:8dd717634f5a044f860435c1d8c16a270ddf0ef8588d4887037c5028b859b0c3"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-win32.whl", hash = "sha256:daa4ee5a243f0f20d528d939d06670a298dd39b1ad5f8a72a4275124a7819eff"},
+ {file = "MarkupSafe-2.1.5-cp38-cp38-win_amd64.whl", hash = "sha256:619bc166c4f2de5caa5a633b8b7326fbe98e0ccbfacabd87268a2b15ff73a029"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:7a68b554d356a91cce1236aa7682dc01df0edba8d043fd1ce607c49dd3c1edcf"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:db0b55e0f3cc0be60c1f19efdde9a637c32740486004f20d1cff53c3c0ece4d2"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3e53af139f8579a6d5f7b76549125f0d94d7e630761a2111bc431fd820e163b8"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:17b950fccb810b3293638215058e432159d2b71005c74371d784862b7e4683f3"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4c31f53cdae6ecfa91a77820e8b151dba54ab528ba65dfd235c80b086d68a465"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:bff1b4290a66b490a2f4719358c0cdcd9bafb6b8f061e45c7a2460866bf50c2e"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:bc1667f8b83f48511b94671e0e441401371dfd0f0a795c7daa4a3cd1dde55bea"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5049256f536511ee3f7e1b3f87d1d1209d327e818e6ae1365e8653d7e3abb6a6"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-win32.whl", hash = "sha256:00e046b6dd71aa03a41079792f8473dc494d564611a8f89bbbd7cb93295ebdcf"},
+ {file = "MarkupSafe-2.1.5-cp39-cp39-win_amd64.whl", hash = "sha256:fa173ec60341d6bb97a89f5ea19c85c5643c1e7dedebc22f5181eb73573142c5"},
+ {file = "MarkupSafe-2.1.5.tar.gz", hash = "sha256:d283d37a890ba4c1ae73ffadf8046435c76e7bc2247bbb63c00bd1a709c6544b"},
+]
+
[[package]]
name = "packaging"
-version = "24.0"
+version = "24.1"
description = "Core utilities for Python packages"
optional = false
-python-versions = ">=3.7"
+python-versions = ">=3.8"
files = [
- {file = "packaging-24.0-py3-none-any.whl", hash = "sha256:2ddfb553fdf02fb784c234c7ba6ccc288296ceabec964ad2eae3777778130bc5"},
- {file = "packaging-24.0.tar.gz", hash = "sha256:eb82c5e3e56209074766e6885bb04b8c38a0c015d0a30036ebe7ece34c9989e9"},
+ {file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"},
+ {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"},
]
[[package]]
@@ -185,24 +271,44 @@ testing = ["pytest", "pytest-benchmark"]
[[package]]
name = "proxy-py"
-version = "2.4.4"
-description = "⚡ Fast • 🪶 Lightweight • 0️⃣ Dependency • 🔌 Pluggable • 😈 TLS interception • 🔒 DNS-over-HTTPS • 🔥 Poor Mans VPN • ⏪ Reverse & ⏩ Forward • 👮🏿 Proxy Server framework • 🌐 Web Server framework • ➵ ➶ ➷ ➠ PubSub framework • 👷 Work acceptor & executor framework."
+version = "2.4.8"
+description = "\\u26a1 Fast \\u2022 \\U0001fab6 Lightweight \\u2022 \\U0001f51f Dependency \\u2022 \\U0001f50c Pluggable \\u2022 \\U0001f608 TLS interception \\u2022 \\U0001f512 DNS-over-HTTPS \\u2022 \\U0001f525 Poor Mans VPN \\u2022 \\u23ea Reverse & \\u23e9 Forward \\u2022 \\U0001f46e\\U0001f3ff Proxy Server framework \\u2022 \\U0001f310 Web Server framework \\u2022 \\u27b5 \\u27b6 \\u27b7 \\u27a0 PubSub framework \\u2022 \\U0001f477 Work acceptor & executor framework."
optional = false
python-versions = ">=3.6"
files = [
- {file = "proxy.py-2.4.4-py3-none-any.whl", hash = "sha256:8d02fa2f1f94ad5bee96962c7b02badb9b4075d1a99d31700577ea15713ba4d3"},
- {file = "proxy_py-2.4.4.tar.gz", hash = "sha256:216581f70ad673f4ecb5f6b27f52491aaf1c056829f4a670f5ea3b5a340f4272"},
+ {file = "proxy.py-2.4.8-py3-none-any.whl", hash = "sha256:316cbed3184c8ddf4f9b3143f7dc449ef1d44a7c5ca1988276a01444f6426e51"},
+ {file = "proxy_py-2.4.8.tar.gz", hash = "sha256:77088312aa558c9402af2b88d135a1e261af51f5e38242f1d37867559a0a65cb"},
]
+[package.extras]
+metrics = ["prometheus-client (==0.17.1)", "prometheus-client (==0.20.0)"]
+release = ["setuptools-scm (==6.3.2)", "twine (==3.8.0)"]
+testing = ["autopep8 (==1.6.0)", "coverage (==6.2)", "coverage (==7.4.4)", "flake8 (==4.0.1)", "h2 (==4.1.0)", "hpack (==4.0.0)", "httpx (==0.22.0)", "httpx (==0.27.0)", "hyperframe (==6.0.1)", "mccabe (==0.6.1)", "mypy (==0.971)", "pre-commit (==2.16.0)", "py-spy (==0.3.12)", "pylint (==2.13.7)", "pylint (==3.1.0)", "pytest (==7.0.1)", "pytest (==8.1.1)", "pytest-asyncio (==0.16.0)", "pytest-asyncio (==0.21.1)", "pytest-cov (==3.0.0)", "pytest-cov (==5.0.0)", "pytest-mock (==3.14.0)", "pytest-mock (==3.6.1)", "pytest-xdist (==2.5.0)", "pytest-xdist (==3.5.0)", "python-coveralls (==2.9.3)", "rope (==1.1.1)", "tox (==3.28.0)", "tox (==4.14.2)", "types-requests (==2.28.11.5)", "types-setuptools (==64.0.1)", "wheel (==0.37.1)"]
+tunnel = ["cryptography (==36.0.2)", "cryptography (==39.0.1)", "paramiko (==2.11.0)", "paramiko (==3.4.0)", "types-paramiko (==2.11.3)", "types-paramiko (==3.4.0.20240311)"]
+
+[[package]]
+name = "pyparsing"
+version = "3.1.4"
+description = "pyparsing module - Classes and methods to define and execute parsing grammars"
+optional = false
+python-versions = ">=3.6.8"
+files = [
+ {file = "pyparsing-3.1.4-py3-none-any.whl", hash = "sha256:a6a7ee4235a3f944aa1fa2249307708f893fe5717dc603503c6c7969c070fb7c"},
+ {file = "pyparsing-3.1.4.tar.gz", hash = "sha256:f86ec8d1a83f11977c9a6ea7598e8c27fc5cddfa5b07ea2241edbbde1d7bc032"},
+]
+
+[package.extras]
+diagrams = ["jinja2", "railroad-diagrams"]
+
[[package]]
name = "pytest"
-version = "8.2.2"
+version = "8.3.3"
description = "pytest: simple powerful testing with Python"
optional = false
python-versions = ">=3.8"
files = [
- {file = "pytest-8.2.2-py3-none-any.whl", hash = "sha256:c434598117762e2bd304e526244f67bf66bbd7b5d6cf22138be51ff661980343"},
- {file = "pytest-8.2.2.tar.gz", hash = "sha256:de4bb8104e201939ccdc688b27a89a7be2079b22e2bd2b07f806b6ba71117977"},
+ {file = "pytest-8.3.3-py3-none-any.whl", hash = "sha256:a6853c7375b2663155079443d2e45de913a911a11d669df02a50814944db57b2"},
+ {file = "pytest-8.3.3.tar.gz", hash = "sha256:70b98107bd648308a7952b06e6ca9a50bc660be218d53c257cc1fc94fda10181"},
]
[package.dependencies]
@@ -210,12 +316,33 @@ colorama = {version = "*", markers = "sys_platform == \"win32\""}
exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""}
iniconfig = "*"
packaging = "*"
-pluggy = ">=1.5,<2.0"
+pluggy = ">=1.5,<2"
tomli = {version = ">=1", markers = "python_version < \"3.11\""}
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
+[[package]]
+name = "rdflib"
+version = "7.0.0"
+description = "RDFLib is a Python library for working with RDF, a simple yet powerful language for representing information."
+optional = false
+python-versions = ">=3.8.1,<4.0.0"
+files = [
+ {file = "rdflib-7.0.0-py3-none-any.whl", hash = "sha256:0438920912a642c866a513de6fe8a0001bd86ef975057d6962c79ce4771687cd"},
+ {file = "rdflib-7.0.0.tar.gz", hash = "sha256:9995eb8569428059b8c1affd26b25eac510d64f5043d9ce8c84e0d0036e995ae"},
+]
+
+[package.dependencies]
+isodate = ">=0.6.0,<0.7.0"
+pyparsing = ">=2.1.0,<4"
+
+[package.extras]
+berkeleydb = ["berkeleydb (>=18.1.0,<19.0.0)"]
+html = ["html5lib (>=1.0,<2.0)"]
+lxml = ["lxml (>=4.3.0,<5.0.0)"]
+networkx = ["networkx (>=2.0.0,<3.0.0)"]
+
[[package]]
name = "requests"
version = "2.32.3"
@@ -237,6 +364,31 @@ urllib3 = ">=1.21.1,<3"
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
+[[package]]
+name = "schedule"
+version = "1.2.2"
+description = "Job scheduling for humans."
+optional = false
+python-versions = ">=3.7"
+files = [
+ {file = "schedule-1.2.2-py3-none-any.whl", hash = "sha256:5bef4a2a0183abf44046ae0d164cadcac21b1db011bdd8102e4a0c1e91e06a7d"},
+ {file = "schedule-1.2.2.tar.gz", hash = "sha256:15fe9c75fe5fd9b9627f3f19cc0ef1420508f9f9a46f45cd0769ef75ede5f0b7"},
+]
+
+[package.extras]
+timezone = ["pytz"]
+
+[[package]]
+name = "six"
+version = "1.16.0"
+description = "Python 2 and 3 compatibility utilities"
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*"
+files = [
+ {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
+ {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
+]
+
[[package]]
name = "tomli"
version = "2.0.1"
@@ -250,13 +402,13 @@ files = [
[[package]]
name = "urllib3"
-version = "2.2.1"
+version = "2.2.3"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = ">=3.8"
files = [
- {file = "urllib3-2.2.1-py3-none-any.whl", hash = "sha256:450b20ec296a467077128bff42b73080516e71b56ff59a60a02bef2232c4fa9d"},
- {file = "urllib3-2.2.1.tar.gz", hash = "sha256:d0570876c61ab9e520d776c38acbbb5b05a776d3f9ff98a5c8fd5162a444cf19"},
+ {file = "urllib3-2.2.3-py3-none-any.whl", hash = "sha256:ca899ca043dcb1bafa3e262d73aa25c465bfb49e0bd9dd5d59f1d0acba2f8fac"},
+ {file = "urllib3-2.2.3.tar.gz", hash = "sha256:e7d814a81dad81e6caf2ec9fdedb284ecc9c73076b62654547cc64ccdcae26e9"},
]
[package.extras]
@@ -265,7 +417,24 @@ h2 = ["h2 (>=4,<5)"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["zstandard (>=0.18.0)"]
+[[package]]
+name = "werkzeug"
+version = "3.0.4"
+description = "The comprehensive WSGI web application library."
+optional = false
+python-versions = ">=3.8"
+files = [
+ {file = "werkzeug-3.0.4-py3-none-any.whl", hash = "sha256:02c9eb92b7d6c06f31a782811505d2157837cea66aaede3e217c7c27c039476c"},
+ {file = "werkzeug-3.0.4.tar.gz", hash = "sha256:34f2371506b250df4d4f84bfe7b0921e4762525762bbd936614909fe25cd7306"},
+]
+
+[package.dependencies]
+MarkupSafe = ">=2.1.1"
+
+[package.extras]
+watchdog = ["watchdog (>=2.3)"]
+
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
-content-hash = "0ca0bb1508c8da08eb6aeac6255865d858133aef5b09b9b119872d880df76b60"
+content-hash = "9efdbca22e8f7d122208d160253c194f4f3d177e77a011491bbaac34fac5c237"
diff --git a/pyproject.toml b/pyproject.toml
index 4c11494..ebce3c9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,6 +11,9 @@ python = "^3.10"
pytest = "^8.2.1"
requests = "^2.32.3"
proxy-py = "^2.4.4"
+rdflib = "^7.0.0"
+werkzeug = "^3.0.4"
+schedule = "^1.2.2"
[build-system]
diff --git a/tests/dependency.ttl b/tests/dependency.ttl
new file mode 100644
index 0000000..d28bac7
--- /dev/null
+++ b/tests/dependency.ttl
@@ -0,0 +1,25 @@
+@prefix ex-version:
+ owl:imports , ;
+ ex-version:current ;
+ ex-version:version
+ ,
+ [
+ ex-version:snapshot ;
+ ex-version:file ;
+ ex-version:dependency , ;
+ ] .
+
+
+ ex-version:snapshot ;
+ ex-version:file ;
+ ex-version:dependency , ;
+]
+
+ ex-version:snapshot ;
+ ex-version:file .
+
+ ex-version:snapshot ;
+ ex-version:file .
+
+ ex-version:snapshot ;
+ ex-version:file .
diff --git a/tests/test_config.py b/tests/test_config.py
new file mode 100644
index 0000000..8d1db7e
--- /dev/null
+++ b/tests/test_config.py
@@ -0,0 +1,36 @@
+import unittest
+from ontologytimemachine.utils.config import parse_arguments, Config
+import sys
+
+
+class TestConfig(unittest.TestCase):
+
+ def test_parse_arguments(self):
+ test_args = [
+ "test",
+ "--ontoFormat",
+ "turtle",
+ "--ontoPrecedence",
+ "enforcedPriority",
+ "--patchAcceptUpstream",
+ "False",
+ "--ontoVersion",
+ "original",
+ "--httpsInterception",
+ "none",
+ "--disableRemovingRedirects",
+ "False",
+ "--logLevel",
+ "info",
+ ]
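+ # parse_arguments() reads sys.argv directly, so the simulated CLI invocation is installed there.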
+ sys.argv = test_args
+ config = parse_arguments()
+ self.assertIsInstance(config, Config)
+ self.assertEqual(config.ontoFormat["format"], "turtle")
+ self.assertEqual(config.ontoVersion, "original")
+ self.assertEqual(config.restrictedAccess, False)
+ self.assertEqual(config.httpsInterception, "none")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_integration.py b/tests/test_integration.py
new file mode 100644
index 0000000..f33e088
--- /dev/null
+++ b/tests/test_integration.py
@@ -0,0 +1,166 @@
+import pytest
+import requests
+import time
+import subprocess
+import itertools
+from ontologytimemachine.custom_proxy import IP, PORT
+
+
+PROXY = f"{IP}:{PORT}"
+HTTP_PROXY = f"http://{PROXY}"
+HTTPS_PROXY = f"http://{PROXY}"
+PROXIES = {"http": HTTP_PROXY, "https": HTTPS_PROXY}
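+# ca-cert.pem is the proxy's CA certificate; it is passed to requests via verify= so that
+# HTTPS responses served through the (TLS-intercepting) proxy can still be validated.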
+CA_CERT_PATH = "ca-cert.pem"
+
+
+# @pytest.fixture(scope="module", autouse=True)
+# def start_proxy_server():
+# # Start the proxy server in a subprocess
+# process = subprocess.Popen(
+# [
+# 'python3', 'ontologytimemachine/custom_proxy.py',
+# ],
+# stdout=subprocess.PIPE,
+# stderr=subprocess.PIPE
+# )
+
+# # Wait a bit to ensure the server starts
+# time.sleep(5)
+
+# yield
+# "http://0.0.0.0:8899"
+# # Terminate the proxy server after tests
+# process.terminate()
+# process.wait()
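+
+# The fixture above is left commented out: these tests assume a proxy is already
+# running and listening at IP:PORT before the suite starts.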
+
+
+def test_12_data_globalchange():
+ iri = "http://data.globalchange.gov/gcis.owl"
+ generic_test(iri, "text/turtle")
+
+
+def test_13_data_ontotext():
+ iri = "http://data.ontotext.com/resource/leak/"
+ generic_test(iri, "text/turtle")
+
+
+def test_1_babelnet():
+ iri = "http://babelnet.org/rdf/"
+ generic_test(iri, "text/turtle")
+
+
+def test_2_bag_basisregistraties():
+ iri = "http://bag.basisregistraties.overheid.nl/def/bag"
+ generic_test(iri, "text/turtle")
+
+
+def test_3_bblfish():
+ iri = "http://bblfish.net/work/atom-owl/2006-06-06/"
+ generic_test(iri, "text/turtle")
+
+
+def test_4_brk_basisregistraties():
+ iri = "http://brk.basisregistraties.overheid.nl/def/brk"
+ generic_test(iri, "text/turtle")
+
+
+def test_5_brt_basisregistraties():
+ iri = "http://brt.basisregistraties.overheid.nl/def/top10nl"
+ generic_test(iri, "text/turtle")
+
+
+def test_6_brt_basisregistraties_begrippenkader():
+ iri = "http://brt.basisregistraties.overheid.nl/id/begrippenkader/top10nl"
+ generic_test(iri, "text/turtle")
+
+
+def test_7_buzzword():
+ iri = "http://buzzword.org.uk/rdf/personal-link-types#"
+ generic_test(iri, "text/turtle")
+
+
+def test_8_catalogus_professorum():
+ iri = "http://catalogus-professorum.org/cpm/2/"
+ generic_test(iri, "text/turtle")
+
+
+def test_9_data_gov():
+ iri = "http://data-gov.tw.rpi.edu/2009/data-gov-twc.rdf"
+ generic_test(iri, "text/turtle")
+
+
+def test_10_data_bigdatagrapes():
+ iri = "http://data.bigdatagrapes.eu/resource/ontology/"
+ generic_test(iri, "text/turtle")
+
+
+def test_11_data_europa_esco():
+ iri = "http://data.europa.eu/esco/flow"
+ generic_test(iri, "text/turtle")
+
+
+def test_14_data_ordnancesurvey_50kGazetteer():
+ iri = "http://dbpedia.org/ontology/Person"
+ generic_test(iri, "text/turtle")
+
+
+def test_15_linked_web_apis():
+ iri = "http://linked-web-apis.fit.cvut.cz/ns/core"
+ generic_test(iri, "text/turtle")
+
+
+def generic_test(iri, content_type):
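+ # Fetch the IRI through the proxy and check that the RDF payload mentions the IRI;
+ # the content_type argument is accepted but not asserted on yet.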
+ response = requests.get(iri, proxies=PROXIES, verify=CA_CERT_PATH)
+ assert response.status_code == 200
+ assert iri in response.content.decode("utf-8")
+
+
+def iri_generic_test(iri):
+ try:
+ response = requests.get(iri, proxies=PROXIES, verify=CA_CERT_PATH)
+ assert response.status_code == 200
+ assert iri in response.content.decode("utf-8")
+ except AssertionError as e:
+ return e
+ except requests.exceptions.RequestException as e:
+ return e
+
+
+def get_parameter_combinations():
+ # Define the possible values for each parameter
+ ontoFormat = ["turtle", "ntriples", "rdfxml", "htmldocu"]
+ ontoPrecedence = ["default", "enforcedPriority", "always"]
+ patchAcceptUpstream = [True, False]
+ ontoVersion = [
+ "original",
+ "originalFailoverLive",
+ "originalFailoverArchivoMonitor",
+ "latestArchive",
+ "timestampArchive",
+ "dependencyManifest",
+ ]
+ onlyOntologies = [True, False]
+ httpsIntercept = [True, False]
+ inspectRedirects = [True, False]
+ forwardHeaders = [True, False]
+ subjectBinarySearchThreshold = [1, 2, 3, 4, 5, 10, 25, 50, 100]
+
+ combinations = list(
+ itertools.product(
+ ontoFormat,
+ ontoPrecedence,
+ patchAcceptUpstream,
+ ontoVersion,
+ onlyOntologies,
+ httpsIntercept,
+ inspectRedirects,
+ forwardHeaders,
+ subjectBinarySearchThreshold,
+ )
+ )
+ return combinations
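+
+# Note: get_parameter_combinations() is not called by any test yet; it enumerates the
+# proxy's CLI option space, presumably for parametrizing these integration tests later.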
+
+
+if __name__ == "__main__":
+ pytest.main()
diff --git a/tests/test_mock_responses.py b/tests/test_mock_responses.py
new file mode 100644
index 0000000..a145d60
--- /dev/null
+++ b/tests/test_mock_responses.py
@@ -0,0 +1,34 @@
+import unittest
+from ontologytimemachine.utils.mock_responses import (
+ mock_response_200,
+ mock_response_403,
+ mock_response_404,
+ mock_response_500,
+)
+
+
+class TestMockResponses(unittest.TestCase):
+
+ def test_mock_response_200(self):
+ response = mock_response_200()
+ self.assertEqual(response.status_code, 200)
+ self.assertIn("To be implemented
", response.text)
+
+ def test_mock_response_403(self):
+ response = mock_response_403()
+ self.assertEqual(response.status_code, 403)
+ self.assertIn("403 Forbidden", response.text)
+
+ def test_mock_response_404(self):
+ response = mock_response_404()
+ self.assertEqual(response.status_code, 404)
+ self.assertIn("404 Not Found", response.text)
+
+ def test_mock_response_500(self):
+ response = mock_response_500()
+ self.assertEqual(response.status_code, 500)
+ self.assertIn("500 Internal Server Error", response.text)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_proxy.py b/tests/test_proxy.py
deleted file mode 100644
index 34dde2f..0000000
--- a/tests/test_proxy.py
+++ /dev/null
@@ -1,172 +0,0 @@
-import pytest
-import requests
-import time
-import subprocess
-from ontologytimemachine.custom_proxy import IP, PORT
-
-
-PROXY = f'{IP}:{PORT}'
-HTTP_PROXY = f'http://{PROXY}'
-HTTPS_PROXY = f'http://{PROXY}'
-PROXIES = {
- "http": HTTP_PROXY,
- "https": HTTPS_PROXY
-}
-CA_CERT_PATH = "ca-cert.pem"
-
-
-@pytest.fixture(scope="module", autouse=True)
-def start_proxy_server():
- # Start the proxy server in a subprocess
- process = subprocess.Popen(
- [
- 'python3', '-m', 'proxy',
- '--ca-key-file', 'ca-key.pem',
- '--ca-cert-file', 'ca-cert.pem',
- '--ca-signing-key-file', 'ca-signing-key.pem',
- '--hostname', IP,
- '--port', PORT,
- '--plugins', 'ontologytimemachine.custom_proxy.OntologyTimeMachinePlugin'
- ],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE
- )
-
- # Wait a bit to ensure the server starts
- time.sleep(5)
-
- yield
- "http://0.0.0.0:8899"
- # Terminate the proxy server after tests
- process.terminate()
- process.wait()
-
-
-def test_babelnet():
- iri = 'http://babelnet.org/rdf/'
- generic_test(iri, 'text/turtle')
-
-
-def test_bag_basisregistraties():
- iri = 'http://bag.basisregistraties.overheid.nl/def/bag'
- generic_test(iri, 'text/turtle')
-
-
-def test_bblfish():
- iri = 'http://bblfish.net/work/atom-owl/2006-06-06/'
- generic_test(iri, 'text/turtle')
-
-
-def test_brk_basisregistraties():
- iri = 'http://brk.basisregistraties.overheid.nl/def/brk'
- generic_test(iri, 'text/turtle')
-
-
-def test_brt_basisregistraties():
- iri = 'http://brt.basisregistraties.overheid.nl/def/top10nl'
- generic_test(iri, 'text/turtle')
-
-
-def test_brt_basisregistraties_begrippenkader():
- iri = 'http://brt.basisregistraties.overheid.nl/id/begrippenkader/top10nl'
- generic_test(iri, 'text/turtle')
-
-
-def test_buzzword():
- iri = 'http://buzzword.org.uk/rdf/personal-link-types#'
- generic_test(iri, 'text/turtle')
-
-
-def test_catalogus_professorum():
- iri = 'http://catalogus-professorum.org/cpm/2/'
- generic_test(iri, 'text/turtle')
-
-
-def test_data_gov():
- iri = 'http://data-gov.tw.rpi.edu/2009/data-gov-twc.rdf'
- generic_test(iri, 'text/turtle')
-
-
-def test_data_bigdatagrapes():
- iri = 'http://data.bigdatagrapes.eu/resource/ontology/'
- generic_test(iri, 'text/turtle')
-
-
-def test_data_europa_esco():
- iri = 'http://data.europa.eu/esco/flow'
- generic_test(iri, 'text/turtle')
-
-
-def test_data_globalchange():
- iri = 'http://data.globalchange.gov/gcis.owl'
- generic_test(iri, 'text/turtle')
-
-
-def test_data_ontotext():
- iri = 'http://data.ontotext.com/resource/leak/'
- generic_test(iri, 'text/turtle')
-
-
-def test_data_opendiscoveryspace():
- iri = 'http://data.opendiscoveryspace.eu/lom_ontology_ods.owl#'
- generic_test(iri, 'text/turtle')
-
-
-def test_data_ordnancesurvey_50kGazetteer():
- iri = 'http://data.ordnancesurvey.co.uk/ontology/50kGazetteer/'
- generic_test(iri, 'text/turtle')
-
-
-def test_data_ordnancesurvey_50kGazetteer():
- iri = 'http://dbpedia.org/ontology/Person'
- generic_test(iri, 'text/turtle')
-
-
-def test_linked_web_apis():
- iri = 'http://linked-web-apis.fit.cvut.cz/ns/core'
- generic_test(iri, 'text/turtle')
-
-
-#def test_ontologi_es():
-# iri = 'http://ontologi.es/days#'
-# generic_test(iri, 'text/turtle')
-
-
-def test_https():
- iri = "https://www.w3id.org/simulation/ontology/"
- generic_test(iri, 'text/plain; charset=utf-8')
-
-
-def test_https():
- iri = "https://vocab.eccenca.com/auth/"
- generic_test(iri, 'text/plain; charset=utf-8')
-
-
-def not_test_all_iris():
- with open('tests/archivo_ontologies_test.txt', 'r') as file:
- for line in file:
- iri = line.strip()
- if iri: # Ensure it's not an empty line
- iri_generic_test(iri)
-
-
-def generic_test(iri, content_type):
- response = requests.get(iri, proxies=PROXIES, verify=CA_CERT_PATH)
- assert response.status_code == 200
- assert iri in response.content.decode('utf-8')
-
-
-def iri_generic_test(iri):
- try:
- response = requests.get(iri, proxies=PROXIES, verify=CA_CERT_PATH)
- assert response.status_code == 200
- assert iri in response.content.decode('utf-8')
- print(f"Test passed for IRI: {iri}")
- except AssertionError:
- print(f"Test failed for IRI: {iri}")
- except requests.exceptions.RequestException as e:
- print(f"Request failed for IRI: {iri}, Error: {e}")
-
-
-if __name__ == '__main__':
- pytest.main()
diff --git a/tests/test_proxy_logic.py b/tests/test_proxy_logic.py
new file mode 100644
index 0000000..473e4bd
--- /dev/null
+++ b/tests/test_proxy_logic.py
@@ -0,0 +1,38 @@
+import unittest
+from ontologytimemachine.utils.proxy_logic import (
+ if_not_block_host,
+ do_deny_request_due_non_archivo_ontology_uri,
+ load_archivo_urls,
+ is_archivo_ontology_request,
+ proxy_logic,
+ fetch_original,
+)
+
+
+class TestProxyLogic(unittest.TestCase):
+
+ def test_do_deny_request_due_non_archivo_ontology_uri(self):
+ # Assuming we are using some sample data structure
+ class WrappedRequest:
+ def __init__(self, request):
+ self.request = {"host": request[0], "path": request[1]}
+
+ def get_request_host(self) -> str:
+ return self.request["host"].decode("utf-8")
+
+ def get_request_path(self) -> str:
+ return self.request["path"].decode("utf-8")
+
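+ # A sample request to example.com/ontology: it is expected to be denied only when
+ # the second argument (the Archivo-only restriction) is True.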
+ request = WrappedRequest((b"example.com", b"/ontology"))
+ self.assertTrue(do_deny_request_due_non_archivo_ontology_uri(request, True))
+ self.assertFalse(do_deny_request_due_non_archivo_ontology_uri(request, False))
+
+ def test_fetch_original(self):
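+ # Note: nothing is mocked here, so this test performs a live request to example.com.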
+ url = "https://example.com"
+ headers = {"Accept": "text/html"}
+ response = fetch_original(url, headers, False)
+ self.assertEqual(response.status_code, 200)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_unit.py b/tests/test_unit.py
deleted file mode 100644
index f0f76e8..0000000
--- a/tests/test_unit.py
+++ /dev/null
@@ -1,145 +0,0 @@
-import unittest
-from unittest.mock import patch, Mock
-import argparse
-import requests
-from ontologytimemachine.utils.mock_responses import (
- mock_response_200,
- mock_response_403,
- mock_response_404,
- mock_response_500
-)
-from ontologytimemachine.utils.utils import (
- parse_arguments,
- fetch_from_dbpedia_archivo_api,
- map_mime_to_format,
- get_parameters_from_headers
-)
-
-
-class TestUtils(unittest.TestCase):
-
- @patch('argparse.ArgumentParser.parse_args')
- def test_parse_arguments(self, mock_parse_args):
- mock_parse_args.return_value = argparse.Namespace(
- ontoFormat='turtle',
- ontoPrecedence='enforcedPriority',
- patchAcceptUpstream=False,
- ontoVersion='originalFailoverLive',
- onlyOntologies=True,
- httpsIntercept=False,
- inspectRedirects=True,
- forwardHeaders=True,
- subjectBinarySearchThreshold=100
- )
-
- args = parse_arguments()
-
- self.assertEqual(args[0]['format'], 'turtle')
- self.assertEqual(args[0]['precedence'], 'enforcedPriority')
- self.assertFalse(args[0]['patchAcceptUpstream'])
- self.assertEqual(args[1], 'originalFailoverLive')
- self.assertTrue(args[2])
- self.assertFalse(args[3])
- self.assertTrue(args[4])
- self.assertTrue(args[5])
- self.assertEqual(args[6], 100)
-
- mock_parse_args.return_value = argparse.Namespace(
- ontoFormat='ntriples',
- ontoPrecedence='default',
- patchAcceptUpstream=True,
- ontoVersion='latestArchive',
- onlyOntologies=False,
- httpsIntercept=True,
- inspectRedirects=False,
- forwardHeaders=False,
- subjectBinarySearchThreshold=50
- )
-
- args = parse_arguments()
-
- self.assertEqual(args[0]['format'], 'ntriples')
- self.assertEqual(args[0]['precedence'], 'default')
- self.assertTrue(args[0]['patchAcceptUpstream'])
- self.assertEqual(args[1], 'latestArchive')
- self.assertFalse(args[2])
- self.assertTrue(args[3])
- self.assertFalse(args[4])
- self.assertFalse(args[5])
- self.assertEqual(args[6], 50)
-
-
- @patch('requests.get')
- def test_fetch_from_dbpedia_archivo_api(self, mock_get):
- mock_response = Mock()
- mock_response.status_code = 200
- mock_get.return_value = mock_response
-
- ontology = 'http://dbpedia.org/ontology/Person'
- headers = {'Accept': 'text/turtle'}
-
- response = fetch_from_dbpedia_archivo_api(ontology, headers)
- self.assertEqual(response.status_code, 200)
-
- mock_get.side_effect = requests.exceptions.RequestException
- response = fetch_from_dbpedia_archivo_api(ontology, headers)
- self.assertEqual(response.status_code, 404)
-
- def test_map_mime_to_format(self):
- self.assertEqual(map_mime_to_format('application/rdf+xml'), 'owl')
- self.assertEqual(map_mime_to_format('text/turtle'), 'ttl')
- self.assertEqual(map_mime_to_format('application/n-triples'), 'nt')
- self.assertIsNone(map_mime_to_format('unknown/mime'))
-
- def test_get_parameters_from_headers(self):
- headers = {
- 'Accept': 'application/rdf+xml',
- 'Version': '1.0',
- 'VersionMatching': 'exact'
- }
- format, version, versionMatching = get_parameters_from_headers(headers)
- self.assertEqual(format, 'owl')
- self.assertEqual(version, '1.0')
- self.assertEqual(versionMatching, 'exact')
-
- headers = {
- 'Accept': 'unknown/mime',
- 'Version': '2.0',
- 'VersionMatching': 'compatible'
- }
- format, version, versionMatching = get_parameters_from_headers(headers)
- self.assertIsNone(format)
- self.assertEqual(version, '2.0')
- self.assertEqual(versionMatching, 'compatible')
-
-
-
-class TestMockResponses(unittest.TestCase):
-
- def test_mock_response_200(self):
- response = mock_response_200()
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.url, 'https://example.com/success')
- self.assertEqual(response.headers['Content-Type'], 'text/html')
- self.assertIn(b'To be implemented', response.content)
-
- def test_mock_response_403(self):
- response = mock_response_403()
- self.assertEqual(response.status_code, 403)
- self.assertEqual(response.url, 'https://example.com/forbidden')
- self.assertEqual(response.headers['Content-Type'], 'text/html')
- self.assertIn(b'403 Forbidden', response.content)
-
- def test_mock_response_404(self):
- response = mock_response_404()
- self.assertEqual(response.status_code, 404)
- self.assertEqual(response.url, 'https://example.com/resource-not-found')
- self.assertEqual(response.headers['Content-Type'], 'text/html')
- self.assertIn(b'404 Not Found', response.content)
-
- def test_mock_response_500(self):
- response = mock_response_500()
- self.assertEqual(response.status_code, 500)
- self.assertEqual(response.url, 'https://example.com/internal-server-error')
- self.assertEqual(response.headers['Content-Type'], 'text/html')
- self.assertIn(b'500 Internal Server Error', response.content)
\ No newline at end of file
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..fb0ca0a
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,101 @@
+import unittest
+from unittest.mock import patch, Mock
+import argparse
+import requests
+
+from ontologytimemachine.utils.utils import (
+ get_mime_type,
+ map_mime_to_format,
+ get_format_from_accept_header,
+ select_highest_priority_mime_from_archivo,
+ parse_accept_header_with_priority,
+ set_onto_format_headers,
+)
+
+
+class TestUtils(unittest.TestCase):
+
+ def test_get_mime_type(self):
+ self.assertEqual(get_mime_type("turtle"), "text/turtle")
+ self.assertEqual(get_mime_type("rdfxml"), "application/rdf+xml")
+ self.assertEqual(get_mime_type("ntriples"), "application/n-triples")
+ self.assertEqual(get_mime_type("htmldocu"), "text/html")
+ self.assertEqual(get_mime_type("unknown"), "text/turtle") # Default
+
+ def test_map_mime_to_format(self):
+ self.assertEqual(map_mime_to_format("application/rdf+xml"), "owl")
+ self.assertEqual(map_mime_to_format("application/owl+xml"), "owl")
+ self.assertEqual(map_mime_to_format("text/turtle"), "ttl")
+ self.assertEqual(map_mime_to_format("application/n-triples"), "nt")
+ self.assertIsNone(map_mime_to_format("unknown/mime"))
+
+ def test_select_highest_priority_mime_from_archivo(self):
+ archivo_mime_types = [
+ ("application/rdf+xml", 1.0),
+ ("text/turtle", 0.8),
+ ("application/n-triples", 1.0),
+ ]
+ result = select_highest_priority_mime_from_archivo(archivo_mime_types)
+ self.assertEqual(result, "application/rdf+xml")
+
+ archivo_mime_types = [
+ ("text/html", 0.8), # Unsupported type
+ ]
+ result = select_highest_priority_mime_from_archivo(archivo_mime_types)
+ self.assertIsNone(result)
+
+ def test_parse_accept_header_with_priority(self):
+ accept_header = (
+ "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
+ )
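+ # Types without an explicit q default to priority 1 and sort ahead of lower-q types.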
+ parsed_result = parse_accept_header_with_priority(accept_header)
+ expected_result = [
+ ("text/html", 1),
+ ("application/xhtml+xml", 1),
+ ("image/webp", 1),
+ ("application/xml", 0.9),
+ ("*/*", 0.8),
+ ]
+ self.assertEqual(parsed_result, expected_result)
+
+ def test_get_format_from_accept_header(self):
+ headers = {"Accept": "application/rdf+xml,text/turtle;q=0.9,*/*;q=0.8"}
+ format_result = get_format_from_accept_header(headers)
+ self.assertEqual(format_result, "owl")
+
+ headers_empty = {}
+ format_result = get_format_from_accept_header(headers_empty)
+ self.assertIsNone(format_result)
+
+ @patch("requests.get")
+ def test_fetch_latest_archived(self, mock_get):
+ mock_response = Mock()
+ mock_response.status_code = 200
+ mock_get.return_value = mock_response
+
+ ontology = "http://dbpedia.org/ontology/Person"
+ headers = {"Accept": "text/turtle"}
+
+ def test_map_mime_to_format(self):
+ self.assertEqual(map_mime_to_format("application/rdf+xml"), "owl")
+ self.assertEqual(map_mime_to_format("text/turtle"), "ttl")
+ self.assertEqual(map_mime_to_format("application/n-triples"), "nt")
+ self.assertIsNone(map_mime_to_format("unknown/mime"))
+
+ def test_get_format_from_accept_header(self):
+ headers = {"Accept": "application/json"}
+ format = get_format_from_accept_header(headers)
+ self.assertEqual(format, None)
+
+ headers = {}
+ format = get_format_from_accept_header(headers)
+
+ self.assertIsNone(format, None)
+
+ headers = {"Accept": "text/turtle"}
+ format = get_format_from_accept_header(headers)
+ self.assertEqual(format, "ttl")
+
+
+if __name__ == "__main__":
+ unittest.main()