Improve the scalability of the join between the LHS and GroupBys by breaking up the join #621
```diff
@@ -67,6 +67,7 @@ class Join(joinConf: api.Join,
     extends JoinBase(joinConf, endPartition, tableUtils, skipFirstHole, mutationScan, showDf) {

   private val bootstrapTable = joinConf.metaData.bootstrapTable
+  private val joinsAtATime = 8

   private def padFields(df: DataFrame, structType: sql.types.StructType): DataFrame = {
     structType.foldLeft(df) {
```
```diff
@@ -263,7 +264,13 @@ class Join(joinConf: api.Join,
       // a bootstrap source can cover a partial date range. we combine the columns using coalesce-rule
       rightResults
         .foldLeft(bootstrapDf) {
-          case (partialDf, (rightPart, rightDf)) => joinWithLeft(partialDf, rightDf, rightPart)
+          case (partialDf, ((rightPart, rightDf), i)) =>
+            val next = joinWithLeft(partialDf, rightDf, rightPart)
+            if (((i + 1) % joinsAtATime) == 0) {
+              tableUtils.addJoinBreak(next)
+            } else {
+              next
+            }
         }
         // drop all processing metadata columns
         .drop(Constants.MatchedHashes, Constants.TimePartitionColumn)
```

Review comment on the `tableUtils.addJoinBreak(next)` branch: if we have 24 parts there will be 3 cache points - at 8, 16, 24. 16 should evict the 8 cache, and 24 shouldn't cache since it is the last one (see the sketch after this hunk).
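A minimal sketch of the break/evict behavior the reviewer describes, using a generic `joinOne` function in place of `joinWithLeft` and the hypothetical names `joinWithBreaks` and `previousBreak`; this is not the code in the PR, only one way the suggestion could look:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

// Sketch only: cache every joinsAtATime-th intermediate result, evict the
// previous cache point once the new one is materialized, and skip caching
// on the final part (e.g. with 24 parts: cache at 8 and 16, nothing at 24).
def joinWithBreaks[P](bootstrapDf: DataFrame,
                      parts: Seq[(P, DataFrame)],
                      joinsAtATime: Int)(joinOne: (DataFrame, P, DataFrame) => DataFrame): DataFrame = {
  var previousBreak: Option[DataFrame] = None
  parts.zipWithIndex.foldLeft(bootstrapDf) {
    case (partialDf, ((part, rightDf), i)) =>
      val next = joinOne(partialDf, part, rightDf)
      val isBreakPoint = (i + 1) % joinsAtATime == 0
      val isLast = i == parts.size - 1
      if (isBreakPoint && !isLast) {
        val cached = next.persist(StorageLevel.MEMORY_AND_DISK)
        cached.count() // materialize before dropping the previous break point
        previousBreak.foreach(_.unpersist())
        previousBreak = Some(cached)
        cached
      } else {
        next // last part, or not a break point: no caching
      }
  }
}
```

Forcing materialization with an action before unpersisting the previous break point keeps the earlier cache from being recomputed through its lineage; whether that extra action is worth the cost is a separate trade-off.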
```diff
@@ -324,6 +324,9 @@ case class TableUtils(sparkSession: SparkSession) {
     df
   }

+  def addJoinBreak(dataFrame: DataFrame): DataFrame =
+    dataFrame.cache()
+
   def insertUnPartitioned(df: DataFrame,
                           tableName: String,
                           tableProperties: Map[String, String] = null,
```

Review comment on `addJoinBreak`: TableUtils has a cache_level param and a wrap-with-cache method that does exception handling to release the resources claimed by the cache. I think we should use that here (see the sketch after this hunk).
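The cache_level param and wrap-with-cache method the reviewer refers to are not shown in this diff, so the names below (`cacheLevel`, `withCachedDf`) are assumptions; this is only a sketch of the shape such a change might take:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

// Hypothetical names; not the actual TableUtils API.
val cacheLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_AND_DISK)

// Honor the configured cache level instead of an unconditional .cache()
def addJoinBreak(dataFrame: DataFrame): DataFrame =
  cacheLevel.map(level => dataFrame.persist(level)).getOrElse(dataFrame)

// Scope a cached break point so it is released even if downstream work throws
def withCachedDf[T](df: DataFrame)(work: DataFrame => T): T = {
  val cached = addJoinBreak(df)
  try work(cached)
  finally if (cacheLevel.isDefined) cached.unpersist()
}
```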
Review comment: can we make this consume a Spark conf param, via tableUtils? (A sketch of one way to do this follows.)
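One way to pick up the value from Spark conf through `TableUtils`, sketched with a hypothetical config key and method name; neither appears in this diff:

```scala
import org.apache.spark.sql.SparkSession

case class TableUtils(sparkSession: SparkSession) {
  // Hypothetical config key; defaults to the PR's hard-coded value of 8.
  def joinsAtATime: Int =
    sparkSession.conf.get("spark.join.joins_at_a_time", "8").toInt
}

// In Join, the constant would then become:
//   private val joinsAtATime = tableUtils.joinsAtATime
```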