-
Notifications
You must be signed in to change notification settings - Fork 3
/
sync.py
141 lines (110 loc) · 3.68 KB
/
sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import os
import json
from collections import OrderedDict
from copy import copy
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestError, RequestsHttpConnection
from boto.utils import get_instance_metadata
import boto3
def meta_constructor(metadata):
    """Assemble the document body for one scene metadata record.

    The result starts with a fixed set of normalized fields (scene id,
    satellite name, cloud coverage, date, thumbnail, footprint geometry) and
    is then updated with every key of the original record. A record key that
    collides with a normalized field name therefore overrides its value.

    Args:
        metadata: Mapping holding the raw scene attributes (``sceneID``,
            ``cloudCoverFull``, ``acquisitionDate``, ``browseURL`` and the
            four ``*Corner{Longitude,Latitude}`` pairs). Missing keys yield
            ``None``, except ``cloudCoverFull`` which falls back to 100.

    Returns:
        An ``OrderedDict`` with the normalized fields first, followed by the
        keys of ``metadata``.
    """
    # Snapshot the record up front; it is merged into the body at the end.
    raw_fields = copy(metadata)

    # Closed ring: upper-right -> upper-left -> lower-left -> lower-right and
    # back to upper-right, each vertex as [longitude, latitude].
    ring = []
    for corner in ('upperRight', 'upperLeft', 'lowerLeft', 'lowerRight',
                   'upperRight'):
        ring.append([
            metadata.get(corner + 'CornerLongitude'),
            metadata.get(corner + 'CornerLatitude'),
        ])

    footprint = {
        'type': 'Polygon',
        'crs': {
            'type': 'name',
            'properties': {'name': 'urn:ogc:def:crs:EPSG:8.9:4326'},
        },
        'coordinates': [ring],
    }

    document = OrderedDict()
    document['scene_id'] = metadata.get('sceneID')
    document['satellite_name'] = 'landsat-8'
    document['cloud_coverage'] = metadata.get('cloudCoverFull', 100)
    document['date'] = metadata.get('acquisitionDate')
    document['thumbnail'] = metadata.get('browseURL')
    document['data_geometry'] = footprint

    document.update(raw_fields)
    return document
def get_credentials():
    """Return the first IAM security-credentials entry from instance metadata.

    Returns:
        The first value found under ``['iam']['security-credentials']`` in
        the mapping returned by ``get_instance_metadata()``.

    Raises:
        IndexError: If the ``security-credentials`` mapping is empty.
        KeyError: If the metadata has no ``iam`` / ``security-credentials``
            entry.
    """
    obj = get_instance_metadata()
    # On Python 3 ``dict.values()`` is a view and cannot be indexed, so
    # materialize it first; this also works unchanged on Python 2.
    role_credentials = list(obj['iam']['security-credentials'].values())
    if not role_credentials:
        raise IndexError('No IAM security credentials found in instance metadata')
    return role_credentials[0]
