Skip to content
This repository has been archived by the owner on Jun 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #30 from IdentityPython/dldict
Browse files Browse the repository at this point in the history
Dldict
  • Loading branch information
peppelinux authored Apr 14, 2021
2 parents d139f1f + 4c3c63b commit 23c3017
Show file tree
Hide file tree
Showing 28 changed files with 2,278 additions and 1,988 deletions.
10 changes: 1 addition & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ addons:
- rustc
- cargo
install:
- pip install --upgrade pip
- pip install codecov
- pip install isort
- pip install tox
Expand All @@ -24,12 +25,3 @@ after_success:
- codecov
notifications:
email: false
deploy:
provider: pypi
on:
tags: true
distributions: bdist_wheel
skip_existing: true
user: __token__
password:
secure: AH3jGGXVjV/oOlg4cOnsN7pURlZ7JMcd3Prr69Q++rxfsrmFpxCPtQLpO0LUNPisfyctoImpY64auNMHh20AHdlnvXQu8k/YFZCVcyK6N2d66wgJbO9AOT21N6IkFGyW11K3lYIHzURv9RsTEhzSkOhKmPUack5UhSJ+yAUTZXpt6iZqXBvmxMNzNiCLQdUmTMj4HxxkUVPabpef8PLqyDXvAxJxOCss+QcJVZuWFs85Niw0scTkU4SWz2lhOxeqQNg8s+CEgje2KaIoRy2kETywK53G3RFkSp5ytIJPp8RQK039laeal5yjMsWP4KlbDhHrywyNN7yS69FwPuLC41ppde5G054WcuJTm60Y2uckGu6L3oTBMHsAtSfZuEym/qfDngxYADA+xrATJQF5XSrCz13IiBnoz8Y9zI7t9s66PZSBHg99L85jM45M2kJYCDKxNPffJ/JzCnAMTP0yiBMEQ/UfguMDfJMw+6oSPzGcZHuQVzjLO5mUni71X528Psd/iEYCyN+Vi1QbvDZjbNo/oOLtvegOcnu/H1tGWkH4uEXsg2giqkld2hrZi6K3KfcpPtltuP66Z6ohMqcLegqGUNr8mPMP2I58p7if/6xLEu1e7MNZuoV459bnWepoNMMug2NLq/WIPCiGLNCyV4tdbzqcZQLNwjc5ruFLoz8=
8 changes: 4 additions & 4 deletions src/oidcmsg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
__author__ = 'Roland Hedberg'
__version__ = '1.2.0'
__author__ = "Roland Hedberg"
__version__ = "1.3.0"

import os

VERIFIED_CLAIM_PREFIX = '__verified'
VERIFIED_CLAIM_PREFIX = "__verified"


def verified_claim_name(claim):
return '{}_{}'.format(VERIFIED_CLAIM_PREFIX, claim)
return "{}_{}".format(VERIFIED_CLAIM_PREFIX, claim)


def proper_path(path):
Expand Down
20 changes: 10 additions & 10 deletions src/oidcmsg/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,41 @@
def add_issuer(conf, issuer):
res = {}
for key, val in conf.items():
if key == 'abstract_storage_cls':
if key == "abstract_storage_cls":
res[key] = val
else:
_val = copy.copy(val)
_val['issuer'] = quote_plus(issuer)
_val["issuer"] = quote_plus(issuer)
res[key] = _val
return res


class OidcContext(ImpExp):
parameter = {"keyjar": KeyJar, "issuer": None}

def __init__(self, config=None, keyjar=None, entity_id=''):
def __init__(self, config=None, keyjar=None, entity_id=""):
ImpExp.__init__(self)
if config is None:
config = {}

self.issuer = entity_id
self.keyjar = self._keyjar(keyjar, conf=config, entity_id=entity_id)

def _keyjar(self, keyjar=None, conf=None, entity_id=''):
def _keyjar(self, keyjar=None, conf=None, entity_id=""):
if keyjar is None:
if 'keys' in conf:
if "keys" in conf:
args = {k: v for k, v in conf["keys"].items() if k != "uri_path"}
_keyjar = init_key_jar(**args)
else:
_keyjar = KeyJar()
if 'jwks' in conf:
_keyjar.import_jwks(conf['jwks'], '')
if "jwks" in conf:
_keyjar.import_jwks(conf["jwks"], "")

