Spark-based framework to ingest data from any source and flush it to any sink.
The code is designed to be built as a fat jar and consumed via `spark-submit`.
`BaseDriver` is the app entry point, to be extended by each job object. It:
- defines the job trait,
- reads the app configs based on the `com.saswata.sparkseed.config` file name passed in as a command-line arg to the program,
- creates the Spark context,
- configures the `com.saswata.sparkseed.sources` and `com.saswata.sparkseed.sinks`,
- calls the `run` method to execute the job (see the sketch after this list).
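As a rough illustration, a concrete job object might look like the sketch below. The exact trait and method signatures of `BaseDriver`, `AppConfig`, `BaseSource`, and `BaseSink` are assumptions made here for illustration, not the framework's actual API.

```scala
package com.saswata.sparkseed.drivers

import com.saswata.sparkseed.config.AppConfig
import com.saswata.sparkseed.sinks.BaseSink
import com.saswata.sparkseed.sources.BaseSource
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical job: BaseDriver parses the conf file, builds the SparkSession and the
// configured source/sink, then calls run(); only the job-specific logic lives here.
object MyIngestJob extends BaseDriver {

  // Assumed signature for the job trait's run method.
  override def run(spark: SparkSession,
                   appConfig: AppConfig,
                   source: BaseSource,
                   sink: BaseSink): Unit = {
    val raw: DataFrame = source.read(spark, appConfig) // read as per the `source` config
    val cleaned        = raw.dropDuplicates()          // job-specific transformation
    sink.write(cleaned, appConfig)                     // write as per the `sink` config
  }
}
```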
`AppConfig` contains all the HOCON configs defined.
`BaseSource` reads a DataFrame based on the configs.
`BaseSink` writes a DataFrame based on the configs.
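Their shapes are roughly as sketched below; the method names and signatures are assumptions for illustration only.

```scala
import com.saswata.sparkseed.config.AppConfig
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical shape of a source: given the job's configs, produce a DataFrame.
trait BaseSource {
  def read(spark: SparkSession, conf: AppConfig): DataFrame
}

// Hypothetical shape of a sink: given a DataFrame and the job's configs, persist it.
trait BaseSink {
  def write(df: DataFrame, conf: AppConfig): Unit
}
```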
Check out `GenericIngestJob` and `GenericTransformers` for usage; a rough sketch of a transformer follows below.
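A minimal sketch of a transformer as a plain `DataFrame => DataFrame` function; this abstraction and the column names are assumptions for illustration, not the actual contents of `GenericTransformers`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_unixtime, to_date}

// Hypothetical transformer: derives a date partition column from an epoch-seconds column.
object SampleTransformers {
  def addDatePartition(timeCol: String, partitionCol: String)(df: DataFrame): DataFrame =
    df.withColumn(partitionCol, to_date(from_unixtime(col(timeCol))))
}

// Usage: curry the column names, then chain or compose the resulting functions in a job.
// val transform = SampleTransformers.addDatePartition("created_at", "dt") _
// val out = transform(in)
```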
Refer to `transforms.Udfs` for pre-existing Spark SQL functions; an illustrative UDF is sketched below.
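The following is illustrative only, in the spirit of `transforms.Udfs` (the actual functions there may differ): a UDF that normalises epochs in mixed units to seconds, registered for use from both the DataFrame API and Spark SQL.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object ExampleUdfs {
  // Heuristic: pick the unit from the magnitude of the epoch value.
  val toEpochSeconds = udf { (epoch: Long) =>
    if (epoch > 1e16.toLong) epoch / 1000000000L   // nanoseconds
    else if (epoch > 1e13.toLong) epoch / 1000000L // microseconds
    else if (epoch > 1e10.toLong) epoch / 1000L    // milliseconds
    else epoch                                     // already seconds
  }

  // Register so the function is also usable in SQL expressions.
  def register(spark: SparkSession): Unit =
    spark.udf.register("to_epoch_seconds", toEpochSeconds)
}
```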
The makefile has recipes for linting, building the fat jar, and uploading it to S3, so it can be used in Qubole/Spark clusters.
Check out `trip_events.conf` for a working sample of the config file.
The `reference.conf` and `local.conf` (if the `--local` arg is passed) are merged with the main job conf to provide additional common configs, roughly as sketched below.
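A minimal sketch of how such a merge can be done with Typesafe Config; the exact merge order used by the framework is an assumption here.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Load the job conf passed via --conf-file and layer the common configs under it:
// values in the job conf win, local.conf (when --local is set) and reference.conf
// fill in anything that is missing.
def loadConfig(confFile: String, local: Boolean): Config = {
  val jobConf = ConfigFactory.parseResources(confFile)
  val withLocal =
    if (local) jobConf.withFallback(ConfigFactory.parseResources("local.conf"))
    else jobConf
  withLocal.withFallback(ConfigFactory.defaultReference()).resolve()
}
```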
A typical job conf looks like this:

```hocon
source {
  // for mongo
  type = "mongo"
  host = "host:port" // assumes auth db is admin
  ssm-path = "/prefix/of/aws/ssm/mysql/credentials/"
  user = "user" // use in local
  password = "pass" // use in local
  database = "data-base"
  table = "some_table"
  readPreference.name = "secondaryPreferred" // optional, but good to have
  sampleSize = 10000000 // optional, for better schema inference; use sensibly

  // or for mysql
  type = "jdbc"
  url = "jdbc:mysql://mysql-host:port/db-name"
  dbtable = "db-name.tbl-name"
  ssm-path = "/prefix/of/aws/ssm/mysql/credentials/"
  user = "user" // use in local
  password = "pass" // use in local
  fetchsize = "20000" // optional
}

sink {
  type = "s3"
  bucket = "bucket-name"
  folder = "some/path/to"
  format = "parquet"
  save_mode = "error" // or "overwrite"; maps to the equivalent Spark SaveMode
}

spark {
  master = "local[*]" // override defaults if needed
  // additional spark configs
}

schema {
  time_partition_col = "col by which the data is to be partitioned"
  time_partition_col_unit = "unit of time s/ms/us/ns"
  epoch_cols = ["cols which must be sanitised as epochs"] // optional
  numeric_cols = ["cols which must be sanitised as doubles"] // optional
  bool_cols = ["cols which must be sanitised as bools"] // optional
  flatten = false // optional, flatten StructType-s
}
```
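For intuition, a sink block like the one above maps roughly onto Spark's DataFrameWriter as sketched below; the actual mapping inside `BaseSink` is an assumption, and the bucket, folder, and partition column names are placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Rough equivalent of: type = "s3", format = "parquet", save_mode = "overwrite",
// partitioned by the column derived from schema.time_partition_col.
def writeToS3(df: DataFrame): Unit =
  df.write
    .mode(SaveMode.Overwrite)                  // sink.save_mode
    .partitionBy("dt")                         // derived time-partition column
    .parquet("s3://bucket-name/some/path/to")  // sink.bucket + sink.folder
```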
Sample `spark-submit` invocation:

```bash
spark-submit --class com.saswata.sparkseed.drivers.GenericIngestJob --conf-file trip_events.conf --start-epoch 1557340200 --stop-epoch 1557426600
```
- `--class` : canonical name of the class for the job
- `--conf` : conf to submit to Spark
- `--conf-file` : name of the conf file for this job
- `--start-epoch` : start time of the data partition in seconds
- `--stop-epoch` : stop time of the data partition in seconds
- `--start-date` : start time of the data partition in yyyy-mm-dd
- `--stop-date` : stop time of the data partition in yyyy-mm-dd
- `--local` : pass 'true' to run locally in IntelliJ, using the 'mainRunner' config
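As a point of reference for the date vs. epoch args, a `yyyy-mm-dd` value converts to epoch seconds roughly as below; the framework's actual parsing and timezone handling are assumptions here (this sketch uses UTC).

```scala
import java.time.{LocalDate, ZoneOffset}

// Illustrative only: how a --start-date / --stop-date value relates to the
// --start-epoch / --stop-epoch values. The timezone choice (UTC) is an assumption.
def dateToEpochSeconds(date: String): Long =
  LocalDate.parse(date).atStartOfDay(ZoneOffset.UTC).toEpochSecond

// Example: dateToEpochSeconds("2019-05-09") == 1557360000L
```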
Using the AWS CLI: https://gist.github.com/saswata-dutta/f456c8519ba01f3b45037ec26caa1d10