-
Notifications
You must be signed in to change notification settings - Fork 3
/
process.py
38 lines (32 loc) · 1.01 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import pickle
from typing import Dict
import numpy as np
import ray
def function_0(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Attach a "petal area (cm^2)" column to a batch.

    The area is the element-wise product of the "petal length (cm)" and
    "petal width (cm)" columns. The batch dict is updated in place and the
    very same dict object is handed back, which is the shape that
    ``Dataset.map_batches`` expects from a batch UDF.

    Args:
        batch: Column name -> column values for one batch.

    Returns:
        ``batch`` itself, now containing the extra area column.
    """
    area = batch["petal length (cm)"] * batch["petal width (cm)"]
    batch["petal area (cm^2)"] = area
    return batch
def function_1(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Attach a "DUPLICATE petal area (cm^2)" column to a batch.

    Computes the same length * width product as ``function_0`` but stores it
    under a distinct column name. The input dict is mutated in place and
    returned.

    Args:
        batch: Column name -> column values for one batch.

    Returns:
        ``batch`` itself, with the duplicate-area column added.
    """
    product = batch["petal length (cm)"] * batch["petal width (cm)"]
    batch["DUPLICATE petal area (cm^2)"] = product
    return batch
def process_0(input, output):
    """Read a CSV dataset, add a petal-area column and a row index, and write it back out.

    Pipeline: ``ray.data.read_csv(input)`` -> ``map_batches(function_0)`` ->
    zip with ``ray.data.range(n)`` (``n`` = row count) to attach a 0..n-1
    index -> preview one row -> ``write_csv(output)``. The Ray runtime is
    shut down before returning, including when any step raises.

    Args:
        input: Path(s) passed to ``ray.data.read_csv``. The name shadows the
            builtin but is kept for caller compatibility.
        output: Destination path passed to ``Dataset.write_csv``.

    Returns:
        The final (lazy) Ray Dataset.

    NOTE(review): ``ray.shutdown()`` has already run when this returns, so
    executing the returned Dataset presumably requires Ray to be
    re-initialized. Confirm how callers use it.
    """
    # ray.init() is not called explicitly here. Ray presumably auto-initializes
    # on first Dataset use; confirm if an explicit config is needed.
    try:
        ds_in = ray.data.read_csv(input)
        ds_out = ds_in.map_batches(function_0)
        # Attach a positional index by zipping with a range dataset of equal length.
        idx_ds = ray.data.range(ds_out.count())
        ds_out = idx_ds.zip(ds_out)
        # Dataset.show() prints the rows itself and returns None. Wrapping it in
        # print() (as before) emitted a spurious "None" line.
        ds_out.show(limit=1)
        ds_out.write_csv(output)
    finally:
        # Release the Ray runtime even if reading, mapping or writing fails.
        ray.shutdown()
    return ds_out
def process_1(input, output):
    """Read a CSV dataset, add a duplicate petal-area column, and write it back out.

    Pipeline: ``ray.data.read_csv(input)`` -> ``map_batches(function_1)`` ->
    preview one row -> ``write_csv(output)``. The Ray runtime is shut down
    before returning, including when any step raises.

    Args:
        input: Path(s) passed to ``ray.data.read_csv``. The name shadows the
            builtin but is kept for caller compatibility.
        output: Destination path passed to ``Dataset.write_csv``.

    Returns:
        The final (lazy) Ray Dataset.

    NOTE(review): ``ray.shutdown()`` has already run when this returns, so
    executing the returned Dataset presumably requires Ray to be
    re-initialized. Confirm how callers use it.
    """
    # ray.init() is not called explicitly here. Ray presumably auto-initializes
    # on first Dataset use; confirm if an explicit config is needed.
    try:
        ds_in = ray.data.read_csv(input)
        ds_out = ds_in.map_batches(function_1)
        # Dataset.show() prints the rows itself and returns None. Wrapping it in
        # print() (as before) emitted a spurious "None" line.
        ds_out.show(limit=1)
        ds_out.write_csv(output)
    finally:
        # Release the Ray runtime even if reading, mapping or writing fails.
        ray.shutdown()
    return ds_out