Given an ADLS file system with nested partitions (e.g. ~/year/month/day/), the aim of this project is to compact the parquet files of the last partition level (snappy format) into files of up to a given size.
JDK | SDK |
---|---|
1.8 | 2.11.12 |
VMOptions | Program arguments |
---|---|
-Xmx512m | -p your-configuration-file-name.conf |
The configuration file must be located in the directory abfss://<your-file-system-name>@<your-storage-account-name>.dfs.core.windows.net/. Otherwise, you must specify its path relative to that root (not including the root itself). For example:
-p my-full-path/my-configuration-file.conf
This project uses credentials from Azure Data Lake Storage (ADLS). In order to compile and run it successfully, you should replace the ADLS file system name, the storage account name and the ADLS access key (a key-vault method can also be used) wherever they are required in the code.
IntelliJ is highly recommended.
For better comprehension, this section explains the steps followed during development.
First, we must establish a connection with ADLS. We use Spark and also Hadoop within the Spark context, so we need to set the ADLS credentials on both:
// Hadoop
spark.sparkContext.hadoopConfiguration.set("fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-data-lake-key>")
spark.sparkContext.hadoopConfiguration.set("fs.defaultFS", "abfss://<your-file-system-name>@<your-storage-account-name>.dfs.core.windows.net")
// Spark
spark.conf.set("fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-data-lake-key>")
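For context, here is a minimal sketch of where these settings fit when building the SparkSession. The application name is illustrative and not taken from the project; the placeholder values in angle brackets must be replaced with your own.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: create the SparkSession first, then apply the ADLS credentials
// shown above. The appName is illustrative; the master is expected to come
// from spark-submit or the run configuration.
val spark = SparkSession.builder()
  .appName("adls-parquet-compaction")
  .getOrCreate()

spark.sparkContext.hadoopConfiguration.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-data-lake-key>")
spark.sparkContext.hadoopConfiguration.set(
  "fs.defaultFS",
  "abfss://<your-file-system-name>@<your-storage-account-name>.dfs.core.windows.net")
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-data-lake-key>")
```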
We will have a configuration file stored in our ADLS, which we will access using
val configFile = spark.read.textFile(<configuration-file-path>).collect()
In the following box we specify the configuration file format:

size=<size-in-MB>
origin_path="abfss://<your-file-system-name>@<your-storage-account-name>.dfs.core.windows.net/"
partitions=["partition1","partition2","partitionN"]
filter=[["partition1"],["partition1","partition2"],["partition1","partition2","partition3"]]
The filter attribute contains an example of the different filters that can be applied when compacting. It is always a list of lists. To compact everything under origin_path, just set filter to [[]].
Note that size must be specified in MB.
The configuration file is parsed and a list of paths to compact is generated, taking the filter attribute into account.
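For illustration only (the project's actual parser may differ), the path list could be built from the parsed attributes along these lines. The parseFilter helper and the variable names below are assumptions, not part of the project.

```scala
// Hedged sketch: assumes each line of the configuration file is a key=value
// pair as shown in the box above. configFile comes from the collect() call.
val entries: Map[String, String] = configFile
  .map(_.trim)
  .filter(_.nonEmpty)
  .map { line =>
    val Array(key, value) = line.split("=", 2)
    key.trim -> value.trim
  }
  .toMap

val sizeMb: Long       = entries("size").toLong
val originPath: String = entries("origin_path").stripPrefix("\"").stripSuffix("\"").stripSuffix("/")

// parseFilter is a hypothetical helper that turns the filter attribute
// into a Seq[Seq[String]]; filter=[[]] would yield Seq(Seq()).
val filters: Seq[Seq[String]] = parseFilter(entries("filter"))

// One candidate directory per inner list: origin_path plus the partition levels.
val pathList: Seq[String] = filters.map {
  case Seq() => originPath
  case parts => originPath + "/" + parts.mkString("/")
}
```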
In this part we create a Hadoop FileSystem in order to calculate each partition's size.
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val size = fs.getContentSummary(new Path(<path-from-path-list>)).getLength // partition size in bytes
val fileNum = size / (<size-from-configuration-file> * 1024 * 1024) + 1 // configured size: MB to B
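As an illustrative example, with size=256 in the configuration file, a partition of 1,000 MB (1,048,576,000 bytes) gives 1,048,576,000 / (256 * 1024 * 1024) = 3 under integer division, plus 1, i.e. fileNum = 4 output files.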
We read every parquet file in the directory, repartition the resulting DataFrame into fileNum partitions, and write the result to a temporary directory.
val df = spark.read.parquet(<path-from-path-list> + "/")
df.repartition(fileNum.toInt).write.mode(SaveMode.Append).parquet(<path-from-path-list> + "-tmp")
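Note that repartition performs a full shuffle of the data; this is what lets Spark produce evenly sized output files, at the cost of moving the whole partition's data across the cluster.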
When the compaction completes successfully, a new file called _SUCCESS with the commit information is created.
At the end of the process, we loop over the path list generated from the filters set by the user and, for each path, we do:
val p = new Path(<path-from-path-list>)
val pTmp = new Path(<path-from-path-list> + "-tmp")
fs.mkdirs(pTmp) // create the temporary output directory
depthCompaction(<path-from-path-list> + "/", config.size)
fs.delete(p, true) // remove the original, uncompacted directory
fs.rename(pTmp, p) // move the compacted files back to the original path
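Deleting the original directory and then renaming the temporary one back to it leaves the compacted files under the original partition path, so downstream readers keep using the same location.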