Skip to content

Commit

Permalink
Enable fluent programming e.g. save().close()
Browse files Browse the repository at this point in the history
added test for wrong password
  • Loading branch information
ckunki committed Sep 27, 2023
1 parent 0dd7b68 commit a54ec1a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 26 deletions.
21 changes: 10 additions & 11 deletions secret-store/secret_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class Table:
columns: list[str]


CONFIG_ITEMS_TABLE = Table("config_items", ["item"])
SECRETS_TABLE = Table("secrets", ["user", "password"])
CONFIG_ITEMS_TABLE = Table("config_items", ["item"])


@dataclass(frozen=True)
Expand Down Expand Up @@ -80,7 +80,7 @@ def create_tables(self) -> None:
for table in (SECRETS_TABLE, CONFIG_ITEMS_TABLE):
self.create_table(table)

def _save_data(self, table: Table, key: str, data: list[str]) -> None:
def _save_data(self, table: Table, key: str, data: list[str]) -> "Secrets":
cur = self.cursor()
res = cur.execute(f"SELECT * FROM {table.name} WHERE key=?", [key])
if res and res.fetchone():
Expand All @@ -94,20 +94,19 @@ def _save_data(self, table: Table, key: str, data: list[str]) -> None:
f"INSERT INTO {table.name} VALUES (?, {columns})",
[key] + data)
self.connection().commit()
return self

# def save_config_item(self, key: str, item: str) -> None:
# self._save_data(CONFIG_ITEMS_TABLE, key, [item])
#
# def save_credentials(self, key: str, user: str, password: str) -> None:
# self._save_data(SECRETS_TABLE, key, [user, password])

def save(self, key: str, data: Union[str, Credentials]) -> None:
def save(self, key: str, data: Union[str, Credentials]) -> "Secrets":
if isinstance(data, str):
self._save_data(CONFIG_ITEMS_TABLE, key, [data])
return
return self._save_data(CONFIG_ITEMS_TABLE, key, [data])
if isinstance(data, Credentials):
self._save_data(SECRETS_TABLE, key, [data.user, data.password])
return
return self._save_data(SECRETS_TABLE, key, [data.user, data.password])
raise Exception("Unsupported type of data: " + type(data).__name__)

def _get_data(self, table: Table, key: str) -> Optional[list[str]]:
Expand All @@ -128,19 +127,19 @@ def get_config_item(self, key: str) -> Optional[str]:

# def sample_usage():
# secrets = Secrets("mydb.db", master_password="my secret master password")
#
#
# c = secrets.get_credentials("aws")
# print(f'old value of aws credentials {c}')
# secrets.save("aws", Credentials("user-a", "pwd-aaa"))
# c = secrets.get_credentials("aws")
# print(f'aws credentials: {c}')
#
#
# c = secrets.get_config_item("url")
# print(f'old value of config item "url" {c}')
# secrets.save("url", "http://def")
# c = secrets.get_config_item("url")
# print(f'config item url: {c}')
#
#
#
#
# if __name__ == "__main__":
# sample_usage()
35 changes: 20 additions & 15 deletions secret-store/test/test_secret_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
from pathlib import Path
from secret_store import Secrets, Credentials
from sqlcipher3 import dbapi2 as sqlcipher


@pytest.fixture
Expand Down Expand Up @@ -30,33 +31,37 @@ def test_database_file_from_config_item(secrets):

def test_credentials(secrets):
credentials = Credentials("user", "password")
secrets.save("app", credentials)
secrets.close()
assert secrets.get_credentials("app") == credentials
secrets.save("key", credentials).close()
assert secrets.get_credentials("key") == credentials


def test_config_item(secrets):
config_item = "some configuration"
secrets.save("url", config_item)
secrets.close()
assert secrets.get_config_item("url") == config_item
secrets.save("key", config_item).close()
assert secrets.get_config_item("key") == config_item


def test_update_credentials(secrets):
initial = Credentials("user", "password")
secrets.save("app", initial)
secrets.close()
secrets.save("key", initial).close()
other = Credentials("other", "changed")
secrets.save("app", other)
secrets.save("key", other)
secrets.close()
assert secrets.get_credentials("app") == other
assert secrets.get_credentials("key") == other


def test_update_config_item(secrets):
initial = "initial value"
secrets.save("url", initial)
secrets.close()
secrets.save("key", initial).close()
other = "other value"
secrets.save("url", other)
secrets.close()
assert secrets.get_config_item("url") == other
secrets.save("key", other).close()
assert secrets.get_config_item("key") == other


def test_wrong_password(sample_file):
secrets = Secrets(sample_file, "correct password")
secrets.save("key", Credentials("usr", "pass")).close()
invalid = Secrets(sample_file, "wrong password")
with pytest.raises(sqlcipher.DatabaseError) as ex:
invalid.get_credentials("key")
assert "file is not a database" == str(ex.value)

0 comments on commit a54ec1a

Please sign in to comment.