From 6ef4e7898ed6ee60b707d0dde24069b81d9724c3 Mon Sep 17 00:00:00 2001
From: Rakesh Kashyap Hanasoge Padmanabha
Date: Wed, 29 Mar 2023 10:25:20 -0700
Subject: [PATCH 1/4] Add a retryable mechanism for SWA features when files get deleted

---
 .../offline/job/LocalFeatureJoinJob.scala     |  3 +
 .../offline/join/DataFrameFeatureJoiner.scala | 21 ++++-
 .../swa/SlidingWindowAggregationJoiner.scala  | 33 ++++++--
 .../offline/SlidingWindowAggIntegTest.scala   | 77 +++++++++++++++++++
 4 files changed, 126 insertions(+), 8 deletions(-)

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala
index d54fc43d3..815acdd55 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala
@@ -17,6 +17,9 @@ import org.apache.spark.sql.SparkSession
  */
 object LocalFeatureJoinJob {

+  // This is a config for local tests only, used to induce a FileNotFoundException.
+  var shouldRetryAddingSWAFeatures = false
+
   // for user convenience, create spark session within this function, so user does not need to create one
   // this also ensure it has same setting as the real feathr join job
   val ss: SparkSession = createSparkSession(enableHiveSupport = true)

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala
index b37d9d62f..df58e2a40 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala
@@ -327,7 +327,7 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d
       offline.FeatureDataFrame(obsToJoinWithFeatures, Map())
     } else {
       val swaJoiner = new SlidingWindowAggregationJoiner(featureGroups.allWindowAggFeatures, anchorToDataSourceMapper)
-      swaJoiner.joinWindowAggFeaturesAsDF(
+      val (featureDataFrame, retryableFeatureNames) = swaJoiner.joinWindowAggFeaturesAsDF(
         ss,
         obsToJoinWithFeatures,
         joinConfig,
@@ -338,6 +338,25 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d
         swaObsTime,
         failOnMissingPartition,
         swaHandler)
+
+      // We will retry the SWA features which could not be added because of
+      val retryableErasedEntityTaggedFeatures = requiredWindowAggFeatures.filter(x => retryableFeatureNames.contains(x.getFeatureName))
+      if (retryableFeatureNames.nonEmpty) {
+        swaJoiner.joinWindowAggFeaturesAsDF(
+          ss,
+          featureDataFrame.df,
+          joinConfig,
+          keyTagIntsToStrings,
+          windowAggFeatureStages,
+          retryableErasedEntityTaggedFeatures,
+          bloomFilters,
+          swaObsTime,
+          failOnMissingPartition,
+          swaHandler,
+          false)._1
+      } else {
+        featureDataFrame
+      }
     }
   }
 }

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
index 6f3458df7..63b0bf0c6 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
@@ -9,7 +9,7 @@ import com.linkedin.feathr.offline.anchored.keyExtractor.{MVELSourceKeyExtractor
 import com.linkedin.feathr.offline.client.DataFrameColName
 import com.linkedin.feathr.offline.config.FeatureJoinConfig
 import com.linkedin.feathr.offline.exception.FeathrIllegalStateException
-import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager
+import com.linkedin.feathr.offline.job.{LocalFeatureJoinJob, PreprocessedDataFrameManager}
 import com.linkedin.feathr.offline.join.DataFrameKeyCombiner
 import com.linkedin.feathr.offline.source.DataSource
 import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor
@@ -22,11 +22,14 @@ import com.linkedin.feathr.offline.{FeatureDataFrame, JoinStage}
 import com.linkedin.feathr.swj.{FactData, LabelData, SlidingWindowJoin}
 import com.linkedin.feathr.{common, offline}
 import org.apache.logging.log4j.LogManager
+import org.apache.spark.SparkException
 import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.util.sketch.BloomFilter
+import java.io.FileNotFoundException
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer

 /**
  * Case class containing other SWA handler methods
@@ -65,6 +68,9 @@
    * @param obsDF Observation data
    * @param swaObsTimeOpt start and end time of observation data
    * @param failOnMissingPartition whether to fail the data loading if some of the date partitions are missing.
+   * @param swaHandler External SWA handler, if any, that should handle the SWA join
+   * @param isRetry Whether to retry adding features which were missed because of IOExceptions.
+   *                Default is set to true.
    * @return pair of :
    *         1) dataframe with feature column appended to the obsData,
    *         it can be converted to a pair RDD of (observation data record, feature record),
@@ -81,7 +87,9 @@
       bloomFilters: Option[Map[Seq[Int], BloomFilter]],
       swaObsTimeOpt: Option[DateTimeInterval],
       failOnMissingPartition: Boolean,
-      swaHandler: Option[SWAHandler]): FeatureDataFrame = {
+      swaHandler: Option[SWAHandler],
+      isRetry: Boolean = true): (FeatureDataFrame, Seq[String]) = {
+    val retryableSwaFeatures = ArrayBuffer.empty[String]
     val joinConfigSettings = joinConfig.settings // extract time window settings
     if (joinConfigSettings.isEmpty) {
@@ -256,8 +264,19 @@
       }
       val origContextObsColumns = labelDataDef.dataSource.columns

-      contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList)
+      try {
+        // THIS IS FOR LOCAL TEST ONLY. It is used to induce a Spark exception with a FileNotFoundException as the root cause.
+        if (isRetry && LocalFeatureJoinJob.shouldRetryAddingSWAFeatures) throw new SparkException("file not found", new FileNotFoundException())
+        contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList)
+      } catch {
+        // Sometimes the files which are to be loaded get deleted midway. We will retry all the features at this stage by reloading the datasets.
+        case exception: SparkException => if (isRetry && exception.getCause != null && exception.getCause.isInstanceOf[FileNotFoundException]) {
+          val unjoinedFeatures = factDataDefs.flatMap(factData => factData.aggFeatures.map(_.name))
+          retryableSwaFeatures ++= unjoinedFeatures
+        }
+      }
+
+      val finalJoinedFeatures = joinedFeatures diff retryableSwaFeatures
       contextDF = if (shouldFilterNulls && !factDataRowsWithNulls.isEmpty) {
         val nullDfWithFeatureCols = joinedFeatures.foldLeft(factDataRowsWithNulls)((s, x) => s.withColumn(x, lit(null)))
         contextDF.union(nullDfWithFeatureCols)
       } else contextDF
@@ -272,13 +291,13 @@
         .asInstanceOf[TimeWindowConfigurableAnchorExtractor].features(nameToFeatureAnchor._1).columnFormat)

       val FeatureDataFrame(withFDSFeatureDF, inferredTypes) =
-        SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, joinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig)
+        SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, finalJoinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig)
       // apply default on FDS dataset
       val withFeatureContextDF =
-        substituteDefaults(withFDSFeatureDF, defaults.keys.filter(joinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss)
+        substituteDefaults(withFDSFeatureDF, defaults.keys.filter(finalJoinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss)
       allInferredFeatureTypes ++= inferredTypes
-      contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, joinedFeatures, keyTags.map(keyTagList))
+      contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, finalJoinedFeatures, keyTags.map(keyTagList))
       if (shouldCheckPoint(ss)) {
         // checkpoint complicated dataframe for each stage to avoid Spark failure
         contextDF = contextDF.checkpoint(true)
@@ -292,7 +311,7 @@
         }
       }
     }})
-    offline.FeatureDataFrame(contextDF, allInferredFeatureTypes.toMap)
+    (offline.FeatureDataFrame(contextDF, allInferredFeatureTypes.toMap), retryableSwaFeatures)
   }

   /**

diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala
index 3370a6646..30da906da 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala
@@ -1,6 +1,7 @@
 package com.linkedin.feathr.offline

 import com.linkedin.feathr.offline.AssertFeatureUtils.{rowApproxEquals, validateRows}
+import com.linkedin.feathr.offline.job.LocalFeatureJoinJob
 import com.linkedin.feathr.offline.util.FeathrUtils
 import com.linkedin.feathr.offline.util.FeathrUtils.{FILTER_NULLS, SKIP_MISSING_FEATURE, setFeathrJobParam}
 import org.apache.spark.sql.Row
@@ -328,6 +329,80 @@
     setFeathrJobParam(FILTER_NULLS, "false")
   }

+  /**
+   * test SWA with dense vector feature with retry.
+   * This should get handled by the SWA retry method.
+   */
+  @Test
+  def testLocalAnchorSWAWithDenseVectorWithRetry(): Unit = {
+    LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = true
+    val res = runLocalFeatureJoinForTest(
+      """
+        | settings: {
+        |  joinTimeSettings: {
+        |   timestampColumn: {
+        |     def: "timestamp"
+        |     format: "yyyy-MM-dd"
+        |   }
+        |   simulateTimeDelay: 1d
+        |  }
+        |}
+        |
+        |features: [
+        |  {
+        |    key: [mId],
+        |    featureList: ["aEmbedding", "memberEmbeddingAutoTZ"]
+        |  }
+        |]
+      """.stripMargin,
+      """
+        |sources: {
+        |  swaSource: {
+        |    location: { path: "generation/daily" }
+        |    timePartitionPattern: "yyyy/MM/dd"
+        |    timeWindowParameters: {
+        |      timestampColumn: "timestamp"
+        |      timestampColumnFormat: "yyyy-MM-dd"
+        |    }
+        |  }
+        |}
+        |
+        |anchors: {
+        |  swaAnchor: {
+        |    source: "swaSource"
+        |    key: "x"
+        |    features: {
+        |      aEmbedding: {
+        |        def: "embedding"
+        |        aggregation: LATEST
+        |        window: 3d
+        |      }
+        |      memberEmbeddingAutoTZ: {
+        |        def: "embedding"
+        |        aggregation: LATEST
+        |        window: 3d
+        |        type: {
+        |          type: TENSOR
+        |          tensorCategory: SPARSE
+        |          dimensionType: [INT]
+        |          valType: FLOAT
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+      observationDataPath = "slidingWindowAgg/csvTypeTimeFile1.csv").data
+
+    val featureList = res.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("mId") else "null")
+
+    assertEquals(featureList.size, 2)
+    assertEquals(featureList(0).getAs[Row]("aEmbedding"), mutable.WrappedArray.make(Array(5.5f, 5.8f)))
+    assertEquals(featureList(0).getAs[Row]("memberEmbeddingAutoTZ"),
+      TestUtils.build1dSparseTensorFDSRow(Array(0, 1), Array(5.5f, 5.8f)))
+    LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = false
+  }
+
   /**
    * test SWA with dense vector feature
    * The feature dataset generation/daily has different but compatible schema for different partitions,
    */
   @Test
   def testLocalAnchorSWAWithDenseVector(): Unit = {
+    LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = true
     val res = runLocalFeatureJoinForTest(
       """
         | settings: {
@@ -399,6 +475,7 @@
     assertEquals(featureList(0).getAs[Row]("aEmbedding"), mutable.WrappedArray.make(Array(5.5f, 5.8f)))
     assertEquals(featureList(0).getAs[Row]("memberEmbeddingAutoTZ"),
       TestUtils.build1dSparseTensorFDSRow(Array(0, 1), Array(5.5f, 5.8f)))
+    LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = false
   }

   /**

From 717b9ce0ce5b9198d22183cf66db8fd4afaeaec4 Mon Sep 17 00:00:00 2001
From: Rakesh Kashyap Hanasoge Padmanabha
Date: Wed, 29 Mar 2023 11:40:02 -0700
Subject: [PATCH 2/4] Fix a few failing tests

---
 .../feathr/offline/join/DataFrameFeatureJoiner.scala   | 5 ++++-
 .../offline/swa/SlidingWindowAggregationJoiner.scala   | 9 +++++----
 .../com/linkedin/feathr/offline/util/FeathrUtils.scala | 7 ++++++-
 .../feathr/offline/SlidingWindowAggIntegTest.scala     | 9 +++------
 4 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala
index df58e2a40..b46d04071 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala
@@ -341,13 +341,16 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d
       // We will retry the SWA features which could not be added because of
       val retryableErasedEntityTaggedFeatures = requiredWindowAggFeatures.filter(x => retryableFeatureNames.contains(x.getFeatureName))
+
+      // Keep only the features which are to be retried.
+      val updatedWindowAggFeatureStages = windowAggFeatureStages.map(x => (x._1, x._2.intersect(retryableFeatureNames)))
       if (retryableFeatureNames.nonEmpty) {
         swaJoiner.joinWindowAggFeaturesAsDF(
           ss,
           featureDataFrame.df,
           joinConfig,
           keyTagIntsToStrings,
-          windowAggFeatureStages,
+          updatedWindowAggFeatureStages,
           retryableErasedEntityTaggedFeatures,
           bloomFilters,
           swaObsTime,

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
index 63b0bf0c6..9b529b186 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
@@ -263,14 +263,15 @@ private[offline] class SlidingWindowAggregationJoiner(
         SlidingWindowFeatureUtils.getFactDataDef(filteredFactData, anchorWithSourceToDFMap.keySet.toSeq, featuresToDelayImmutableMap, selectedFeatures)
       }
       val origContextObsColumns = labelDataDef.dataSource.columns
-
+      val shouldRetryForMissingData = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.RETRY_ADDING_MISSING_SWA_FEATURES).toBoolean
       try {
         // THIS IS FOR LOCAL TEST ONLY. It is used to induce a Spark exception with a FileNotFoundException as the root cause.
-        if (isRetry && LocalFeatureJoinJob.shouldRetryAddingSWAFeatures) throw new SparkException("file not found", new FileNotFoundException())
+        if (isRetry && FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES).toBoolean)
+          throw new SparkException("file not found", new FileNotFoundException())
         contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList)
       } catch {
         // Sometimes the files which are to be loaded get deleted midway. We will retry all the features at this stage by reloading the datasets.
-        case exception: SparkException => if (isRetry && exception.getCause != null && exception.getCause.isInstanceOf[FileNotFoundException]) {
+        case exception: SparkException => if (isRetry && shouldRetryForMissingData && exception.getCause != null && exception.getCause.isInstanceOf[FileNotFoundException]) {
           val unjoinedFeatures = factDataDefs.flatMap(factData => factData.aggFeatures.map(_.name))
           retryableSwaFeatures ++= unjoinedFeatures
         }
@@ -278,7 +279,7 @@

       val finalJoinedFeatures = joinedFeatures diff retryableSwaFeatures
       contextDF = if (shouldFilterNulls && !factDataRowsWithNulls.isEmpty) {
-        val nullDfWithFeatureCols = joinedFeatures.foldLeft(factDataRowsWithNulls)((s, x) => s.withColumn(x, lit(null)))
+        val nullDfWithFeatureCols = finalJoinedFeatures.foldLeft(factDataRowsWithNulls)((s, x) => s.withColumn(x, lit(null)))
         contextDF.union(nullDfWithFeatureCols)
       } else contextDF

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala
index e18d154b3..98ae98eda 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala
@@ -51,6 +51,9 @@ private[feathr] object FeathrUtils {
   val ENABLE_SANITY_CHECK_MODE = "enable.sanity.check.mode"
   val SANITY_CHECK_MODE_ROW_COUNT = "sanity.check.row.count"
   val FILTER_NULLS = "filter.nulls"
+  val RETRY_ADDING_MISSING_SWA_FEATURES = "retry.adding.missing.swa.features"
+  // Retry flag to be configured only for local tests
+  val LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES = "local.retry.adding.missing.swa.features"
   val STRING_PARAMETER_DELIMITER = ","
   // Used to check if the current dataframe has satisfied the checkpoint frequency
@@ -86,7 +89,9 @@ private[feathr] object FeathrUtils {
     SPARK_JOIN_MIN_PARALLELISM -> (SQLConf.buildConf(getFullConfigKeyName(SPARK_JOIN_MIN_PARALLELISM )).stringConf.createOptional, "10"),
     ENABLE_SANITY_CHECK_MODE -> (SQLConf.buildConf(getFullConfigKeyName(ENABLE_SANITY_CHECK_MODE )).stringConf.createOptional, "false"),
     SANITY_CHECK_MODE_ROW_COUNT -> (SQLConf.buildConf(getFullConfigKeyName(SANITY_CHECK_MODE_ROW_COUNT )).stringConf.createOptional, "10"),
-    FILTER_NULLS -> (SQLConf.buildConf(getFullConfigKeyName(FILTER_NULLS )).stringConf.createOptional, "false")
+    FILTER_NULLS -> (SQLConf.buildConf(getFullConfigKeyName(FILTER_NULLS)).stringConf.createOptional, "false"),
+    LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES -> (SQLConf.buildConf(getFullConfigKeyName(LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES)).stringConf.createOptional, "false"),
+    RETRY_ADDING_MISSING_SWA_FEATURES -> (SQLConf.buildConf(getFullConfigKeyName(RETRY_ADDING_MISSING_SWA_FEATURES)).stringConf.createOptional, "true")
   )

   /**

diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala
index 30da906da..bf4331a23 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala
@@ -1,9 +1,8 @@
 package com.linkedin.feathr.offline

 import com.linkedin.feathr.offline.AssertFeatureUtils.{rowApproxEquals, validateRows}
-import com.linkedin.feathr.offline.job.LocalFeatureJoinJob
 import com.linkedin.feathr.offline.util.FeathrUtils
-import com.linkedin.feathr.offline.util.FeathrUtils.{FILTER_NULLS, SKIP_MISSING_FEATURE, setFeathrJobParam}
+import com.linkedin.feathr.offline.util.FeathrUtils.{FILTER_NULLS, LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES, SKIP_MISSING_FEATURE, setFeathrJobParam}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.{LongType, StructField, StructType}
@@ -335,7 +334,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
    */
   @Test
   def testLocalAnchorSWAWithDenseVectorWithRetry(): Unit = {
-    LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = true
+    setFeathrJobParam(LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES, "true")
     val res = runLocalFeatureJoinForTest(
       """
         | settings: {
@@ -400,7 +399,7 @@
     assertEquals(featureList(0).getAs[Row]("aEmbedding"), mutable.WrappedArray.make(Array(5.5f, 5.8f)))
     assertEquals(featureList(0).getAs[Row]("memberEmbeddingAutoTZ"),
       TestUtils.build1dSparseTensorFDSRow(Array(0, 1), Array(5.5f, 5.8f)))
-    LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = false
+    setFeathrJobParam(LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES, "false")
   }

   /**
@@ -410,7 +409,6 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
    */
   @Test
   def testLocalAnchorSWAWithDenseVector(): Unit = {
-    LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = true
     val res = runLocalFeatureJoinForTest(
       """
         | settings: {
@@ -475,7 +473,6 @@
     assertEquals(featureList(0).getAs[Row]("aEmbedding"), mutable.WrappedArray.make(Array(5.5f, 5.8f)))
     assertEquals(featureList(0).getAs[Row]("memberEmbeddingAutoTZ"),
       TestUtils.build1dSparseTensorFDSRow(Array(0, 1), Array(5.5f, 5.8f)))
-    LocalFeatureJoinJob.shouldRetryAddingSWAFeatures = false
   }

   /**

From 43557c0f0984a01cbd0542c59c70ec57a6e4ba54 Mon Sep 17 00:00:00 2001
From: Rakesh Kashyap Hanasoge Padmanabha
Date: Wed, 29 Mar 2023 11:42:56 -0700
Subject: [PATCH 3/4] fix comment

---
 .../linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala
index b46d04071..a2ed52069 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala
@@ -339,7 +339,7 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d
         failOnMissingPartition,
         swaHandler)

-      // We will retry the SWA features which could not be added because of
+      // We will retry the SWA features which could not be added because of changing data.
       val retryableErasedEntityTaggedFeatures = requiredWindowAggFeatures.filter(x => retryableFeatureNames.contains(x.getFeatureName))

       // Keep only the features which are to be retried.
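For reference between the patches: the retry pass introduced in patch 2 narrows each join stage to just the features that failed on the first attempt before calling joinWindowAggFeaturesAsDF again. The following is a minimal, dependency-free sketch of that narrowing under simplified assumptions; RetryStageNarrowing, Stage, and narrow are illustrative names rather than Feathr APIs, and a real join stage carries more than key tags and feature names.

object RetryStageNarrowing {
  // A stage pairs key-tag ids with the feature names joined at that stage,
  // mirroring the (Seq[Int], Seq[String]) shape of windowAggFeatureStages.
  type Stage = (Seq[Int], Seq[String])

  // Keep only the features which are to be retried, as the patch does with
  // x._2.intersect(retryableFeatureNames); a stage whose feature list becomes
  // empty simply contributes nothing on the second pass.
  def narrow(stages: Seq[Stage], retryableFeatureNames: Seq[String]): Seq[Stage] =
    stages.map { case (keyTags, features) =>
      (keyTags, features.intersect(retryableFeatureNames))
    }

  def main(args: Array[String]): Unit = {
    val stages = Seq(
      (Seq(0), Seq("aEmbedding", "memberEmbeddingAutoTZ")),
      (Seq(1), Seq("otherFeature")))
    // Only aEmbedding failed on the first pass, so only it is retried.
    println(narrow(stages, Seq("aEmbedding")))
    // => List((List(0),List(aEmbedding)), (List(1),List()))
  }
}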
From 20c159e036a6b786761147b7cab7b8ef25f36fcd Mon Sep 17 00:00:00 2001
From: Rakesh Kashyap Hanasoge Padmanabha
Date: Wed, 29 Mar 2023 12:02:21 -0700
Subject: [PATCH 4/4] fix variable naming

---
 .../offline/swa/SlidingWindowAggregationJoiner.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
index 9b529b186..9aac821cd 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala
@@ -69,7 +69,7 @@ private[offline] class SlidingWindowAggregationJoiner(
    * @param swaObsTimeOpt start and end time of observation data
    * @param failOnMissingPartition whether to fail the data loading if some of the date partitions are missing.
    * @param swaHandler External SWA handler, if any, that should handle the SWA join
-   * @param isRetry Whether to retry adding features which were missed because of IOExceptions.
+   * @param shouldRetry Whether to retry adding features which were missed because of IOExceptions.
    *                Default is set to true.
    * @return pair of :
@@ -88,7 +88,7 @@
       swaObsTimeOpt: Option[DateTimeInterval],
       failOnMissingPartition: Boolean,
       swaHandler: Option[SWAHandler],
-      isRetry: Boolean = true): (FeatureDataFrame, Seq[String]) = {
+      shouldRetry: Boolean = true): (FeatureDataFrame, Seq[String]) = {
     val retryableSwaFeatures = ArrayBuffer.empty[String]
     val joinConfigSettings = joinConfig.settings // extract time window settings
@@ -266,12 +266,12 @@
       val shouldRetryForMissingData = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.RETRY_ADDING_MISSING_SWA_FEATURES).toBoolean
       try {
         // THIS IS FOR LOCAL TEST ONLY. It is used to induce a Spark exception with a FileNotFoundException as the root cause.
-        if (isRetry && FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES).toBoolean)
+        if (shouldRetry && FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES).toBoolean)
           throw new SparkException("file not found", new FileNotFoundException())
         contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList)
       } catch {
         // Sometimes the files which are to be loaded get deleted midway. We will retry all the features at this stage by reloading the datasets.
-        case exception: SparkException => if (shouldRetry && shouldRetryForMissingData && exception.getCause != null && exception.getCause.isInstanceOf[FileNotFoundException]) {
+        case exception: SparkException => if (shouldRetry && shouldRetryForMissingData && exception.getCause != null && exception.getCause.isInstanceOf[FileNotFoundException]) {
           val unjoinedFeatures = factDataDefs.flatMap(factData => factData.aggFeatures.map(_.name))
           retryableSwaFeatures ++= unjoinedFeatures
         }
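Taken together, the series makes the SWA join resilient to source files vanishing mid-read: the first pass catches a SparkException whose direct cause is a FileNotFoundException, records the affected feature names instead of failing the job, and a single second pass reloads and joins only those features with retrying disabled. Below is a minimal, self-contained sketch of that control flow under simplified assumptions; SwaRetrySketch, runStage, and joinWithRetry are illustrative names, not Feathr APIs, and the sketch catches plain Exception so it compiles without a Spark dependency, whereas the real code matches org.apache.spark.SparkException.

import java.io.FileNotFoundException
import scala.collection.mutable.ArrayBuffer

object SwaRetrySketch {
  // Mirrors the patch's check: only a failure whose direct cause is a
  // FileNotFoundException counts as "data deleted midway" and is retryable.
  def isMissingFileFailure(e: Throwable): Boolean =
    e.getCause != null && e.getCause.isInstanceOf[FileNotFoundException]

  // Wraps one stage's join, like the try/catch added to the SWA joiner: on a
  // retryable failure the stage's features are recorded for the second pass
  // instead of failing the whole job.
  def runStage[DF](shouldRetry: Boolean,
                   stageFeatures: Seq[String],
                   retryable: ArrayBuffer[String])(join: => DF): Option[DF] =
    try Some(join)
    catch {
      case e: Exception if shouldRetry && isMissingFileFailure(e) =>
        retryable ++= stageFeatures
        None
    }

  // Plays the role of the second call in DataFrameFeatureJoiner: the first
  // pass defers missing-file failures; the single retry pass runs with
  // shouldRetry = false, so nothing is deferred a second time.
  def joinWithRetry[DF](df: DF,
                        features: Seq[String],
                        joinOnce: (DF, Seq[String], Boolean) => (DF, Seq[String])): DF = {
    val (joined, deferred) = joinOnce(df, features, true)
    if (deferred.isEmpty) joined else joinOnce(joined, deferred, false)._1
  }
}

In this sketch a failure on the retry pass is not swallowed because the pattern guard no longer matches; bounding the mechanism to exactly one retry keeps a persistently missing dataset from deferring features indefinitely.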