From aa1fbc8b7ae11202e48223b721bf0a839e4a65da Mon Sep 17 00:00:00 2001 From: Ben Boonstra Date: Tue, 22 Oct 2024 22:05:57 -0500 Subject: [PATCH] 1.2.0.1 [release] --- CHANGELOG.md | 14 ++ docs/docs/technical/fields.html | 4 +- effortless/search.py | 45 +++-- setup.py | 6 +- tests/test_search.py | 299 ++++++++++++++++++++++++++------ 5 files changed, 303 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff9a448..00c33a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.2.0.1] - 2024-10-22 + +### Added + +- Added Unix timestamp support to between_dates() conditions +- Date conditions are now timezone-aware (UTC) +- Fixed some typing issues + +### Docs + +- Added technical per-object documentation pages +- Updated themes and visuals +- Added a changelog page that reflects CHANGELOG.md + ## [1.2.0] - 2024-10-22 ### Added diff --git a/docs/docs/technical/fields.html b/docs/docs/technical/fields.html index b02629b..9538047 100644 --- a/docs/docs/technical/fields.html +++ b/docs/docs/technical/fields.html @@ -184,8 +184,8 @@

Creates a condition that checks if the field's date is between the given dates. Valid dates are either datetime - objects or strings in the yyyy-mm-dd format. Unix - timestamp support is coming soon! + objects, strings in the yyyy-mm-dd format, or Unix + timestamps.

