forked from datopian/deploy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flow.py
159 lines (131 loc) · 5.1 KB
/
flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import os
import subprocess
import git
import tempfile
import shutil
import requests
import re
class Flow(object):
    """Smoke test that drives the ``dpm`` command-line tool end to end.

    Constructing an instance creates a scratch directory (``self.tmpdir``).
    ``check_flow`` clones a sample data package into it, runs ``dpm`` through
    publish / delete / undelete / purge / tag, and verifies every step by
    issuing HTTP GET requests and checking the status codes.

    Helper methods are underscore-prefixed on purpose: the CLI at the bottom
    of this file exposes every public method of this class as an action.
    """

    # Sample data package that gets cloned and published.
    SAMPLE_REPO = 'https://github.com/zelima/country-continents.git'

    # Finds bit-store URLs of .json files (per the original comment: the
    # datapackage.json on s3) inside the body of the published page.
    BITSTORE_JSON_URL = re.compile(
        r'http[s]?://bits(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+.json'
    )

    def __init__(self):
        # self.config = json.load(open(configfile))
        self.tmpdir = tempfile.mkdtemp()

    def check_flow(self):
        """Check flow for dpm - publish, tag, delete, undelete, purge

        Raises:
            AssertionError: when an HTTP check returns an unexpected status,
                or no URL can be found where one is required.
            subprocess.CalledProcessError: when a ``dpm`` command fails.

        The scratch directory is removed when the method finishes, whether
        the flow succeeded or not.
        """
        try:
            git.Repo.clone_from(self.SAMPLE_REPO, self.tmpdir)
            with cd(self.tmpdir):
                # publish
                url = self._published_url(self._dpm('publish'))
                bit_store_url = self._first_bit_store_url(url)
                self._expect_status(bit_store_url, 200)
                print('Successfully published')

                # delete, undelete and purge
                self._check_removal_cycle(url, bit_store_url)

                # publish and tag
                self._dpm('publish')
                self._dpm('tag', 'testing')
                bit_store_url = self._first_bit_store_url(url)
                # TODO: for some reason the urls still point to _v/latest, so
                # 'testing' is not yet checked for in bit_store_url.
                print('Successfully published and tagged')

                # delete, undelete and purge the tagged datapackage
                self._check_removal_cycle(
                    url, bit_store_url, suffix=' the tagged version')
        finally:
            # Runs after the ``with cd`` block has restored the working
            # directory. Errors are ignored so that a cleanup problem never
            # hides the exception that made the flow fail.
            shutil.rmtree(self.tmpdir, ignore_errors=True)

    def _dpm(self, *args):
        """Run ``dpm`` with the given arguments and return its output as text."""
        out = subprocess.check_output(['dpm'] + list(args))
        # check_output returns bytes on Python 3; decode so that the string
        # handling on the result works on both Python 2 and Python 3.
        if isinstance(out, bytes):
            out = out.decode('utf-8')
        return out

    def _published_url(self, publish_output):
        """Extract the URL that ``dpm publish`` printed.

        Everything from the first ``http`` to the end of the output is taken,
        with newlines removed.
        """
        start = publish_output.find('http')
        if start < 0:
            raise AssertionError(
                'no URL found in dpm publish output: %r' % (publish_output,))
        return publish_output[start:].replace('\n', '')

    def _expect_status(self, url, expected):
        """GET ``url`` and return the response; raise unless status matches."""
        res = requests.get(url)
        if res.status_code != expected:
            raise AssertionError(
                'GET %s returned %s, expected %s'
                % (url, res.status_code, expected))
        return res

    def _first_bit_store_url(self, url):
        """GET ``url`` (expecting 200) and return the first bit-store URL in it."""
        res = self._expect_status(url, 200)
        found = self.BITSTORE_JSON_URL.findall(res.text)
        if not found:
            raise AssertionError('no bit-store .json URL found at %s' % url)
        return found[0]

    def _check_removal_cycle(self, url, bit_store_url, suffix=''):
        """Run delete -> undelete -> purge and verify the status codes.

        ``suffix`` is appended to each progress message.
        """
        self._dpm('delete')
        self._expect_status(url, 404)
        self._expect_status(bit_store_url, 403)
        print('Successfully deleted' + suffix)

        self._dpm('undelete')
        self._expect_status(url, 200)
        self._expect_status(bit_store_url, 200)
        print('Successfully undeleted' + suffix)

        self._dpm('purge')
        self._expect_status(url, 404)
        self._expect_status(bit_store_url, 404)
        print('Successfully purged' + suffix)
# ==============================================
# CD
class cd(object):
    """Context manager for changing the current working directory.

    On entry the current directory is remembered and the process switches to
    ``newPath`` (with a leading ``~`` expanded). On exit the remembered
    directory is restored, also when the body raised; the exception itself
    is not suppressed.
    """

    def __init__(self, newPath):
        self.newPath = os.path.expanduser(newPath)
        # Set in __enter__; None until the context has been entered.
        self.savedPath = None

    def __enter__(self):
        self.savedPath = os.getcwd()
        os.chdir(self.newPath)
        # Return the manager so ``with cd(p) as ctx`` binds something useful.
        return self

    def __exit__(self, etype, value, traceback):
        os.chdir(self.savedPath)
# ==============================================
# CLI
import sys
import optparse
import inspect
def _object_methods(obj):
methods = inspect.getmembers(obj, inspect.ismethod)
methods = filter(lambda (name, y): not name.startswith('_'), methods)
methods = dict(methods)
return methods
def _main(functions_or_object):
isobject = inspect.isclass(functions_or_object)
if isobject:
_methods = _object_methods(functions_or_object)
else:
_methods = _module_functions(functions_or_object)
usage = '''%prog {action}
Actions:
'''
usage += '\n '.join(
['%s: %s' % (name, m.__doc__.split('\n')[0] if m.__doc__ else '') for (name, m)
in sorted(_methods.items())])
parser = optparse.OptionParser(usage)
# Optional: for a config file
# parser.add_option('-c', '--config', dest='config',
# help='Config file to use.')
options, args = parser.parse_args()
if not args or not args[0] in _methods:
parser.print_help()
sys.exit(1)
method = args[0]
if isobject:
getattr(functions_or_object(), method)(*args[1:])
else:
_methods[method](*args[1:])
# Script entry point: expose Flow's public methods as command-line actions.
if __name__ == '__main__':
    _main(Flow)