From e95e19d7768c264e5cec02c2644ed26b31959f22 Mon Sep 17 00:00:00 2001 From: luffah Date: Mon, 3 May 2021 18:50:40 +0200 Subject: [PATCH] Fix scrutinizer issues + fix login error --- requirements.in | 2 +- src/nextcloud/__init__.py | 38 ++++--- src/nextcloud/api_wrappers/activity.py | 2 +- src/nextcloud/api_wrappers/apps.py | 4 +- src/nextcloud/api_wrappers/group_folders.py | 12 +-- src/nextcloud/api_wrappers/systemtags.py | 46 +++++--- src/nextcloud/api_wrappers/user.py | 1 - src/nextcloud/api_wrappers/user_ldap.py | 4 +- src/nextcloud/api_wrappers/webdav.py | 5 +- src/nextcloud/base.py | 27 +++-- src/nextcloud/common/properties.py | 25 +++-- src/nextcloud/compat.py | 30 +++--- src/nextcloud/requester.py | 113 ++++++++++++-------- src/nextcloud/response.py | 23 +++- src/nextcloud/session.py | 30 ++++-- tests/README.md | 2 +- tests/__init__.py | 1 + 17 files changed, 215 insertions(+), 150 deletions(-) diff --git a/requirements.in b/requirements.in index 87d1e29..f3950c1 100644 --- a/requirements.in +++ b/requirements.in @@ -1,2 +1,2 @@ requests>=2.0.1 -pytest +pytest>=4.6 diff --git a/src/nextcloud/__init__.py b/src/nextcloud/__init__.py index aa8cdd3..44f3cfd 100644 --- a/src/nextcloud/__init__.py +++ b/src/nextcloud/__init__.py @@ -31,21 +31,17 @@ class NextCloud(object): ... # some actions # """ - def __init__(self, endpoint, user=None, password=None, json_output=True, auth=None, session_kwargs=None): - self.query_components = [] - self._session = Session( + def __init__(self, endpoint=None, + user=None, password=None, json_output=True, auth=None, + session_kwargs=None, + session=None): + self.session = session or Session( url=endpoint, user=user, password=password, auth=auth, session_kwargs=session_kwargs ) self.json_output = json_output for functionality_class in API_WRAPPER_CLASSES: - json_able = getattr(functionality_class, 'JSON_ABLE', False) - require_client = getattr( - functionality_class, 'REQUIRE_CLIENT', False) - functionality_instance = functionality_class( - self._session, - json_output=(json_able and json_output), - client=(require_client and self)) + functionality_instance = functionality_class(self) for potential_method in dir(functionality_instance): if not potential_method.startswith('_'): if not callable(getattr(functionality_instance, potential_method)): @@ -56,11 +52,11 @@ def __init__(self, endpoint, user=None, password=None, json_output=True, auth=No @property def user(self): - return self._session.user + return self.session.user @property def url(self): - return self._session.url + return self.session.url def __enter__(self): self.login() @@ -71,14 +67,22 @@ def __exit__(self, *args): def login(self, user=None, password=None, auth=None): self.logout() - self._session.login(user=user, password=password, auth=auth) + return self.session.login(user=user, password=password, auth=auth, + client=self) + + def with_attr(self, **kwargs): + if 'auth' in kwargs: + return self.with_auth(**kwargs) + if 'session_kwargs' in kwargs: + return self.with_auth(self.session.auth, **kwargs) + return self.__class__(session=self.session, **kwargs) def with_auth(self, auth=None, **kwargs): - init_kwargs = {'session_kwargs': self._session._session_kwargs, + init_kwargs = {'session_kwargs': self.session._session_kwargs, 'json_output': self.json_output} init_kwargs.update(kwargs) - return (self.__class__)(self._session.url, auth=auth, **init_kwargs) + return self.__class__(self.session.url, auth=auth, **init_kwargs) def logout(self): - if self._session.session: - self._session.logout() + if self.session.session: + self.session.logout() diff --git a/src/nextcloud/api_wrappers/activity.py b/src/nextcloud/api_wrappers/activity.py index bff1975..141b3fe 100644 --- a/src/nextcloud/api_wrappers/activity.py +++ b/src/nextcloud/api_wrappers/activity.py @@ -39,5 +39,5 @@ def get_activities(self, since=None, limit=None, object_type=None, object_id=Non sort=sort ) if params['object_type'] and params['object_id']: - return self.requester.get(url="filter", params=params) + return self.requester.get(url="filter", params=params) return self.requester.get(params=params) diff --git a/src/nextcloud/api_wrappers/apps.py b/src/nextcloud/api_wrappers/apps.py index 64946a8..e3ac3be 100644 --- a/src/nextcloud/api_wrappers/apps.py +++ b/src/nextcloud/api_wrappers/apps.py @@ -16,7 +16,9 @@ def get_apps(self, filter=None): :param filter: str, optional "enabled" or "disabled" :return: """ - params = {"filter": filter} + params = { + "filter": filter + } return self.requester.get(params=params) def get_app(self, app_id): diff --git a/src/nextcloud/api_wrappers/group_folders.py b/src/nextcloud/api_wrappers/group_folders.py index 1008aeb..8b3fb28 100644 --- a/src/nextcloud/api_wrappers/group_folders.py +++ b/src/nextcloud/api_wrappers/group_folders.py @@ -75,8 +75,8 @@ def set_permissions_to_group_folder(self, fid, gid, permissions): :param permissions (int): The new permissions for the group as attribute of Permission class :returns: resquester response """ - url = '/'.join([str(fid), 'groups', gid]) - return self.requester.post(url=url, data={'permissions': permissions}) + url = "/".join([str(fid), "groups", gid]) + return self.requester.post(url=url, data={"permissions": permissions}) def set_quota_of_group_folder(self, fid, quota): """ @@ -86,8 +86,8 @@ def set_quota_of_group_folder(self, fid, quota): :param quota (int/str): The new quota for the folder in bytes, user -3 for unlimited :returns: resquester response """ - url = '/'.join([str(fid), 'quota']) - return self.requester.post(url, {'quota': quota}) + url = "/".join([str(fid), "quota"]) + return self.requester.post(url, {"quota": quota}) def rename_group_folder(self, fid, mountpoint): """ @@ -97,5 +97,5 @@ def rename_group_folder(self, fid, mountpoint): :param mountpoint (str): name for the new folder :returns: resquester response """ - url = '/'.join([str(fid), 'mountpoint']) - return self.requester.post(url=url, data={'mountpoint': mountpoint}) + url = "/".join([str(fid), "mountpoint"]) + return self.requester.post(url=url, data={"mountpoint": mountpoint}) diff --git a/src/nextcloud/api_wrappers/systemtags.py b/src/nextcloud/api_wrappers/systemtags.py index e38ace9..7eb404c 100644 --- a/src/nextcloud/api_wrappers/systemtags.py +++ b/src/nextcloud/api_wrappers/systemtags.py @@ -10,6 +10,7 @@ class Tag(PropertySet): + """ Define a Tag properties""" _attrs = [ Prop('oc:id'), Prop('oc:display-name', json='name', default='default_tag_name'), @@ -22,19 +23,27 @@ class Tag(PropertySet): class SystemTags(WebDAVApiWrapper): """ SystemTags API wrapper """ API_URL = '/remote.php/dav/systemtags' - JSON_ABLE = True def get_sytemtag(self, name, fields=None, json_output=None): + """ + Get attributes of a nammed tag + + :param name (str): tag name + :param fields (str): field names + :returns: requester response with Tag in data + """ if not fields: fields = Tag._fields - resp = self.requester.propfind(data=Tag.build_xml_propfind( - fields={'oc': ['display-name'] + fields})) + resp = self.requester.propfind( + data=Tag.build_xml_propfind(fields={ + 'oc': ['display-name'] + fields + })) if json_output is None: json_output = self.json_output return Tag.from_response(resp, json_output=json_output, init_attrs=True, - filtered=(lambda t: t.display_name == name)) + filtered=lambda t: t.display_name == name) def get_systemtags(self): """ @@ -43,8 +52,9 @@ def get_systemtags(self): :returns: requester response with Tag in data """ resp = self.requester.propfind( - data=Tag.build_xml_propfind(use_default=True)) - return Tag.from_response(resp, json_output=(self.json_output)) + data=Tag.build_xml_propfind(use_default=True) + ) + return Tag.from_response(resp, json_output=self.json_output) def create_systemtag(self, name, **kwargs): """ @@ -53,9 +63,12 @@ def create_systemtag(self, name, **kwargs): :param name: tag name :returns: requester response with tag id as data """ - data = (Tag.default_get)(name=name, **kwargs) - resp = self.requester.post(data=(json.dumps(data)), headers={ - 'Content-Type': 'application/json'}) + data = Tag.default_get(name=name, **kwargs) + resp = self.requester.post( + data=json.dumps(data), + headers={ + 'Content-Type': 'application/json' + }) if resp.is_ok: resp.data = int( resp.raw.headers['Content-Location'].split('/')[(-1)]) @@ -64,7 +77,7 @@ def create_systemtag(self, name, **kwargs): def delete_systemtag(self, name=None, tag_id=None): """ Delete systemtag - + :param name (str): tag name, not required it tag_id is provided :tag_id (int): tag id, not required if name is provided @@ -74,16 +87,15 @@ def delete_systemtag(self, name=None, tag_id=None): resp = self.get_sytemtag(name, ['id'], json_output=False) if resp.data: tag_id = resp.data[0].id - elif tag_id: - resp = self.requester.delete(url=(str(tag_id))) + if not tag_id: # lint only + return None + resp = self.requester.delete(url=(str(tag_id))) return resp class SystemTagsRelation(WebDAVApiWrapper): """ SystemTagsRelation API wrapper """ API_URL = '/remote.php/dav/systemtags-relations/files' - JSON_ABLE = True - REQUIRE_CLIENT = True def _get_fileid_from_path(self, path): """ Tricky function to fetch file """ @@ -121,7 +133,8 @@ def get_systemtags_relation(self, file_id=None, **kwargs): :returns: requester response with Tag in data """ - file_id, = self._arguments_get(['file_id'], locals()) + file_id, = self._arguments_get(['file_id'], dict(file_id=file_id, + **kwargs)) data = Tag.build_xml_propfind() resp = self.requester.propfind(additional_url=file_id, data=data) return Tag.from_response(resp, json_output=(self.json_output)) @@ -138,7 +151,7 @@ def delete_systemtags_relation(self, file_id=None, tag_id=None, **kwargs): :returns: requester response """ file_id, tag_id = self._arguments_get([ - 'file_id', 'tag_id'], locals()) + 'file_id', 'tag_id'], dict(file_id=file_id, tag_id=tag_id, **kwargs)) resp = self.requester.delete(url=('{}/{}'.format(file_id, tag_id))) return resp @@ -163,6 +176,5 @@ def add_systemtags_relation(self, file_id=None, tag_id=None, **kwargs): tag_id = resp.data if not file_id: raise ValueError('No file found') - data = Tag.build_xml_propfind() resp = self.requester.put(url=('{}/{}'.format(file_id, tag_id))) return resp diff --git a/src/nextcloud/api_wrappers/user.py b/src/nextcloud/api_wrappers/user.py index 0338949..4071816 100644 --- a/src/nextcloud/api_wrappers/user.py +++ b/src/nextcloud/api_wrappers/user.py @@ -9,7 +9,6 @@ class User(base.ProvisioningApiWrapper): """ User API wrapper """ API_URL = "/ocs/v1.php/cloud/users" - REQUIRE_CLIENT = True def add_user(self, uid, passwd): """ diff --git a/src/nextcloud/api_wrappers/user_ldap.py b/src/nextcloud/api_wrappers/user_ldap.py index d3af4d1..982233a 100644 --- a/src/nextcloud/api_wrappers/user_ldap.py +++ b/src/nextcloud/api_wrappers/user_ldap.py @@ -126,9 +126,7 @@ def edit_ldap_config(self, config_id, data): :returns: requester response """ - prepared_data = { - 'configData[{}]'.format(key): value - for key, value in data.items()} + prepared_data = {'configData[{}]'.format(key): value for key, value in data.items()} return self.requester.put(config_id, data=prepared_data) def ldap_cache_flush(self, config_id): diff --git a/src/nextcloud/api_wrappers/webdav.py b/src/nextcloud/api_wrappers/webdav.py index c10d08c..45e3f1c 100644 --- a/src/nextcloud/api_wrappers/webdav.py +++ b/src/nextcloud/api_wrappers/webdav.py @@ -46,13 +46,12 @@ def _extract_resource_type(file_property): file_type = list(file_property) if file_type: return re.sub('{.*}', '', file_type[0].tag) + return 'file' class WebDAV(WebDAVApiWrapper): """ WebDav API wrapper """ API_URL = "/remote.php/dav/files" - JSON_ABLE = True - REQUIRE_CLIENT = True def _get_path(self, path): if path: @@ -287,7 +286,7 @@ def get_file_property(self, path, field, tag='oc'): if ':' in field: tag, field = field.split(':') get_file_prop_xpath = '{DAV:}propstat/d:prop/%s:%s' % (tag, field) - data = (File.build_xml_propfind)(**{tag: [field]}) + data = File.build_xml_propfind(**{tag: [field]}) resp = self.requester.propfind(additional_url=(self._get_path(path)), headers={'Depth': str(0)}, data=data) response_data = resp.data diff --git a/src/nextcloud/base.py b/src/nextcloud/base.py index 6f76319..2beac81 100644 --- a/src/nextcloud/base.py +++ b/src/nextcloud/base.py @@ -10,11 +10,12 @@ class MetaWrapper(type): - def __new__(meta, name, bases, attrs): - cls = type.__new__(meta, name, bases, attrs) - if (cls.API_URL != NotImplementedError and cls.VERIFIED): - API_WRAPPER_CLASSES.append(cls) - return cls + """ Meta class to register wrappers """ + def __new__(cls, name, bases, attrs): + new_cls = type.__new__(cls, name, bases, attrs) + if (new_cls.API_URL != NotImplementedError and new_cls.VERIFIED): + API_WRAPPER_CLASSES.append(new_cls) + return new_cls class BaseApiWrapper(object, six.with_metaclass(MetaWrapper)): @@ -40,19 +41,19 @@ class BaseApiWrapper(object, six.with_metaclass(MetaWrapper)): API_URL = NotImplementedError VERIFIED = True JSON_ABLE = True - REQUIRE_CLIENT = False - REQUIRE_USER = False REQUESTER = Requester - def __init__(self, session, json_output=None, client=None, user=None): - self.json_output = json_output + def __init__(self, client=None): self.client = client - self.user = user - self.requester = self.REQUESTER(session, json_output=json_output) + self.requester = self.REQUESTER(self) for attr_name in ['API_URL', 'SUCCESS_CODE', 'METHODS_SUCCESS_CODES']: setattr(self.requester, attr_name, getattr(self, attr_name, None)) + @property + def json_output(self): + return self.JSON_ABLE and self.client.json_output + def _arguments_get(self, varnames, vals): """ allows to automatically fetch values of varnames @@ -66,6 +67,10 @@ def _arguments_get(self, varnames, vals): >>> return self.get_file_id_from_name(vals.get('name', None)) >>> >>> nxc.get_file_id(name='foo.bar') + + :param varmames: list of wanted python var names + :param vals: a dict object containing already set variables + :returns: list of wanted values """ if 'kwargs' in vals: vals.update(vals['kwargs']) diff --git a/src/nextcloud/common/properties.py b/src/nextcloud/common/properties.py index 0b1487b..6bf5fec 100644 --- a/src/nextcloud/common/properties.py +++ b/src/nextcloud/common/properties.py @@ -45,7 +45,8 @@ class Property(object, six.with_metaclass(MetaProperty)): def __init__(self, xml_name, json=None, default=None, parse_xml_value=None): if ':' in xml_name: (self.ns, self.xml_key) = xml_name.split(':') - self._name_convention = NAMESPACES_CLASSES[self.ns] + if self.ns in NAMESPACES_CLASSES: + self._name_convention = NAMESPACES_CLASSES[self.ns]._name_convention else: self.xml_key = xml_name if self.namespace: @@ -56,16 +57,23 @@ def __init__(self, xml_name, json=None, default=None, parse_xml_value=None): self.default_val = default self.parse_xml_value = parse_xml_value - @classmethod - def _xml_name_to_py_name(cls, name): - if name in cls._name_convention: - return cls._name_convention[name] + def __repr__(self): + return "<{}: ns={}, xml={}, py={}, json={}>".format( + self.__class__.__name__, + self.ns, + self.attr_name, + self.xml_key, + self.json_key + ) + + def _xml_name_to_py_name(self, name): + if name in self._name_convention: + return self._name_convention[name] else: return name.replace('-', '_') - @classmethod - def _py_name_to_xml_name(cls, name): - _reversed_convention = {v: k for k, v in cls._name_convention.items()} + def _py_name_to_xml_name(self, name): + _reversed_convention = {v: k for k, v in self._name_convention.items()} if name in _reversed_convention: return _reversed_convention[name] else: @@ -105,6 +113,7 @@ class DProp(Property): 'getcontentlength': 'content_length' } + class OCProp(Property): """ OwnCloud property """ namespace = ('oc', 'http://owncloud.org/ns') diff --git a/src/nextcloud/compat.py b/src/nextcloud/compat.py index 8ef00f1..4145d32 100644 --- a/src/nextcloud/compat.py +++ b/src/nextcloud/compat.py @@ -14,18 +14,18 @@ def encode_requests_password(word): """ if isinstance(word, bytes): return word + + ret = word + if six.PY2: + if isinstance(word, six.text_type): + # trick to work with tricks in requests lib + ret = word.encode('utf-8').decode('latin-1') else: - ret = word - if six.PY2: - if isinstance(word, unicode): - # trick to work with tricks in requests lib - ret = word.encode('utf-8').decode('latin-1') - else: - try: - ret = bytes(word, 'ascii') - except UnicodeEncodeError: - ret = bytes(word, 'utf-8') - return ret + try: + ret = bytes(word, 'ascii') + except UnicodeEncodeError: + ret = bytes(word, 'utf-8') + return ret def encode_string(string): @@ -36,12 +36,6 @@ def encode_string(string): :returns : encoded output as str """ if six.PY2: - if isinstance(string, unicode): + if isinstance(string, six.text_type): return string.encode('utf-8') return string - -# from six.moves.urllib import parse -# def prepare_url(s): -# if six.PY2 and isinstance(s, unicode): # noqa: F821 -# return parse.urlparse(s).path -# return s diff --git a/src/nextcloud/requester.py b/src/nextcloud/requester.py index 8fff845..3d061c4 100644 --- a/src/nextcloud/requester.py +++ b/src/nextcloud/requester.py @@ -1,48 +1,66 @@ # -*- coding: utf-8 -*- +""" +Define requesters +""" from .response import WebDAVResponse, OCSResponse from .compat import encode_string from .session import catch_connection_error -def _prepare_url(s): - return encode_string(s) +# from six.moves.urllib import parse +def _prepare_url(string): + return encode_string(string) +# if six.PY2 and isinstance(string, unicode): # noqa: F821 +# return parse.urlparse(string).path +# return s + class Requester(object): + """ Base requester """ - def __init__(self, session, json_output=None, url=None, - success_code=None): + def __init__(self, wrapper): self.query_components = [] - self.h_get = {'OCS-APIRequest': 'true'} - self.h_post = {'OCS-APIRequest': 'true', - 'Content-Type': 'application/x-www-form-urlencoded'} - self.session = session - self.json_output = None + self.h_get = {"OCS-APIRequest": "true"} + self.h_post = {"OCS-APIRequest": "true", + "Content-Type": "application/x-www-form-urlencoded"} + self.wrapper = wrapper self.API_URL = None self.SUCCESS_CODE = None + @property + def json_output(self): + return self.wrapper.json_output + + @property + def client(self): + return self.wrapper.client + + @property + def session(self): + return self.wrapper.client.session + def rtn(self, resp): if self.json_output: return resp.json() - else: - return resp.content.decode('UTF-8') + return resp.content.decode("UTF-8") @catch_connection_error - def get(self, url='', params=None, headers=None): + def get(self, url="", params=None, headers=None): url = self.get_full_url(url) res = self.session.request('get', url, headers=(headers or self.h_get), params=params) return self.rtn(res) @catch_connection_error - def post(self, url='', data=None, headers=None): + def post(self, url="", data=None, headers=None): url = self.get_full_url(url) res = self.session.request( 'post', url, data=data, headers=(headers or self.h_post)) return self.rtn(res) @catch_connection_error - def put_with_timestamp(self, url='', data=None, timestamp=None, headers=None): + def put_with_timestamp(self, url="", data=None, timestamp=None, headers=None): h_post = headers or self.h_post if isinstance(timestamp, (float, int)): h_post['X-OC-MTIME'] = '%.0f' % timestamp @@ -51,20 +69,20 @@ def put_with_timestamp(self, url='', data=None, timestamp=None, headers=None): return self.rtn(res) @catch_connection_error - def put(self, url='', data=None, headers=None): + def put(self, url="", data=None, headers=None): url = self.get_full_url(url) res = self.session.request( 'put', url, data=data, headers=(headers or self.h_post)) return self.rtn(res) @catch_connection_error - def delete(self, url='', data=None, headers=None): + def delete(self, url="", data=None, headers=None): url = self.get_full_url(url) res = self.session.request( 'delete', url, data=data, headers=(headers or self.h_post)) return self.rtn(res) - def get_full_url(self, additional_url=''): + def get_full_url(self, additional_url=""): """ Build full url for request to NextCloud api @@ -77,85 +95,86 @@ def get_full_url(self, additional_url=''): """ if isinstance(additional_url, int): additional_url = str(additional_url) - else: - if additional_url: - additional_url = _prepare_url(additional_url) - if not additional_url.startswith('/'): - additional_url = '/{}'.format(additional_url) - if self.json_output: - self.query_components.append('format=json') - ret = '{base_url}{api_url}{additional_url}'.format(base_url=(self.session.url), - api_url=( - self.API_URL), - additional_url=additional_url) - if self.json_output: - ret += '?format=json' + + if additional_url: + additional_url = _prepare_url(additional_url) + if not additional_url.startswith("/"): + additional_url = "/{}".format(additional_url) + if self.json_output: + self.query_components.append("format=json") + ret = "{base_url}{api_url}{additional_url}".format(base_url=(self.session.url), + api_url=( + self.API_URL), + additional_url=additional_url) + if self.json_output: + ret += "?format=json" + return ret class OCSRequester(Requester): - __doc__ = ' Requester for OCS API ' + """ Requester for OCS API """ def rtn(self, resp): - return OCSResponse(response=resp, json_output=(self.json_output), - success_code=(self.SUCCESS_CODE)) + return OCSResponse(response=resp, + json_output=self.json_output, success_code=self.SUCCESS_CODE) class WebDAVRequester(Requester): - __doc__ = ' Requester for WebDAV API ' + """ Requester for WebDAV API """ def __init__(self, *args, **kwargs): - (super(WebDAVRequester, self).__init__)(*args, **kwargs) + super(WebDAVRequester, self).__init__(*args, **kwargs) def rtn(self, resp, data=None): return WebDAVResponse(response=resp, data=data, success_code=self.SUCCESS_CODE) @catch_connection_error - def propfind(self, additional_url='', headers=None, data=None): + def propfind(self, additional_url="", headers=None, data=None): url = self.get_full_url(additional_url=additional_url) res = self.session.request('PROPFIND', url, headers=headers, data=data) return self.rtn(res) @catch_connection_error - def proppatch(self, additional_url='', data=None): + def proppatch(self, additional_url="", data=None): url = self.get_full_url(additional_url=additional_url) res = self.session.request('PROPPATCH', url, data=data) return self.rtn(resp=res) @catch_connection_error - def report(self, additional_url='', data=None): + def report(self, additional_url="", data=None): url = self.get_full_url(additional_url=additional_url) res = self.session.request('REPORT', url, data=data) return self.rtn(resp=res) @catch_connection_error - def download(self, url='', params=None): + def download(self, url="", params=None): url = self.get_full_url(url) res = self.session.request( 'get', url, headers=(self.h_get), params=params) return self.rtn(resp=res, data=(res.content)) @catch_connection_error - def make_collection(self, additional_url=''): + def make_collection(self, additional_url=""): url = self.get_full_url(additional_url=additional_url) - res = self.session.request('MKCOL', url=url) + res = self.session.request("MKCOL", url=url) return self.rtn(resp=res) @catch_connection_error def move(self, url, destination, overwrite=False): url = self.get_full_url(additional_url=url) destination_url = self.get_full_url(additional_url=destination) - headers = {'Destination': destination_url.encode('utf-8'), - 'Overwrite': 'T' if overwrite else 'F'} - res = self.session.request('MOVE', url=url, headers=headers) + headers = {"Destination": destination_url.encode("utf-8"), + "Overwrite": "T" if overwrite else "F"} + res = self.session.request("MOVE", url=url, headers=headers) return self.rtn(resp=res) @catch_connection_error def copy(self, url, destination, overwrite=False): url = self.get_full_url(additional_url=url) destination_url = self.get_full_url(additional_url=destination) - headers = {'Destination': destination_url.encode('utf-8'), - 'Overwrite': 'T' if overwrite else 'F'} - res = self.session.request('COPY', url=url, headers=headers) + headers = {"Destination": destination_url.encode("utf-8"), + "Overwrite": "T" if overwrite else "F"} + res = self.session.request("COPY", url=url, headers=headers) return self.rtn(resp=res) diff --git a/src/nextcloud/response.py b/src/nextcloud/response.py index 3883cee..5cd8699 100644 --- a/src/nextcloud/response.py +++ b/src/nextcloud/response.py @@ -1,4 +1,7 @@ # -*- coding: utf-8 -*- +""" +Define requests responses (automatically check if the request is OK) +""" try: from json import JSONDecodeError except ImportError: @@ -6,11 +9,17 @@ class BaseResponse(object): + """ + Base Response that take HTTP reponse and take the following attrs + - raw : the raw response + - status_code : the HTTP code + - data : the asked data (json or xml value) + - is_ok : True if the request is succesfully achieved + """ def __init__(self, response, data=None, json_output=True, status_code=None, success_code=None, **kwargs): self.raw = response - print(self.raw.content) self.data = data or ( response.json() if json_output else response.content.decode('UTF-8') ) @@ -32,17 +41,23 @@ def _compute_is_ok(self, success_code): self.is_ok = self.status_code in success_codes def __repr__(self): - is_ok_str = 'OK' if self.is_ok else 'Failed' - return '<{}: Status: {}>'.format(self.__class__.__name__, is_ok_str) + is_ok_str = "OK" if self.is_ok else "Failed" + return "<{}: Status: {}>".format(self.__class__.__name__, is_ok_str) class OCSResponse(BaseResponse): - """ Response class for OCS api methods """ + """ + Response class for OCS api methods + Add some attributes: + - meta : ocs json metadata + - full_data : json data of the ocs response + """ def __init__(self, response, json_output=True, success_code=None): data = None full_data = None meta = None + status_code = None if (success_code or json_output): try: diff --git a/src/nextcloud/session.py b/src/nextcloud/session.py index cd80c1c..983cc5d 100644 --- a/src/nextcloud/session.py +++ b/src/nextcloud/session.py @@ -7,6 +7,8 @@ class NextCloudConnectionError(Exception): """ A connection error occurred """ +class NextCloudLoginError(Exception): + """ A login error occurred """ def catch_connection_error(func): @@ -27,6 +29,7 @@ class Session(object): def __init__(self, url=None, user=None, password=None, auth=None, session_kwargs=None): self.session = None self.auth = None + self.user = None self._set_credentials(user, password, auth) self.url = url self._session_kwargs = session_kwargs or {} @@ -48,20 +51,21 @@ def _set_credentials(self, user, password, auth): def request(self, method, url, **kwargs): if self.session: - return (self.session.request)(method=method, url=url, **kwargs) + return self.session.request(method=method, url=url, **kwargs) else: _kwargs = self._session_kwargs _kwargs.update(kwargs) if not kwargs.get('auth', False): _kwargs['auth'] = self.auth - return (requests.request)(method, url, **_kwargs) + return requests.request(method, url, **_kwargs) - def login(self, user=None, password=None, auth=None): + def login(self, user=None, password=None, auth=None, client=None): """Create a stable session on the server. :param user_id: user id :param password: password :param auth: object for any auth method + :param client: object for any auth method :raises: HTTPResponseError in case an HTTP error status was returned """ self.session = requests.Session() @@ -70,14 +74,18 @@ def login(self, user=None, password=None, auth=None): self._set_credentials(user, password, auth) self.session.auth = self.auth - try: - resp = self.session.post(self.url) - except requests.exceptions.SSLError as e: - self.logout() - raise e - except Exception as e: - self.logout() - raise e + if client: + try: + resp = client.with_attr(json_output=True).get_user() + if not resp.is_ok: + raise NextCloudLoginError( + 'Failed to login to NextCloud', self.url, resp) + except requests.exceptions.SSLError as e: + self.logout() + raise e + except Exception as e: + self.logout() + raise e def logout(self): """Log out the authenticated user and close the session. diff --git a/tests/README.md b/tests/README.md index 9fe1399..3ad5aa6 100644 --- a/tests/README.md +++ b/tests/README.md @@ -19,4 +19,4 @@ Run tests: Run examples: - docker-compose run --rm python-api python ../example.py + docker-compose run --rm python-api python ../examples/user_management.py diff --git a/tests/__init__.py b/tests/__init__.py index 68ecb7d..38f9fcd 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,3 +1,4 @@ +""" Tests for NextCloud-Api module """ import sys from os.path import dirname from os.path import join