diff --git a/effortless/search.py b/effortless/search.py index 3c1e2e1..08de714 100644 --- a/effortless/search.py +++ b/effortless/search.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod from typing import Callable, Union, Tuple, List, Type import re -from datetime import datetime +from datetime import datetime, timezone from difflib import SequenceMatcher @@ -85,7 +85,7 @@ def _create_field_validator(self): elif isinstance(self.field, str): return lambda entry: self._field_exists(entry, self.field) elif isinstance(self.field, (tuple, list)): - return lambda entry: all(self._field_exists(entry, f) for f in self.field) # type: ignore + return lambda entry: all(self._field_exists(entry, f) for f in self.field) # type: ignore return lambda entry: False def __and__(self, other): @@ -275,28 +275,39 @@ def between_dates(self, start_date, end_date): Create a condition that checks if the field's date is between the given dates. Args: - start_date (datetime): The start date of the range. - end_date (datetime): The end date of the range. + start_date (Union[datetime, str, int, float]): The start date of the range. + end_date (Union[datetime, str, int, float]): The end date of the range. Returns: Query: This query object with the new condition. Raises: - TypeError: If start_date or end_date is not a datetime object. - ValueError: If start_date is after end_date. + TypeError: If start_date or end_date is not a datetime, string, int, or float. + ValueError: If start_date is after end_date, if the date string format is invalid, + or if the Unix timestamp is invalid. """ def to_datetime(date): if isinstance(date, str): try: - return datetime.fromisoformat(date) + dt = datetime.fromisoformat(date) + return dt.replace(tzinfo=timezone.utc) if dt.tzinfo is None else dt except ValueError: raise ValueError(f"Invalid date format: {date}") elif isinstance(date, datetime): - return date + return ( + date.replace(tzinfo=timezone.utc) if date.tzinfo is None else date + ) + elif isinstance(date, (int, float)): + if date < 0: + raise ValueError(f"Invalid Unix timestamp: {date}") + try: + return datetime.fromtimestamp(date, tz=timezone.utc) + except (ValueError, OSError, OverflowError): + raise ValueError(f"Invalid Unix timestamp: {date}") else: raise TypeError( - f"Date must be a string or datetime object, not {type(date)}" + f"Date must be a string, datetime, int, or float object, not {type(date)}" ) start_date = to_datetime(start_date) @@ -309,9 +320,22 @@ def condition(entry): if isinstance(field_value, str): try: field_value = datetime.fromisoformat(field_value) + field_value = ( + field_value.replace(tzinfo=timezone.utc) + if field_value.tzinfo is None + else field_value + ) except ValueError: return False - if not isinstance(field_value, datetime): + elif isinstance(field_value, (int, float)): + field_value = datetime.fromtimestamp(field_value, tz=timezone.utc) + elif isinstance(field_value, datetime): + field_value = ( + field_value.replace(tzinfo=timezone.utc) + if field_value.tzinfo is None + else field_value + ) + else: return False return start_date <= field_value <= end_date @@ -530,4 +554,3 @@ class FieldNotFoundError(Exception): """ pass - diff --git a/setup.py b/setup.py index 32f796d..266ce85 100644 --- a/setup.py +++ b/setup.py @@ -4,12 +4,12 @@ setup( name="Effortless", - version="1.2.0", + version="1.2.0.1", packages=find_packages(), description="Databases should be Effortless.", long_description=open("README.md").read(), long_description_content_type="text/markdown", - url="https://github.com/bboonstra/Effortless", + url="https://bboonstra.github.io/Effortless/", author="Ben Boonstra", license="MIT", classifiers=[ @@ -30,7 +30,7 @@ keywords="database, effortless, simple storage, beginner, easy, db", project_urls={ "Bug Tracker": "https://github.com/bboonstra/Effortless/issues", - "Documentation": "https://github.com/bboonstra/Effortless", + "Documentation": "https://bboonstra.github.io/Effortless/", "Source Code": "https://github.com/bboonstra/Effortless", }, include_package_data=True, diff --git a/tests/test_search.py b/tests/test_search.py index d0f9e64..c9d5350 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -1,7 +1,7 @@ import unittest from effortless import EffortlessDB, Field, Query import re -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import time import math @@ -40,70 +40,146 @@ def setUp(self): def test_equals(self): result = self.db.filter(Field("name").equals("Alice")) - self.assertEqual(len(result), 1, "There should be exactly one entry with name 'Alice'") - self.assertEqual(result[0]["name"], "Alice", "The filtered entry should have the name 'Alice'") + self.assertEqual( + len(result), 1, "There should be exactly one entry with name 'Alice'" + ) + self.assertEqual( + result[0]["name"], + "Alice", + "The filtered entry should have the name 'Alice'", + ) def test_contains_case_sensitive(self): result = self.db.filter(Field("skills").contains("Python")) - self.assertEqual(len(result), 2, "There should be two entries with 'Python' in their skills") - self.assertTrue(any(entry["id"] == 1 for entry in result), "Alice (id 1) should be in the result") - self.assertTrue(any(entry["id"] == 3 for entry in result), "Charlie (id 3) should be in the result") + self.assertEqual( + len(result), 2, "There should be two entries with 'Python' in their skills" + ) + self.assertTrue( + any(entry["id"] == 1 for entry in result), + "Alice (id 1) should be in the result", + ) + self.assertTrue( + any(entry["id"] == 3 for entry in result), + "Charlie (id 3) should be in the result", + ) def test_contains_case_insensitive(self): result = self.db.filter( Field("skills").contains("python", case_sensitive=False) ) - self.assertEqual(len(result), 2, "There should be two entries with 'python' (case-insensitive) in their skills") - self.assertTrue(any(entry["id"] == 1 for entry in result), "Alice (id 1) should be in the result") - self.assertTrue(any(entry["id"] == 3 for entry in result), "Charlie (id 3) should be in the result") + self.assertEqual( + len(result), + 2, + "There should be two entries with 'python' (case-insensitive) in their skills", + ) + self.assertTrue( + any(entry["id"] == 1 for entry in result), + "Alice (id 1) should be in the result", + ) + self.assertTrue( + any(entry["id"] == 3 for entry in result), + "Charlie (id 3) should be in the result", + ) def test_startswith_case_sensitive(self): result = self.db.filter(Field("name").startswith("A")) - self.assertEqual(len(result), 1, "There should be one entry with a name starting with 'A'") - self.assertEqual(result[0]["name"], "Alice", "The filtered entry should be 'Alice'") + self.assertEqual( + len(result), 1, "There should be one entry with a name starting with 'A'" + ) + self.assertEqual( + result[0]["name"], "Alice", "The filtered entry should be 'Alice'" + ) def test_startswith_case_insensitive(self): result = self.db.filter(Field("name").startswith("a", case_sensitive=False)) - self.assertEqual(len(result), 1, "There should be one entry with a name starting with 'a' (case-insensitive)") - self.assertEqual(result[0]["name"], "Alice", "The filtered entry should be 'Alice'") + self.assertEqual( + len(result), + 1, + "There should be one entry with a name starting with 'a' (case-insensitive)", + ) + self.assertEqual( + result[0]["name"], "Alice", "The filtered entry should be 'Alice'" + ) def test_endswith(self): result = self.db.filter(Field("name").endswith("e")) - self.assertEqual(len(result), 2, "There should be two entries with names ending in 'e'") - self.assertTrue(any(entry["id"] == 1 for entry in result), "Alice (id 1) should be in the result") - self.assertTrue(any(entry["id"] == 3 for entry in result), "Charlie (id 3) should be in the result") + self.assertEqual( + len(result), 2, "There should be two entries with names ending in 'e'" + ) + self.assertTrue( + any(entry["id"] == 1 for entry in result), + "Alice (id 1) should be in the result", + ) + self.assertTrue( + any(entry["id"] == 3 for entry in result), + "Charlie (id 3) should be in the result", + ) def test_greater_than(self): result = self.db.filter(Field("age").greater_than(30)) - self.assertEqual(len(result), 1, "There should be one entry with age greater than 30") - self.assertEqual(result[0]["name"], "Charlie", "The filtered entry should be 'Charlie'") + self.assertEqual( + len(result), 1, "There should be one entry with age greater than 30" + ) + self.assertEqual( + result[0]["name"], "Charlie", "The filtered entry should be 'Charlie'" + ) def test_less_than(self): result = self.db.filter(Field("age").less_than(30)) - self.assertEqual(len(result), 1, "There should be one entry with age less than 30") + self.assertEqual( + len(result), 1, "There should be one entry with age less than 30" + ) self.assertEqual(result[0]["name"], "Bob", "The filtered entry should be 'Bob'") def test_and_query(self): result = self.db.filter( Field("age").greater_than(25) & Field("skills").contains("Python") ) - self.assertEqual(len(result), 2, "There should be two entries with age > 25 and 'Python' in skills") - self.assertTrue(any(entry["name"] == "Alice" for entry in result), "Alice should be in the result") - self.assertTrue(any(entry["name"] == "Charlie" for entry in result), "Charlie should be in the result") + self.assertEqual( + len(result), + 2, + "There should be two entries with age > 25 and 'Python' in skills", + ) + self.assertTrue( + any(entry["name"] == "Alice" for entry in result), + "Alice should be in the result", + ) + self.assertTrue( + any(entry["name"] == "Charlie" for entry in result), + "Charlie should be in the result", + ) def test_or_query(self): result = self.db.filter( Field("age").less_than(26) | Field("name").equals("Charlie") ) - self.assertEqual(len(result), 2, "There should be two entries with age < 26 or name 'Charlie'") - self.assertTrue(any(entry["name"] == "Bob" for entry in result), "Bob should be in the result") - self.assertTrue(any(entry["name"] == "Charlie" for entry in result), "Charlie should be in the result") + self.assertEqual( + len(result), + 2, + "There should be two entries with age < 26 or name 'Charlie'", + ) + self.assertTrue( + any(entry["name"] == "Bob" for entry in result), + "Bob should be in the result", + ) + self.assertTrue( + any(entry["name"] == "Charlie" for entry in result), + "Charlie should be in the result", + ) def test_nested_field(self): result = self.db.filter(Field("address.country").equals("USA")) - self.assertEqual(len(result), 2, "There should be two entries with country 'USA'") - self.assertTrue(any(entry["name"] == "Alice" for entry in result), "Alice should be in the result") - self.assertTrue(any(entry["name"] == "Charlie" for entry in result), "Charlie should be in the result") + self.assertEqual( + len(result), 2, "There should be two entries with country 'USA'" + ) + self.assertTrue( + any(entry["name"] == "Alice" for entry in result), + "Alice should be in the result", + ) + self.assertTrue( + any(entry["name"] == "Charlie" for entry in result), + "Charlie should be in the result", + ) def test_complex_query(self): result = self.db.filter( @@ -113,18 +189,39 @@ def test_complex_query(self): & Field("address.city").equals("London") ) ) - self.assertEqual(len(result), 3, "There should be three entries matching the complex query") - self.assertTrue(any(entry["name"] == "Alice" for entry in result), "Alice should be in the result") - self.assertTrue(any(entry["name"] == "Bob" for entry in result), "Bob should be in the result") - self.assertTrue(any(entry["name"] == "Charlie" for entry in result), "Charlie should be in the result") + self.assertEqual( + len(result), 3, "There should be three entries matching the complex query" + ) + self.assertTrue( + any(entry["name"] == "Alice" for entry in result), + "Alice should be in the result", + ) + self.assertTrue( + any(entry["name"] == "Bob" for entry in result), + "Bob should be in the result", + ) + self.assertTrue( + any(entry["name"] == "Charlie" for entry in result), + "Charlie should be in the result", + ) def test_lambda_query(self): result = self.db.filter( Query(lambda entry: len(entry["skills"]) > 1 and entry["age"] < 35) ) - self.assertEqual(len(result), 2, "There should be two entries with more than 1 skill and age < 35") - self.assertTrue(any(entry["name"] == "Alice" for entry in result), "Alice should be in the result") - self.assertTrue(any(entry["name"] == "Bob" for entry in result), "Bob should be in the result") + self.assertEqual( + len(result), + 2, + "There should be two entries with more than 1 skill and age < 35", + ) + self.assertTrue( + any(entry["name"] == "Alice" for entry in result), + "Alice should be in the result", + ) + self.assertTrue( + any(entry["name"] == "Bob" for entry in result), + "Bob should be in the result", + ) def test_empty_result(self): result = self.db.filter(Field("name").equals("David")) @@ -132,11 +229,15 @@ def test_empty_result(self): def test_invalid_field(self): result = self.db.filter(Field("invalid_field").equals("value")) - self.assertEqual(len(result), 0, "There should be no entries with an 'invalid_field'") + self.assertEqual( + len(result), 0, "There should be no entries with an 'invalid_field'" + ) def test_invalid_nested_field(self): result = self.db.filter(Field("address.invalid_field").equals("value")) - self.assertEqual(len(result), 0, "There should be no entries with an 'address.invalid_field'") + self.assertEqual( + len(result), 0, "There should be no entries with an 'address.invalid_field'" + ) def test_multiple_conditions(self): result = self.db.filter( @@ -144,19 +245,40 @@ def test_multiple_conditions(self): & Field("skills").contains("Python") & Field("address.country").equals("USA") ) - self.assertEqual(len(result), 2, "There should be two entries matching all three conditions") - self.assertTrue(any(entry["name"] == "Alice" for entry in result), "Alice should be in the result") - self.assertTrue(any(entry["name"] == "Charlie" for entry in result), "Charlie should be in the result") + self.assertEqual( + len(result), 2, "There should be two entries matching all three conditions" + ) + self.assertTrue( + any(entry["name"] == "Alice" for entry in result), + "Alice should be in the result", + ) + self.assertTrue( + any(entry["name"] == "Charlie" for entry in result), + "Charlie should be in the result", + ) def test_complex_nested_query(self): query = (Field("age").greater_than(25) & Field("skills").contains("Python")) | ( Field("name").startswith("B") ) result = self.db.filter(query) - self.assertEqual(len(result), 3, "There should be three entries matching the complex nested query") - self.assertTrue(any(entry["name"] == "Alice" for entry in result), "Alice should be in the result") - self.assertTrue(any(entry["name"] == "Bob" for entry in result), "Bob should be in the result") - self.assertTrue(any(entry["name"] == "Charlie" for entry in result), "Charlie should be in the result") + self.assertEqual( + len(result), + 3, + "There should be three entries matching the complex nested query", + ) + self.assertTrue( + any(entry["name"] == "Alice" for entry in result), + "Alice should be in the result", + ) + self.assertTrue( + any(entry["name"] == "Bob" for entry in result), + "Bob should be in the result", + ) + self.assertTrue( + any(entry["name"] == "Charlie" for entry in result), + "Charlie should be in the result", + ) def test_nested_field_query(self): self.db.wipe() @@ -164,12 +286,20 @@ def test_nested_field_query(self): self.db.add({"user": {"name": "Bob", "age": 25}}) result = self.db.filter(Field("user.name").equals("Alice")) - self.assertEqual(len(result), 1, "There should be one entry with user.name 'Alice'") - self.assertEqual(result[0]["user"]["name"], "Alice", "The filtered entry should have user.name 'Alice'") + self.assertEqual( + len(result), 1, "There should be one entry with user.name 'Alice'" + ) + self.assertEqual( + result[0]["user"]["name"], + "Alice", + "The filtered entry should have user.name 'Alice'", + ) result = self.db.filter(Field("user.age").less_than(28)) self.assertEqual(len(result), 1, "There should be one entry with user.age < 28") - self.assertEqual(result[0]["user"]["name"], "Bob", "The filtered entry should be Bob") + self.assertEqual( + result[0]["user"]["name"], "Bob", "The filtered entry should be Bob" + ) class TestAdvancedSearch(unittest.TestCase): @@ -336,6 +466,77 @@ def test_performance(self): end_time - start_time, 1.0 ) # Assert that it takes less than 1 second + def test_between_dates_with_unix_timestamps(self): + # Add entries with Unix timestamp dates + now = time.time() + self.db.add( + { + "id": 4, + "name": "David", + "registration_date": now - 86400, # Yesterday + } + ) + self.db.add( + { + "id": 5, + "name": "Eve", + "registration_date": now, # Now + } + ) + self.db.add( + { + "id": 6, + "name": "Frank", + "registration_date": now + 86400, # Tomorrow + } + ) + + # Test with Unix timestamp input + start_date = now - 43200 # 12 hours ago + end_date = now + 43200 # 12 hours from now + + result = self.db.filter( + Field("registration_date").between_dates(start_date, end_date) + ) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["name"], "Eve") + + # Test with mixed input types + result = self.db.filter( + Field("registration_date").between_dates( + datetime.fromtimestamp(now - 86400, tz=timezone.utc), now + 43200 + ) + ) + self.assertEqual(len(result), 2) + self.assertTrue(any(entry["name"] == "David" for entry in result)) + self.assertTrue(any(entry["name"] == "Eve" for entry in result)) + + def test_between_dates_with_invalid_unix_timestamps(self): + # Test with negative Unix timestamp + with self.assertRaises(ValueError): + self.db.filter( + Field("registration_date").between_dates(-1, time.time()) + ) + + # Test with Unix timestamp that's too large + with self.assertRaises(ValueError): + self.db.filter( + Field("registration_date").between_dates(2**63, time.time()) + ) + + # Test with invalid Unix timestamp (string that's not a valid float) + with self.assertRaises(ValueError): + self.db.filter( + Field("registration_date").between_dates("not a timestamp", time.time()) + ) + + def test_between_dates_with_future_unix_timestamps(self): + far_future = time.time() + 31536000 # Approximately one year from now + result = self.db.filter( + Field("registration_date").between_dates(far_future, far_future + 86400) + ) + self.assertEqual(len(result), 0) # Should be no matches + class TestAdvancedSearchErrors(unittest.TestCase): def setUp(self): @@ -403,7 +604,7 @@ def test_fuzzy_match_type_error(self): # Test with non-numeric threshold with self.assertRaises(ValueError): - self.db.filter(Field("name").fuzzy_match("Alice", threshold="high")) # type: ignore + self.db.filter(Field("name").fuzzy_match("Alice", threshold="high")) # type: ignore def test_fuzzy_match_value_error(self): # Test with threshold out of range @@ -645,7 +846,7 @@ def test_passes_with_none_value(self): self.db.add({"id": 4, "name": "David", "age": None}) print("\nDatabase contents:") print(self.db.get_all()) - + def check_none(x): print(f"Checking value: {x}") return x is None