if '' in _keyjar and entity_id:
if "" in _keyjar and entity_id:
# make sure I have the keys under my own name too (if I know it)
_keyjar.import_jwks_as_json(_keyjar.export_jwks_as_json(True, ''), entity_id)
_keyjar.import_jwks_as_json(_keyjar.export_jwks_as_json(True, ""), entity_id)

_httpc_params = conf.get('httpc_params')
_httpc_params = conf.get("httpc_params")
if _httpc_params:
_keyjar.httpc_params = _httpc_params

Expand Down
2 changes: 1 addition & 1 deletion src/oidcmsg/exception.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__author__ = 'Roland Hedberg'
__author__ = "Roland Hedberg"


class OidcMsgError(Exception):
Expand Down
101 changes: 84 additions & 17 deletions src/oidcmsg/impexp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Any
from typing import List
from typing import Optional

from cryptojwt.utils import as_bytes
from cryptojwt.utils import importer
from cryptojwt.utils import qualified_name

Expand All @@ -9,19 +11,24 @@

class ImpExp:
parameter = {}
special_load_dump = {}
init_args = []

def __init__(self):
pass

def _dump(self, cls, item, exclude_attributes: Optional[List[str]] = None) -> dict:
if cls in [None, "", [], {}]:
val = item
def dump_attr(self, cls, item, exclude_attributes: Optional[List[str]] = None) -> dict:
if cls in [None, 0, "", [], {}, bool, b'']:
if cls == b'':
val = as_bytes(item)
else:
val = item
elif isinstance(item, Message):
val = {qualified_name(item.__class__): item.to_dict()}
elif cls == object:
val = qualified_name(item)
elif isinstance(cls, list):
val = [self._dump(cls[0], v, exclude_attributes) for v in item]
val = [self.dump_attr(cls[0], v, exclude_attributes) for v in item]
else:
val = item.dump(exclude_attributes=exclude_attributes)

Expand All @@ -31,42 +38,98 @@ def dump(self, exclude_attributes: Optional[List[str]] = None) -> dict:
_exclude_attributes = exclude_attributes or []
info = {}
for attr, cls in self.parameter.items():
if attr in _exclude_attributes:
if attr in _exclude_attributes or attr in self.special_load_dump:
continue

item = getattr(self, attr, None)
if item is None:
continue

info[attr] = self._dump(cls, item, exclude_attributes)
info[attr] = self.dump_attr(cls, item, exclude_attributes)

for attr, d in self.special_load_dump.items():
item = getattr(self, attr, None)
if item:
info[attr] = d["dump"](item, exclude_attributes=exclude_attributes)

return info

def _local_adjustments(self):
def local_load_adjustments(self, **kwargs):
pass

def _load(self, cls, item):
if cls in [None, "", [], {}]:
val = item
def load_attr(
self,
cls: Any,
item: dict,
init_args: Optional[dict] = None,
load_args: Optional[dict] = None,
) -> Any:
if load_args:
_kwargs = {"load_args": load_args}
_load_args = load_args
else:
_kwargs = {}
_load_args = {}

if init_args:
_kwargs["init_args"] = init_args

if cls in [None, 0, "", [], {}, bool, b'']:
if cls == b'':
val = as_bytes(item)
else:
val = item
elif cls == object:
val = importer(item)
elif isinstance(cls, list):
val = [cls[0]().load(v) for v in item]
if isinstance(cls[0], str):
_cls = importer(cls[0])
else:
_cls = cls[0]

if issubclass(_cls, ImpExp) and init_args:
_args = {k: v for k, v in init_args.items() if k in _cls.init_args}
else:
_args = {}

val = [_cls(**_args).load(v, **_kwargs) for v in item]
elif issubclass(cls, Message):
val = cls().from_dict(item)
_cls_name = list(item.keys())[0]
_cls = importer(_cls_name)
val = _cls().from_dict(item[_cls_name])
else:
val = cls().load(item)
if issubclass(cls, ImpExp) and init_args:
_args = {k: v for k, v in init_args.items() if k in cls.init_args}
else:
_args = {}

val = cls(**_args).load(item, **_kwargs)

return val

