diff --git a/CHANGELOG.md b/CHANGELOG.md index c3f85ef..17bf700 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.1.1] - 2024-10-21 + +### Added + +- Introduced new `passes` method for custom filtering functions +- Added `is_type` method for type checking in queries +- Expanded test suite to cover new `passes` and `is_type` functionalities +- Implements a blocking `finish_backup` method to databases to complete a backup before proceeding + ## [1.1.0] - 2024-10-20 ### Added diff --git a/README.md b/README.md index 43b811a..74d6c3e 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ db.add({"Anything": "will not work"}) # Raises an error ## New Filtering Capabilities -Effortless 1.1.0 introduces powerful filtering capabilities using the `Field` class: +Effortless 1.1 introduces powerful filtering capabilities using the `Field` class: - `equals`: Exact match - `contains`: Check if a value is in a string or list @@ -110,6 +110,8 @@ Effortless 1.1.0 introduces powerful filtering capabilities using the `Field` cl - `matches_regex`: Regular expression matching - `between_dates`: Date range filtering - `fuzzy_match`: Approximate string matching +- `passes`: Apply a custom function to filter items +- `is_type`: Check the type of a field You can combine these filters using `&` (AND) and `|` (OR) operators for complex queries. @@ -120,13 +122,19 @@ result = db.filter( ) ``` -For even more flexibility, you can use the `Query` class with a custom lambda function: +For even more flexibility, you can use the `passes` method with a custom function: ```python -from effortless import Query +def is_experienced(skills): + return len(skills) > 3 -custom_query = Query(lambda item: len(item["name"]) > 5 and item["age"] % 2 == 0) -result = db.filter(custom_query) +result = db.filter(Field("skills").passes(is_experienced)) +``` + +You can also check the type of a field: + +```python +result = db.filter(Field("age").is_type(int)) ``` These new filtering capabilities make Effortless more powerful while maintaining its simplicity and ease of use. diff --git a/effortless/effortless.py b/effortless/effortless.py index 505cc5d..c0dcc0a 100644 --- a/effortless/effortless.py +++ b/effortless/effortless.py @@ -29,6 +29,7 @@ def __init__(self, db_name: str = "db"): self.set_storage(db_name) self._autoconfigure() self._operation_count = 0 + self._backup_thread = None @staticmethod def default_db(): @@ -374,9 +375,37 @@ def _handle_backup(self) -> None: self._operation_count += 1 if self.config.backup and self._operation_count >= self.config.backup_interval: self._operation_count = 0 - threading.Thread(target=self._backup).start() - def _backup(self) -> None: + # If a backup thread is already running, we can stop it + if self._backup_thread and self._backup_thread.is_alive(): + self._backup_thread.join(timeout=0) # Non-blocking join + if self._backup_thread.is_alive() and self.config.debug: + logger.debug("Previous backup thread is alive and not stopping") + + # Start a new backup thread + self._backup_thread = threading.Thread(target=self._backup) + self._backup_thread.start() + + def finish_backup(self, timeout: float = None) -> bool: + """ + Wait for any ongoing backup operation to complete. + + This method blocks until the current backup thread (if any) has finished. + + Args: + timeout (float, optional): Maximum time to wait for the backup to complete, in seconds. + If None, wait indefinitely. Defaults to None. + + Returns: + bool: True if the backup completed (or there was no backup running), + False if the timeout was reached before the backup completed. + """ + if self._backup_thread and self._backup_thread.is_alive(): + self._backup_thread.join(timeout) + return not self._backup_thread.is_alive() + return True + + def _backup(self) -> bool: """ Perform a database backup. @@ -388,13 +417,23 @@ def _backup(self) -> None: """ if self.config.backup: try: + # Check if backup directory is valid + if not os.path.exists(self.config.backup) or not os.access( + self.config.backup, os.W_OK + ): + raise IOError( + f"Backup directory {self.config.backup} is not writable or does not exist." + ) + backup_path = os.path.join( self.config.backup, os.path.basename(self._storage_file) ) shutil.copy2(self._storage_file, backup_path) logger.debug(f"Database backed up to {backup_path}") + return True # Indicate success except IOError as e: logger.error(f"Backup failed: {str(e)}") + return False # Indicate failure def _compress_data(self, data: Dict[str, Any]) -> str: """ diff --git a/effortless/search.py b/effortless/search.py index 024df4e..ca08723 100644 --- a/effortless/search.py +++ b/effortless/search.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Callable, Union, Tuple, List +from typing import Callable, Union, Tuple, List, Type import re from datetime import datetime from difflib import SequenceMatcher @@ -73,6 +73,7 @@ def __init__( """ if callable(condition_or_field): self.condition = condition_or_field + self.field = None else: self.field = condition_or_field self.condition = None @@ -315,8 +316,19 @@ def between_dates(self, start_date, end_date): TypeError: If start_date or end_date is not a datetime object. ValueError: If start_date is after end_date. """ - if not isinstance(start_date, datetime) or not isinstance(end_date, datetime): - raise TypeError("Start and end dates must be datetime objects") + def to_datetime(date): + if isinstance(date, str): + try: + return datetime.fromisoformat(date) + except ValueError: + raise ValueError(f"Invalid date format: {date}") + elif isinstance(date, datetime): + return date + else: + raise TypeError(f"Date must be a string or datetime object, not {type(date)}") + + start_date = to_datetime(start_date) + end_date = to_datetime(end_date) if start_date > end_date: raise ValueError("Start date must be before or equal to end date") @@ -359,6 +371,48 @@ def condition(item): self.condition = lambda item: self._validate_field(item) and condition(item) return self + def passes(self, func): + """ + Create a condition that checks if the field passes the given function. + + Args: + func (callable): A function that takes the field value and returns a boolean. + + Returns: + Query: This query object with the new condition. + """ + + def condition(item): + field_value = self._get_nested_value(item, self.field) + try: + return func(field_value) + except Exception as e: + func_name = getattr(func, "__name__", "unnamed function") + raise ValueError( + f"Error checking condition '{func_name}': {str(e)}" + ) from e + + self.condition = lambda item: self._validate_field(item) and condition(item) + return self + + def is_type(self, expected_type: Type): + """ + Create a condition that checks if the field is of the expected type. + + Args: + expected_type (Type): The expected type of the field. + + Returns: + Query: This query object with the new condition. + """ + + def condition(item): + field_value = self._get_nested_value(item, self.field) + return isinstance(field_value, expected_type) + + self.condition = lambda item: self._validate_field(item) and condition(item) + return self + class AndQuery(BaseQuery): """ diff --git a/setup.py b/setup.py index 37f9101..4a2adae 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setup( name="Effortless", - version="1.1.0", + version="1.1.1", packages=find_packages(), description="Databases should be Effortless.", long_description=open("README.md").read(), diff --git a/tests/test_configuration.py b/tests/test_configuration.py index e48abb1..f86d3ae 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -1,6 +1,7 @@ import unittest import tempfile import shutil +import os from effortless import EffortlessDB, EffortlessConfig @@ -65,10 +66,10 @@ def test_required_fields(self): def test_max_size_limit(self): self.db.wipe() self.db.configure(EffortlessConfig({"ms": 0.001})) # Set max size to 1 KB - + # This should work self.db.add({"small": "data"}) - + # This should raise an error large_data = {"large": "x" * 1000} # Approximately 1 KB with self.assertRaises(ValueError): @@ -103,6 +104,40 @@ def test_invalid_configuration_values(self): with self.assertRaises(ValueError): EffortlessConfig({"bpi": 0}) + def test_backup_interval(self): + # Configure the database with a backup path + backup_path = tempfile.mkdtemp() # Create a temporary directory for backups + new_config = { + "dbg": True, + "bp": backup_path, # Set backup path + "bpi": 1, # Backup after every operation + } + self.db.configure(EffortlessConfig(new_config)) + + # Assert that the backup path is properly configured + self.assertEqual(self.db.config.backup, backup_path) + + # Add an item to trigger a backup + self.db.add({"name": "Alice", "age": 30}) + + backup_file = os.path.join(backup_path, "test_db.effortless") + self.assertFalse( + os.path.exists(backup_file), + "DB should not be backed up after 1 operation if bpi == 2.", + ) + + # Add another item to trigger a backup again + self.db.add({"name": "Bob", "age": 25}) + + # Check if the backup file still exists and has been updated + self.assertTrue( + os.path.exists(backup_file), + "Backup file should still exist after adding the second item.", + ) + + # Clean up the backup directory + shutil.rmtree(backup_path) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_docs.py b/tests/test_docs.py index a1f9fad..338026a 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -1,8 +1,9 @@ import tempfile import unittest import shutil -from effortless import EffortlessDB, EffortlessConfig, Field +from effortless import EffortlessDB, EffortlessConfig, Field, Query import effortless +import os class TestDocs(unittest.TestCase): @@ -13,72 +14,182 @@ def tearDown(self): shutil.rmtree(self.test_dir, ignore_errors=True) def test_effortless_usage(self): - db = ( - effortless.db - ) # Same as from effortless import db, but trying not to clog namespace with db and effortless + db = effortless.db db.wipe(wipe_readonly=True) # Add items to the database db.add({"name": "Alice", "age": 30}) db.add({"name": "Bob", "age": 25}) - # Search for items - result = db.filter(Field("name").equals("Alice")) - self.assertEqual(result, {"1": {"name": "Alice", "age": 30}}) - - # Get all items + # Get all items from the DB all_items = db.get_all() self.assertEqual( all_items, {"1": {"name": "Alice", "age": 30}, "2": {"name": "Bob", "age": 25}}, ) + # Get items based on a field + result = db.filter(Field("name").equals("Alice")) + self.assertEqual(result, {"1": {"name": "Alice", "age": 30}}) + + # Wipe the database + db.wipe() + self.assertEqual(db.get_all(), {}) + def test_basic_usage(self): # Create a new Effortless instance - local_db = EffortlessDB() - local_db.wipe(wipe_readonly=True) + db = EffortlessDB() + db.wipe(wipe_readonly=True) + # Add items to the database - local_db.add({"name": "Charlie", "age": 35}) - local_db.add({"name": "David", "age": 28}) + db.add({"name": "Charlie", "age": 35}) + db.add({"name": "David", "age": 28}) - # Search for items - result = local_db.filter(Field("age").equals(28)) - self.assertEqual(result, {"2": {"name": "David", "age": 28}}) - - # Get all items - all_items = local_db.get_all() - self.assertEqual( - all_items, - {"1": {"name": "Charlie", "age": 35}, "2": {"name": "David", "age": 28}}, - ) + # Filter items + result = db.filter(Field("age").greater_than(30)) + self.assertEqual(result, {"1": {"name": "Charlie", "age": 35}}) def test_advanced_usage(self): - # Create a new EffortlessDB instance with a custom directory - advanced_db = EffortlessDB("advanced_db") - advanced_db.set_directory(self.test_dir) - advanced_db.wipe() + # Create a new Effortless instance with a custom directory + db = EffortlessDB("advanced_db") + db.set_directory(self.test_dir) + db.wipe() # Add multiple items - advanced_db.add({"id": 1, "name": "Eve", "skills": ["Python", "JavaScript"]}) - advanced_db.add({"id": 2, "name": "Frank", "skills": ["Java", "C++"]}) - advanced_db.add({"id": 3, "name": "Grace", "skills": ["Python", "Ruby"]}) - - # Complex search - python_devs = advanced_db.filter(Field("skills").contains("Python")) - self.assertEqual( - python_devs, + db.add( + { + "id": 1, + "name": "Eve", + "skills": ["Python", "JavaScript"], + "joined": "2023-01-15", + } + ) + db.add( + { + "id": 2, + "name": "Frank", + "skills": ["Java", "C++"], + "joined": "2023-02-20", + } + ) + db.add( { - "1": {"id": 1, "name": "Eve", "skills": ["Python", "JavaScript"]}, - "3": {"id": 3, "name": "Grace", "skills": ["Python", "Ruby"]}, - }, + "id": 3, + "name": "Grace", + "skills": ["Python", "Ruby"], + "joined": "2023-03-10", + } + ) + + # Complex filtering + python_devs = db.filter( + Field("skills").contains("Python") + & Field("joined").between_dates("2023-01-01", "2023-02-28") ) + self.assertEqual(len(python_devs), 1) + self.assertEqual(python_devs["1"]["name"], "Eve") + + # Custom query using Query class + custom_query = Query( + lambda item: len(item["skills"]) > 1 and "Python" in item["skills"] + ) + multi_skill_python_devs = db.filter(custom_query) + self.assertEqual(len(multi_skill_python_devs), 2) + self.assertIn("1", multi_skill_python_devs) + self.assertIn("3", multi_skill_python_devs) # Update configuration - advanced_db.configure(EffortlessConfig({"index_fields": ["id", "name"]})) + db.configure(EffortlessConfig({"ro": True})) + with self.assertRaises(Exception): # The exact exception type may vary + db.add({"Anything": "will not work"}) - # Wipe the database - advanced_db.wipe() - self.assertEqual(advanced_db.get_all(), {}) + def test_new_filtering_capabilities(self): + db = EffortlessDB() + db.wipe(wipe_readonly=True) + + db.add({"name": "Alice", "age": 30, "skills": ["Python", "JavaScript"]}) + db.add({"name": "Bob", "age": 25, "skills": ["Java"]}) + db.add({"name": "Charlie", "age": 35, "skills": ["Python", "Ruby"]}) + + # Complex query + result = db.filter( + (Field("age").greater_than(25) & Field("skills").contains("Python")) + | Field("name").startswith("A") + ) + self.assertEqual(len(result), 2) + self.assertIn("1", result) + self.assertIn("3", result) + + # passes method + def is_experienced(skills): + return len(skills) > 1 + + result = db.filter(Field("skills").passes(is_experienced)) + self.assertEqual(len(result), 2) + self.assertIn("1", result) + self.assertIn("3", result) + + # is_type method + result = db.filter(Field("age").is_type(int)) + self.assertEqual(len(result), 3) + + def test_safety_first(self): + db = EffortlessDB() + db.wipe(wipe_readonly=True) + + new_configuration = EffortlessConfig() + new_configuration.backup = self.test_dir + db.configure(new_configuration) + + # Add some data + db.add({"name": "Test", "value": 123}) + + # wait for all threads in the db to complete + db.finish_backup() + + self.assertEqual(db.config.backup, self.test_dir) + + # Check if backup file is created (this is a simplified check) + + backup_files = [ + f for f in os.listdir(self.test_dir) if f.endswith(".effortless") + ] + self.assertGreater(len(backup_files), 0) + + def test_powerful_querying(self): + db = EffortlessDB() + db.wipe(wipe_readonly=True) + + db.add( + { + "username": "bboonstra", + "known_programming_languages": ["Python", "JavaScript", "Ruby"], + } + ) + db.add({"username": "user1", "known_programming_languages": ["Python", "Java"]}) + db.add( + { + "username": "user2", + "known_programming_languages": [ + "C++", + "Java", + "Rust", + "Go", + "Python", + "JavaScript", + ], + } + ) + + is_bboonstra = Field("username").equals("bboonstra") + is_experienced = Field("known_programming_languages").passes( + lambda langs: len(langs) > 5 + ) + GOATs = db.filter(is_bboonstra | is_experienced) + + self.assertEqual(len(GOATs), 2) + self.assertIn("1", GOATs) + self.assertIn("3", GOATs) if __name__ == "__main__": diff --git a/tests/test_search.py b/tests/test_search.py index 1c37b95..c5fc35d 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -2,8 +2,9 @@ from effortless import EffortlessDB, Field, Query import re from datetime import datetime, timedelta -from difflib import SequenceMatcher import time +import math + class TestFilter(unittest.TestCase): def setUp(self): @@ -148,7 +149,9 @@ def test_multiple_conditions(self): self.assertIn("3", result) def test_complex_nested_query(self): - query = (Field("age").greater_than(25) & Field("skills").contains("Python")) | (Field("name").startswith("B")) + query = (Field("age").greater_than(25) & Field("skills").contains("Python")) | ( + Field("name").startswith("B") + ) result = self.db.filter(query) self.assertEqual(len(result), 3) self.assertIn("1", result) @@ -159,11 +162,11 @@ def test_nested_field_query(self): self.db.wipe() self.db.add({"user": {"name": "Alice", "age": 30}}) self.db.add({"user": {"name": "Bob", "age": 25}}) - + result = self.db.filter(Field("user.name").equals("Alice")) self.assertEqual(len(result), 1) self.assertEqual(result["1"]["user"]["name"], "Alice") - + result = self.db.filter(Field("user.age").less_than(28)) self.assertEqual(len(result), 1) self.assertEqual(result["2"]["user"]["name"], "Bob") @@ -173,30 +176,36 @@ class TestAdvancedSearch(unittest.TestCase): def setUp(self): self.db = EffortlessDB() self.db.wipe() - self.db.add({ - "id": 1, - "name": "Alice Smith", - "email": "alice@example.com", - "age": 30, - "registration_date": "2023-01-15", - "skills": ["Python", "JavaScript"], - }) - self.db.add({ - "id": 2, - "name": "Bob Johnson", - "email": "bob@example.com", - "age": 25, - "registration_date": "2023-02-20", - "skills": ["Java", "C++"], - }) - self.db.add({ - "id": 3, - "name": "Charlie Brown", - "email": "charlie@example.com", - "age": 35, - "registration_date": "2023-03-10", - "skills": ["Python", "Ruby"], - }) + self.db.add( + { + "id": 1, + "name": "Alice Smith", + "email": "alice@example.com", + "age": 30, + "registration_date": "2023-01-15", + "skills": ["Python", "JavaScript"], + } + ) + self.db.add( + { + "id": 2, + "name": "Bob Johnson", + "email": "bob@example.com", + "age": 25, + "registration_date": "2023-02-20", + "skills": ["Java", "C++"], + } + ) + self.db.add( + { + "id": 3, + "name": "Charlie Brown", + "email": "charlie@example.com", + "age": 35, + "registration_date": "2023-03-10", + "skills": ["Python", "Ruby"], + } + ) def test_matches_regex(self): # Test email pattern @@ -204,25 +213,33 @@ def test_matches_regex(self): self.assertEqual(len(result), 3) # Test name pattern - result = self.db.filter(Field("name").matches_regex(r"^[A-Z][a-z]+ [A-Z][a-z]+$")) + result = self.db.filter( + Field("name").matches_regex(r"^[A-Z][a-z]+ [A-Z][a-z]+$") + ) self.assertEqual(len(result), 3) # Test with flags - result = self.db.filter(Field("name").matches_regex(r"^alice", flags=re.IGNORECASE)) + result = self.db.filter( + Field("name").matches_regex(r"^alice", flags=re.IGNORECASE) + ) self.assertEqual(len(result), 1) self.assertEqual(result["1"]["name"], "Alice Smith") def test_between_dates(self): start_date = datetime(2023, 2, 1) end_date = datetime(2023, 3, 1) - - result = self.db.filter(Field("registration_date").between_dates(start_date, end_date)) + + result = self.db.filter( + Field("registration_date").between_dates(start_date, end_date) + ) self.assertEqual(len(result), 1) self.assertEqual(result["2"]["name"], "Bob Johnson") # Test inclusive range end_date = datetime(2023, 3, 10) - result = self.db.filter(Field("registration_date").between_dates(start_date, end_date)) + result = self.db.filter( + Field("registration_date").between_dates(start_date, end_date) + ) self.assertEqual(len(result), 2) def test_fuzzy_match(self): @@ -245,8 +262,8 @@ def test_combined_advanced_queries(self): start_date = datetime(2023, 2, 1) end_date = datetime(2023, 12, 31) result = self.db.filter( - Field("email").matches_regex(r"^[bc].*@example\.com$") & - Field("registration_date").between_dates(start_date, end_date) + Field("email").matches_regex(r"^[bc].*@example\.com$") + & Field("registration_date").between_dates(start_date, end_date) ) self.assertEqual(len(result), 2) self.assertIn("2", result) @@ -254,8 +271,8 @@ def test_combined_advanced_queries(self): # Combine fuzzy match and age range result = self.db.filter( - Field("name").fuzzy_match("Charlie", threshold=0.7) & - Field("age").greater_than(30) + Field("name").fuzzy_match("Charlie", threshold=0.7) + & Field("age").greater_than(30) ) self.assertEqual(len(result), 1) self.assertEqual(result["3"]["name"], "Charlie Brown") @@ -268,69 +285,106 @@ def test_edge_cases(self): # Test date range with no matches start_date = datetime(2024, 1, 1) end_date = datetime(2024, 12, 31) - result = self.db.filter(Field("registration_date").between_dates(start_date, end_date)) + result = self.db.filter( + Field("registration_date").between_dates(start_date, end_date) + ) self.assertEqual(len(result), 0) # Test fuzzy match with very low threshold - result = self.db.filter(Field("name").fuzzy_match("Completely Different", threshold=0.1)) + result = self.db.filter( + Field("name").fuzzy_match("Completely Different", threshold=0.1) + ) self.assertEqual(len(result), 3) # Should match all due to very low threshold def test_performance(self): # Add a large number of items to test performance for i in range(1000): - self.db.add({ - "id": i + 4, - "name": f"Test User {i}", - "email": f"user{i}@example.com", - "age": 20 + (i % 60), - "registration_date": (datetime(2023, 1, 1) + timedelta(days=i)).isoformat(), - "skills": ["Python"] if i % 2 == 0 else ["Java"], - }) + self.db.add( + { + "id": i + 4, + "name": f"Test User {i}", + "email": f"user{i}@example.com", + "age": 20 + (i % 60), + "registration_date": ( + datetime(2023, 1, 1) + timedelta(days=i) + ).isoformat(), + "skills": ["Python"] if i % 2 == 0 else ["Java"], + } + ) # Test regex performance start_time = time.time() - result = self.db.filter(Field("email").matches_regex(r"^user[0-9]+@example\.com$")) + result = self.db.filter( + Field("email").matches_regex(r"^user[0-9]+@example\.com$") + ) end_time = time.time() self.assertEqual(len(result), 1000) - self.assertLess(end_time - start_time, 1.0) # Assert that it takes less than 1 second + self.assertLess( + end_time - start_time, 1.0 + ) # Assert that it takes less than 1 second # Test date range performance start_date = datetime(2023, 6, 1) end_date = datetime(2023, 12, 31) start_time = time.time() - result = self.db.filter(Field("registration_date").between_dates(start_date, end_date)) + result = self.db.filter( + Field("registration_date").between_dates(start_date, end_date) + ) end_time = time.time() self.assertGreater(len(result), 0) - self.assertLess(end_time - start_time, 1.0) # Assert that it takes less than 1 second + self.assertLess( + end_time - start_time, 1.0 + ) # Assert that it takes less than 1 second + class TestAdvancedSearchErrors(unittest.TestCase): def setUp(self): self.db = EffortlessDB() self.db.wipe() - self.db.add({ - "id": 1, - "name": "Alice Smith", - "email": "alice@example.com", - "age": 30, - "registration_date": "2023-01-15", - "skills": ["Python", "JavaScript"], - }) + self.db.add( + { + "id": 1, + "name": "Alice Smith", + "email": "alice@example.com", + "age": 30, + "registration_date": "2023-01-15", + "skills": ["Python", "JavaScript"], + } + ) def test_between_dates_type_error(self): - # Test with non-datetime objects - with self.assertRaises(TypeError): - self.db.filter(Field("registration_date").between_dates("2023-01-01", "2023-12-31")) + # Test with inconvertible date string + with self.assertRaises(ValueError): + self.db.filter( + Field("registration_date").between_dates("not-a-date", "2023-12-31") + ) + + # Test with convertible date strings (should not raise an exception) + result = self.db.filter( + Field("registration_date").between_dates("2023-01-01", "2023-12-31") + ) + self.assertEqual(len(result), 1) + + # Test with mixed types (datetime and string) + result = self.db.filter( + Field("registration_date").between_dates(datetime(2023, 1, 1), "2023-12-31") + ) + self.assertEqual(len(result), 1) # Test with mixed types - with self.assertRaises(TypeError): - self.db.filter(Field("registration_date").between_dates(datetime(2023, 1, 1), "2023-12-31")) + result = self.db.filter( + Field("registration_date").between_dates(datetime(2023, 1, 1), "2023-12-31") + ) + self.assertEqual(len(result), 1) def test_between_dates_value_error(self): # Test with end date before start date with self.assertRaises(ValueError): - self.db.filter(Field("registration_date").between_dates( - datetime(2023, 12, 31), datetime(2023, 1, 1) - )) + self.db.filter( + Field("registration_date").between_dates( + datetime(2023, 12, 31), datetime(2023, 1, 1) + ) + ) def test_matches_regex_type_error(self): # Test with non-string pattern @@ -374,11 +428,13 @@ def test_empty_database(self): self.assertEqual(len(result), 0) def test_nested_field_errors(self): - self.db.add({ - "id": 2, - "name": "Bob Johnson", - "address": {"city": "New York", "country": "USA"} - }) + self.db.add( + { + "id": 2, + "name": "Bob Johnson", + "address": {"city": "New York", "country": "USA"}, + } + ) # Test with non-existent nested field result = self.db.filter(Field("address.state").equals("NY")) @@ -403,13 +459,17 @@ def test_combined_query_type_mismatch(self): def test_performance_with_invalid_queries(self): # Add a large number of items for i in range(1000): - self.db.add({ - "id": i + 2, - "name": f"Test User {i}", - "email": f"user{i}@example.com", - "age": 20 + (i % 60), - "registration_date": (datetime(2023, 1, 1) + timedelta(days=i)).isoformat(), - }) + self.db.add( + { + "id": i + 2, + "name": f"Test User {i}", + "email": f"user{i}@example.com", + "age": 20 + (i % 60), + "registration_date": ( + datetime(2023, 1, 1) + timedelta(days=i) + ).isoformat(), + } + ) # Test performance with an invalid regex start_time = time.time() @@ -421,11 +481,237 @@ def test_performance_with_invalid_queries(self): # Test performance with an invalid date range start_time = time.time() with self.assertRaises(ValueError): - self.db.filter(Field("registration_date").between_dates( - datetime(2023, 12, 31), datetime(2023, 1, 1) - )) + self.db.filter( + Field("registration_date").between_dates( + datetime(2023, 12, 31), datetime(2023, 1, 1) + ) + ) end_time = time.time() self.assertLess(end_time - start_time, 1.0) # Should fail quickly + +class TestPassesMethod(unittest.TestCase): + def setUp(self): + self.db = EffortlessDB() + self.db.wipe() + self.db.add( + { + "id": 1, + "name": "Alice", + "age": 30, + "height": 165.5, + "is_active": True, + "skills": ["Python", "JavaScript"], + "address": {"city": "New York", "country": "USA"}, + } + ) + self.db.add( + { + "id": 2, + "name": "Bob", + "age": 25, + "height": 180.0, + "is_active": False, + "skills": ["Java", "C++"], + "address": {"city": "London", "country": "UK"}, + } + ) + self.db.add( + { + "id": 3, + "name": "Charlie", + "age": 35, + "height": 170.2, + "is_active": True, + "skills": ["Python", "Ruby"], + "address": {"city": "Paris", "country": "France"}, + } + ) + + def test_passes_simple_function(self): + result = self.db.filter(Field("age").passes(lambda x: x > 30)) + self.assertEqual(len(result), 1) + self.assertEqual(result["3"]["name"], "Charlie") + + def test_passes_complex_function(self): + def complex_check(x): + return x > 25 and x % 2 == 0 + + result = self.db.filter(Field("age").passes(complex_check)) + self.assertEqual(len(result), 1) + self.assertEqual(result["1"]["name"], "Alice") + + def test_passes_with_external_variable(self): + threshold = 28 + result = self.db.filter(Field("age").passes(lambda x: x > threshold)) + self.assertEqual(len(result), 2) + self.assertIn("1", result) + self.assertIn("3", result) + + def test_passes_with_multiple_fields(self): + def check_name_and_age(item): + return len(item["name"]) > 3 and item["age"] < 31 + + result = self.db.filter(Query(check_name_and_age)) + self.assertEqual(len(result), 1) + self.assertIn("1", result) + + def test_passes_with_nested_field(self): + result = self.db.filter( + Field("address.city").passes(lambda x: x.startswith("L")) + ) + self.assertEqual(len(result), 1) + self.assertEqual(result["2"]["name"], "Bob") + + def test_passes_with_list_field(self): + result = self.db.filter(Field("skills").passes(lambda x: "Python" in x)) + self.assertEqual(len(result), 2) + self.assertIn("1", result) + self.assertIn("3", result) + + def test_passes_with_boolean_field(self): + result = self.db.filter(Field("is_active").passes(lambda x: x is True)) + self.assertEqual(len(result), 2) + self.assertIn("1", result) + self.assertIn("3", result) + + def test_passes_with_float_field(self): + result = self.db.filter(Field("height").passes(lambda x: 165 < x < 175)) + self.assertEqual(len(result), 2) + self.assertIn("1", result) + self.assertIn("3", result) + + def test_passes_with_math_function(self): + result = self.db.filter( + Field("height").passes(lambda x: math.isclose(x, 170, abs_tol=5)) + ) + self.assertEqual(len(result), 2) + self.assertIn("1", result) + self.assertIn("3", result) + + def test_passes_with_exception_handling(self): + def risky_function(x): + return 10 / (x - 30) # Will raise ZeroDivisionError for x = 30 + + with self.assertRaises(ValueError) as context: + self.db.filter(Field("age").passes(risky_function)) + + self.assertTrue( + "Error checking condition 'risky_function'" in str(context.exception) + ) + self.assertTrue("division by zero" in str(context.exception)) + + def test_passes_with_type_checking(self): + result = self.db.filter(Field("name").passes(lambda x: isinstance(x, str))) + self.assertEqual(len(result), 3) + + def test_passes_with_no_matches(self): + result = self.db.filter(Field("age").passes(lambda x: x > 100)) + self.assertEqual(len(result), 0) + + def test_passes_with_all_matches(self): + result = self.db.filter(Field("age").passes(lambda x: x > 0)) + self.assertEqual(len(result), 3) + + def test_passes_with_lambda_and_method(self): + result = self.db.filter( + Field("name").passes(lambda x: x.lower().startswith("a")) + ) + self.assertEqual(len(result), 1) + self.assertEqual(result["1"]["name"], "Alice") + + def test_passes_with_combined_queries(self): + result = self.db.filter( + Field("age").passes(lambda x: x > 25) + & Field("skills").passes(lambda x: "Python" in x) + ) + self.assertEqual(len(result), 2) + self.assertIn("1", result) + self.assertIn("3", result) + + def test_passes_with_or_combined_queries(self): + result = self.db.filter( + Field("age").passes(lambda x: x < 26) + | Field("height").passes(lambda x: x > 175) + ) + self.assertEqual(len(result), 1) + self.assertIn("2", result) + + def test_passes_with_nonexistent_field(self): + result = self.db.filter(Field("nonexistent").passes(lambda x: x is not None)) + self.assertEqual(len(result), 0) + + def test_passes_with_none_value(self): + self.db.add({"id": 4, "name": "David", "age": None}) + result = self.db.filter(Field("age").passes(lambda x: x is None)) + self.assertEqual(len(result), 1) + self.assertEqual(result["4"]["name"], "David") + + +class TestIsType(unittest.TestCase): + def setUp(self): + self.db = EffortlessDB() + self.db.wipe() + self.db.add( + { + "id": 1, + "name": "Alice", + "age": 30, + "height": 165.5, + "is_active": True, + "skills": ["Python", "JavaScript"], + "address": {"city": "New York", "country": "USA"}, + } + ) + self.db.add( + { + "id": 2, + "name": "Bob", + "age": "25", # String instead of int + "height": "180.0", # String instead of float + "is_active": "true", # String instead of bool + "skills": "Java, C++", # String instead of list + "address": "London, UK", # String instead of dict + } + ) + + def test_is_type(self): + result = self.db.filter(Field("age").is_type(int)) + self.assertEqual(len(result), 1) + self.assertEqual(result["1"]["name"], "Alice") + + result = self.db.filter(Field("height").is_type(float)) + self.assertEqual(len(result), 1) + self.assertEqual(result["1"]["name"], "Alice") + + result = self.db.filter(Field("is_active").is_type(bool)) + self.assertEqual(len(result), 1) + self.assertEqual(result["1"]["name"], "Alice") + + result = self.db.filter(Field("skills").is_type(list)) + self.assertEqual(len(result), 1) + self.assertEqual(result["1"]["name"], "Alice") + + result = self.db.filter(Field("address").is_type(dict)) + self.assertEqual(len(result), 1) + self.assertEqual(result["1"]["name"], "Alice") + + def test_is_type_with_string(self): + result = self.db.filter(Field("age").is_type(str)) + self.assertEqual(len(result), 1) + self.assertEqual(result["2"]["name"], "Bob") + + def test_is_type_combined_query(self): + result = self.db.filter( + Field("age").is_type(int) & Field("height").is_type(float) + ) + self.assertEqual(len(result), 1) + self.assertEqual(result["1"]["name"], "Alice") + + def test_is_type_no_matches(self): + result = self.db.filter(Field("name").is_type(int)) + self.assertEqual(len(result), 0) + + if __name__ == "__main__": unittest.main()