def connection_to_es(es_host, es_port, aws=False):
    """Create an Elasticsearch client whose requests are signed with AWS4Auth.

    Credentials come from ``get_credentials()``; the signing region is read
    from the ``AWS_DEFAULT_REGION`` environment variable and falls back to
    ``us-east-1``. The connection always uses SSL with certificate
    verification.

    Args:
        es_host: Host name of the Elasticsearch endpoint.
        es_port: Port of the Elasticsearch endpoint.
        aws: Accepted for interface compatibility; the function does not
            consult it and always applies AWS request signing.

    Returns:
        A configured ``Elasticsearch`` client instance.
    """
    credentials = get_credentials()
    signer = AWS4Auth(
        credentials['AccessKeyId'],
        credentials['SecretAccessKey'],
        os.getenv('AWS_DEFAULT_REGION', 'us-east-1'),
        'es',
        session_token=credentials['Token']
    )
    endpoint = {'host': es_host, 'port': es_port}
    return Elasticsearch(
        hosts=[endpoint],
        http_auth=signer,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
def get_items(limit=100, last_key=None):
    """Fetch one page of items from the ``landsat`` DynamoDB table.

    Each returned item is the JSON document stored as a string in the item's
    ``body`` attribute, decoded into a Python object.

    Args:
        limit: Maximum number of items DynamoDB should evaluate for this page.
        last_key: ``LastEvaluatedKey`` from the previous page, or ``None`` to
            start scanning from the beginning of the table.

    Returns:
        A ``(items, next_key)`` tuple. ``next_key`` is the key to pass as
        ``last_key`` for the following page, or ``None`` once the scan has
        reached the end of the table.

    Raises:
        Exception: If the very first page (no ``last_key``) is empty, i.e. no
            record was found at all.
    """
    client = boto3.client('dynamodb', region_name='us-east-1')
    args = {
        'TableName': 'landsat',
        'Limit': limit
    }
    if last_key:
        args['ExclusiveStartKey'] = last_key

    response = client.scan(**args)

    # An empty *continuation* page simply means the scan is finished; only an
    # empty first page means there is nothing to sync.
    if response['Count'] == 0 and not last_key:
        raise Exception('No record found')

    items = [json.loads(item['body']['S']) for item in response['Items']]

    # DynamoDB omits LastEvaluatedKey on the final page, so it must not be
    # indexed directly.
    return (items, response.get('LastEvaluatedKey'))
def bulk_updater(records):
    """Index a batch of scene records into the ``sat-api`` index.

    Every record becomes one ``index`` action of type ``landsat8`` whose
    document id is the record's ``sceneID`` and whose source is the body
    produced by ``meta_constructor``. The host is read from the ``ES_HOST``
    environment variable and the connection uses port 443.

    Args:
        records: Iterable of scene metadata mappings; each must contain a
            ``sceneID`` key.

    Raises:
        KeyError: If a record has no ``sceneID``.
        Whatever ``parallel_bulk`` raises when an action fails (its default is
        to raise on the first error).
    """
    # Imported here because the module only imports names *from*
    # ``elasticsearch``; the bare ``elasticsearch.helpers`` attribute path is
    # not in scope.
    from elasticsearch.helpers import parallel_bulk

    # The bulk helpers expect one dict per document carrying its own metadata,
    # not the raw bulk API's alternating action/source lines.
    actions = [
        {
            '_op_type': 'index',
            '_index': 'sat-api',
            '_type': 'landsat8',
            '_id': record['sceneID'],
            '_source': meta_constructor(record),
        }
        for record in records
    ]
    if not actions:
        return

    es = connection_to_es(os.getenv('ES_HOST'), 443)

    # parallel_bulk is a lazy generator: nothing is sent until it is consumed.
    for _ok, _result in parallel_bulk(es, actions):
        pass
def update_es():
    """Copy every item of the DynamoDB table into Elasticsearch, page by page.

    Pages of up to 200 items are fetched with ``get_items`` and handed to
    ``bulk_updater``; a running count of processed items is printed after each
    page. The loop ends once ``get_items`` reports no further page key.
    """
    counter = 0
    limit = 200
    last_key = None

    while True:
        # get items from DynamoDB
        items, last_key = get_items(limit, last_key)

        # Index the page *before* checking for the end of the scan so the
        # final page is not dropped.
        bulk_updater(items)

        # Count what was actually processed; the last page is usually
        # shorter than `limit`.
        counter = counter + len(items)
        print(str(counter) + '\r')

        if not last_key:
            break
# Module-level entry point: the sync starts as soon as this file is executed
# (or imported), since the call is not guarded by `if __name__ == '__main__'`.
update_es()
# Leftover debugging aid (disabled): prints the security-credentials entries
# from the instance metadata.
#obj = get_instance_metadata()
#print(obj['iam']['security-credentials'].values())