def load(self, item: dict):
def load(self, item: dict, init_args: Optional[dict] = None, load_args: Optional[dict] = None):

if load_args:
_kwargs = {"load_args": load_args}
_load_args = load_args
else:
_kwargs = {}
_load_args = {}

if init_args:
_kwargs["init_args"] = init_args

for attr, cls in self.parameter.items():
if attr not in item:
if attr not in item or attr in self.special_load_dump:
continue

setattr(self, attr, self._load(cls, item[attr]))
setattr(self, attr, self.load_attr(cls, item[attr], **_kwargs))

for attr, func in self.special_load_dump.items():
if attr in item:
setattr(self, attr, func["load"](item[attr], **_kwargs))

self._local_adjustments()
self.local_load_adjustments(**_load_args)
return self

def flush(self):
Expand All @@ -78,6 +141,10 @@ def flush(self):
for attr, cls in self.parameter.items():
if cls is None:
setattr(self, attr, None)
elif cls == 0:
setattr(self, attr, 0)
elif cls is bool:
setattr(self, attr, False)
elif cls == "":
setattr(self, attr, "")
elif cls == []:
Expand Down
132 changes: 132 additions & 0 deletions src/oidcmsg/item.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from typing import List
from typing import Optional

from oidcmsg.impexp import ImpExp
from oidcmsg.message import Message
from oidcmsg.storage import importer
from oidcmsg.storage.utils import qualified_name


class DLDict(ImpExp):
parameter = {"db": {}}

def __init__(self, **kwargs):
ImpExp.__init__(self)
self.db = kwargs

def __setitem__(self, key: str, val):
self.db[key] = val

def __getitem__(self, key: str):
return self.db[key]

def __delitem__(self, key: str):
del self.db[key]

def dump(self, exclude_attributes: Optional[List[str]] = None) -> dict:
res = {}

for k, v in self.db.items():
_class = qualified_name(v.__class__)
res[k] = [_class, v.dump(exclude_attributes=exclude_attributes)]

return res

def load(
self, spec: dict, init_args: Optional[dict] = None, load_args: Optional[dict] = None
) -> "DLDict":
if load_args:
_kwargs = {"load_args": load_args}
_load_args = {}
else:
_load_args = {}
_kwargs = {}

if init_args:
_kwargs["init_args"] = init_args

for attr, (_item_cls, _item) in spec.items():
_cls = importer(_item_cls)

if issubclass(_cls, ImpExp) and init_args:
_args = {k: v for k, v in init_args.items() if k in _cls.init_args}
else:
_args = {}

_x = _cls(**_args)
_x.load(_item, **_kwargs)
self.db[attr] = _x

self.local_load_adjustments(**_load_args)

return self

def keys(self):
return self.db.keys()

def items(self):
return self.db.items()

def values(self):
return self.db.values()

def __contains__(self, item):
return item in self.db

def get(self, item, default=None):
return self.db.get(item, default)

def __len__(self):
return len(self.db)


def dump_dldict(item, exclude_attributes: Optional[List[str]] = None) -> dict:
res = {}

for k, v in item.items():
_class = qualified_name(v.__class__)
if isinstance(v, Message):
res[k] = [_class, v.to_dict()]
else:
res[k] = [_class, v.dump(exclude_attributes=exclude_attributes)]

return res


def load_dldict(
spec: dict, init_args: Optional[dict] = None, load_args: Optional[dict] = None
) -> dict:
db = {}

for attr, (_item_cls, _item) in spec.items():
_class = importer(_item_cls)
if issubclass(_class, Message):
db[attr] = _class().from_dict(_item)
else:
if issubclass(_class, ImpExp) and init_args:
_args = {k: v for k, v in init_args.items() if k in _class.init_args}
else:
_args = {}

db[attr] = _class(**_args).load(_item)

return db


def dump_class_map(item, exclude_attributes: Optional[List[str]] = None) -> dict:
_dump = {}
for key, val in item.items():
if isinstance(val, str):
_dump[key] = val
else:
_dump[key] = qualified_name(val)
return _dump


def load_class_map(
spec: dict, init_args: Optional[dict] = None, load_args: Optional[dict] = None
) -> dict:
_item = {}
for key, val in spec.items():
_item[key] = importer(val)
return _item
Loading

0 comments on commit 23c3017

Please sign in to comment.