Skip to content

Commit

Permalink
[IMP] File object / CRUD / add methods : get_file, get_folder, list, …
Browse files Browse the repository at this point in the history
…download, upload_file, delete
  • Loading branch information
luffah committed May 4, 2021
1 parent a585f29 commit f7cdc46
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 25 deletions.
201 changes: 183 additions & 18 deletions src/nextcloud/api_wrappers/webdav.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,34 @@
from nextcloud.base import WebDAVApiWrapper
from nextcloud.common.collections import PropertySet
from nextcloud.common.properties import Property as Prop, NAMESPACES_MAP
from nextcloud.common.value_parsing import timestamp_to_epoch_time
from nextcloud.common.value_parsing import (
timestamp_to_epoch_time,
datetime_to_timestamp
)


class NextCloudFileConflict(Exception):
""" Exception to raise when you try to create a File that alreay exists """


class File(PropertySet):
"""
Define properties on a WebDav file/folder
Additionnally, provide an objective CRUD API
(that probably consume more energy than fetching specific attributes)
Example :
>>> root = nxc.get_folder() # get root
>>> def _list_rec(d, indent=""):
>>> # list files recursively
>>> print("%s%s%s" % (indent, d.basename(), '/' if d.isdir() else ''))
>>> if d.isdir():
>>> for i in d.list():
>>> _list_rec(i, indent=indent+" ")
>>>
>>> _list_rec(root)
"""
_attrs = [
Prop('d:getlastmodified'),
Prop('d:getetag'),
Expand Down Expand Up @@ -56,6 +80,97 @@ def isdir(self):
""" say if the file is a directory /!\\ ressourcetype property shall be loaded """
return self.resource_type == self.COLLECTION_RESOURCE_TYPE

def get_relative_path(self):
""" get path relative to user root """
return self._wrapper.get_relative_path(self.href)

def _get_remote_path(self, path=None):
_url = self.get_relative_path()
return '/'.join([_url, path]) if path else _url

def basename(self):
""" basename """
_path = self._get_remote_path()
return _path.split('/')[-2] if _path.endswith('/') else _path.split('/')[-1]

def dirname(self):
""" dirname """
_path = self._get_remote_path()
return '/'.join(_path.split('/')[:-2]) if _path.endswith('/') else '/'.join(_path.split('/')[:-1])

def __eq__(self, b):
return self.href == b.href

# MINIMAL SET OF CRUD OPERATIONS
def get_folder(self, path=None):
"""
Get folder (see WebDav wrapper)
:param subpath: if empty list current dir
:returns: a folder (File object)
Note : To check if sub folder exists, use get_file method
"""
return self._wrapper.get_folder(self._get_remote_path(path))

def get_folder(self, path=None):
"""
Get folder (see WebDav wrapper)
:param subpath: if empty list current dir
:returns: a file or folder (File object)
"""
return self._wrapper.get_file(self._get_remote_path(path))

def list(self, subpath=''):
"""
List folder (see WebDav wrapper)
:param subpath: if empty list current dir
:returns: list of Files
"""
resp = self._wrapper.list_folders(
self._get_remote_path(subpath),
depth=1,
all_properties=True
)
if resp.is_ok and resp.data:
_dirs = resp.data
# remove current dir
if _dirs[0] == self:
_dirs = _dirs[1:]
return _dirs
return []

def upload_file(self, local_filepath, name, timestamp=None):
"""
Upload file (see WebDav wrapper)
:param name: name of the new file
:returns: True if success
"""
resp = self._wrapper.upload_file(local_filepath,
self._get_remote_path(name),
timestamp=timestamp)
return resp.is_ok

def download(self, name=None, target_dir=None):
"""
file (see WebDav wrapper)
:param name: name of the new file
:returns: True if success
"""
path = self._get_remote_path(name)
target_path, _file_info = self._wrapper.download_file(path,
target_dir=target_dir)
assert os.path.isfile(target_path), "Download failed"
return target_path

def delete(self, subpath=''):
"""
Delete file or folder (see WebDav wrapper)
:param subpath: if empty, delete current file
:returns: True if success
"""
resp = self._wrapper.delete_path(self._get_remote_path(subpath))
return resp.is_ok


class WebDAV(WebDAVApiWrapper):
""" WebDav API wrapper """
Expand Down Expand Up @@ -90,9 +205,10 @@ def list_folders(self, path=None, depth=1, all_properties=False,
resp = self.requester.propfind(additional_url=self._get_path(path),
headers={'Depth': str(depth)},
data=data)
return File.from_response(resp, json_output=(self.json_output))
return File.from_response(resp, json_output=self.json_output,
wrapper=self)

def download_file(self, path):
def download_file(self, path, target_dir=None):
"""
Download file by path (for current user)
File will be saved to working directory
Expand All @@ -108,32 +224,35 @@ def download_file(self, path):
path (str): file path
Returns:
None
a tuple (target_path, File object)
"""
if not target_dir:
target_dir='./'
filename = path.split('/')[(-1)] if '/' in path else path
file_data = self.list_folders(path=path, depth=0)
file_data = self.get_file(path)
if not file_data:
raise ValueError("Given path doesn't exist")
file_resource_type = (file_data.data[0].get('resource_type')
if self.json_output
else file_data.data[0].resource_type)
file_resource_type = file_data.resource_type
if file_resource_type == File.COLLECTION_RESOURCE_TYPE:
raise ValueError("This is a collection, please specify file path")
if filename in os.listdir('./'):
raise ValueError( "File with such name already exists in this directory")
if filename in os.listdir(target_dir):
raise ValueError(
"File with such name already exists in this directory")
filename = os.path.join(target_dir, filename)
res = self.requester.download(self._get_path(path))
with open(filename, 'wb') as f:
f.write(res.data)

# get timestamp of downloaded file from file property on Nextcloud
# If it succeeded, set the timestamp to saved local file
# If the timestamp string is invalid or broken, the timestamp is downloaded time.
file_timestamp_str = (file_data.data[0].get('last_modified')
if self.json_output
else file_data.data[0].last_modified)
file_timestamp_str = file_data.last_modified
file_timestamp = timestamp_to_epoch_time(file_timestamp_str)
if isinstance(file_timestamp, int):
os.utime(filename, (datetime.now().timestamp(), file_timestamp))
os.utime(filename, (
datetime_to_timestamp(datetime.now()),
file_timestamp))
return (filename, file_data)

def upload_file(self, local_filepath, remote_filepath, timestamp=None):
"""
Expand Down Expand Up @@ -233,7 +352,8 @@ def move_path(self, path, destination_path, overwrite=False):
requester response
"""
return self.requester.move(url=self._get_path(path),
destination=self._get_path(destination_path),
destination=self._get_path(
destination_path),
overwrite=overwrite)

def copy_path(self, path, destination_path, overwrite=False):
Expand All @@ -249,7 +369,8 @@ def copy_path(self, path, destination_path, overwrite=False):
requester response
"""
return self.requester.copy(url=self._get_path(path),
destination=self._get_path(destination_path),
destination=self._get_path(
destination_path),
overwrite=overwrite)

def set_favorites(self, path):
Expand Down Expand Up @@ -277,8 +398,10 @@ def list_favorites(self, path=''):
"""
data = File.build_xml_propfind(
instr='oc:filter-files', filter_rules={'oc': {'favorite': 1}})
resp = self.requester.report(additional_url=self._get_path(path), data=data)
return File.from_response(resp, json_output=self.json_output)
resp = self.requester.report(
additional_url=self._get_path(path), data=data)
return File.from_response(resp, json_output=self.json_output,
wrapper=self)

def get_file_property(self, path, field, tag='oc'):
"""
Expand Down Expand Up @@ -310,3 +433,45 @@ def get_file_property(self, path, field, tag='oc'):
break

return resp

def get_file(self, path):
"""
Return the File object associated to the path
:param path: path to the file
:returns: File object or None
"""
resp = self.client.with_attr(json_output=False).list_folders(
path, all_properties=True, depth=0)
if resp.is_ok:
if resp.data:
return resp.data[0]
return None

def get_folder(self, path=None):
"""
Return the File object associated to the path
If the file (folder or 'collection') doesn't exists, create it.
:param path: path to the file/folder, if empty use root
:returns: File object
"""
fileobj = self.get_file(path)
if fileobj:
if not fileobj.isdir():
raise NextCloudFileConflict(fileobj.href)
else:
self.client.create_folder(path)
fileobj = self.get_file(path)

return fileobj

def get_relative_path(self, href):
"""
Returns relative (to application / user) path
:param href(str): file href
:returns (str): relative path
"""
_app_root = '/'.join([self.API_URL, self.client.user])
return href[len(_app_root):]
13 changes: 9 additions & 4 deletions src/nextcloud/common/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
class PropertySet(object):
"""
Set of nextcloud.common.properties.Prop
defined in _attrs class variable
defined in _attrs class variable.
The inherited classes can do additionnal complex operations
if wrapper instance is defined at initialization.
"""
SUCCESS_STATUS = 'HTTP/1.1 200 OK'
COLLECTION_RESOURCE_TYPE = 'collection'
Expand All @@ -31,11 +34,12 @@ def _fetch_property(cls, key, attr='xml_key'):
if getattr(k, attr) == key:
return k

def __init__(self, xml_data, init_attrs=False):
def __init__(self, xml_data, init_attrs=False, wrapper=None):
if init_attrs:
for attr in self._attrs:
setattr(self, attr.attr_name, None)

self._wrapper = wrapper
self.href = xml_data.find('d:href', NAMESPACES_MAP).text
for propstat in xml_data.iter('{DAV:}propstat'):
if propstat.find('d:status', NAMESPACES_MAP).text != self.SUCCESS_STATUS:
Expand Down Expand Up @@ -83,15 +87,16 @@ def build_xml_propupdate(cls, values):
return SimpleXml.build_propupdate_datas(values)

@classmethod
def from_response(cls, resp, json_output=None, filtered=None, init_attrs=None):
def from_response(cls, resp, json_output=None, filtered=None,
init_attrs=None, wrapper=None):
""" Build list of PropertySet from a NextcloudResponse """
if not resp.is_ok:
resp.data = None
return resp
else:
response_data = resp.data
response_xml_data = SimpleXml.fromstring(response_data)
attr_datas = [cls(xml_data, init_attrs=init_attrs)
attr_datas = [cls(xml_data, init_attrs=init_attrs, wrapper=wrapper)
for xml_data in response_xml_data]
if filtered:
if callable(filtered):
Expand Down
7 changes: 4 additions & 3 deletions src/nextcloud/common/value_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Extra tools for value parsing
"""
from datetime import datetime
from nextcloud.compat import datetime_to_timestamp


def timestamp_to_epoch_time(rfc1123_date=''):
Expand All @@ -18,9 +19,9 @@ def timestamp_to_epoch_time(rfc1123_date=''):
int or None : Epoch time, if date string value is invalid return None
"""
try:
epoch_time = datetime.strptime(
rfc1123_date, '%a, %d %b %Y %H:%M:%S GMT').timestamp()
_time = datetime.strptime(
rfc1123_date, '%a, %d %b %Y %H:%M:%S GMT')
except ValueError:
return
else:
return int(epoch_time)
return datetime_to_timestamp(_time)
15 changes: 15 additions & 0 deletions src/nextcloud/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Tools for python2/3 unicode compatibility
"""
import six
import time


def encode_requests_password(word):
Expand Down Expand Up @@ -39,3 +40,17 @@ def encode_string(string):
if isinstance(string, six.text_type):
return string.encode('utf-8')
return string


def datetime_to_timestamp(_time):
"""
Returns int(<datetime>.timestamp())
"""
if six.PY2:
return int(
time.mktime(_time.timetuple()) + _time.microsecond/1000000.0
)
else:
return int(
_time.timestamp()
)

0 comments on commit f7cdc46

Please sign in to comment.