-
Notifications
You must be signed in to change notification settings - Fork 0
/
spark_detection_script.py
66 lines (55 loc) · 1.55 KB
/
spark_detection_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Import SparkSession
from pyspark.sql import SparkSession
from gluoncv import model_zoo, data, utils
import pickle
import time
import mxnet as mx
import boto3
import numpy as np
import cv2
import io
# Pre-trained YOLOv3 detector (COCO classes); downloaded/loaded once at import.
NET = model_zoo.get_model('yolo3_darknet53_coco', pretrained=True)
# Executor memory given to Spark and number of images scored per Spark job.
MEMORY_SPARK = '5gb'
BATCH_SIZE = 1000

# Local-mode SparkSession used to parallelize the per-image detection work.
spark = (
    SparkSession.builder
    .master("local")
    .appName("gluoncv")
    .config("spark.executor.memory", MEMORY_SPARK)
    .getOrCreate()
)
def lista_arquivos():
    """Return the keys of all .jpg objects in the 'cocodataset' S3 bucket.

    Returns:
        list[str]: S3 object keys ending in '.jpg'.
    """
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('cocodataset')
    # endswith() is clearer and safer than comparing a 4-character slice,
    # and the comprehension replaces the manual append loop; the original
    # also bound an enumerate() index that was never used.
    return [obj.key for obj in bucket.objects.all() if obj.key.endswith('.jpg')]
def process_image(path):
    """Download one image from S3, run the YOLOv3 detector on it.

    Args:
        path: S3 object key of a .jpg image in the 'cocodataset' bucket.

    Returns:
        list: [class_ids ndarray, scores ndarray, path].
    """
    bucket = boto3.resource('s3').Bucket('cocodataset')
    with io.BytesIO() as buf:
        bucket.download_fileobj(path, buf)
        buf.seek(io.SEEK_SET)
        raw = np.frombuffer(buf.getvalue(), dtype=np.uint8)
        decoded = cv2.imdecode(raw, flags=3)
    # Resize/normalize for YOLO inference (shorter side -> 512 px).
    batch, _ = data.transforms.presets.yolo.transform_test(
        mx.nd.array(decoded), short=512)
    class_ids, confidences, _ = NET(batch)
    return [class_ids.asnumpy(), confidences.asnumpy(), path]
# ---- Driver: score the whole dataset in batches, pickling each batch ----
# Fixes vs. original: `while(!over)` is not valid Python (`!` -> `not`);
# `files`, `sc`, `name`, and `iter` were all undefined; the pickle handle
# was never closed; `iter` also shadowed the builtin.
files = lista_arquivos()
sc = spark.sparkContext

batch_idx = 0
init = 0
end = BATCH_SIZE
over = False
while not over:
    if end > len(files):
        end = len(files)
        over = True
    # Distribute this slice of keys and run detection on the workers.
    rdd = sc.parallelize(files[init:end]).map(process_image)
    results = rdd.collect()
    # One pickle file per batch; `with` guarantees the handle is closed.
    with open("results_batch_{}.pickle".format(batch_idx), "wb") as pickle_out:
        pickle.dump(results, pickle_out)
    init += BATCH_SIZE
    end += BATCH_SIZE
    batch_idx += 1
spark.stop()