We have some jobs whose input source is Avro files; they run very slowly in Spark 2 but work fine in Spark 1. The behavior looks as if the Spark 2 Avro reader is almost hanging while reading the files: from the Spark UI, 100 executors read only about 2 GB of input data in an hour. The slowness occurs only with one particular set of Avro files; files with other schemas read very fast, and the problem appears only in Spark 2.
While we have this issue in production, I also found a simple test case in Spark 2 that reproduces the performance problem with data using this particular Avro schema; the same test works fine in Spark 1.
Here is my test case:
The data is an Avro file of about 1.3 GB with the following schema. We use 12 executors and 4 GB of executor memory for both Spark 2 and Spark 1:
{
  "namespace" : "com.xxx",
  "type" : "record",
  "name" : "Lists",
  "fields" : [
    {"name" : "account_id", "type" : "long"},
    {"name" : "list_id", "type" : "string"},
    {"name" : "sequence_id", "type" : ["null", "int"]},
    {"name" : "name", "type" : ["null", "string"]},
    {"name" : "state", "type" : ["null", "string"]},
    {"name" : "description", "type" : ["null", "string"]},
    {"name" : "dynamic_filtered_list", "type" : ["null", "int"]},
    {"name" : "filter_criteria", "type" : ["null", "string"]},
    {"name" : "created_at", "type" : ["null", "long"]},
    {"name" : "updated_at", "type" : ["null", "long"]},
    {"name" : "deleted_at", "type" : ["null", "long"]},
    {"name" : "favorite", "type" : ["null", "int"]},
    {"name" : "delta", "type" : ["null", "boolean"]},
    {
      "name" : "list_memberships", "type" : {
        "type" : "array", "items" : {
          "name" : "ListMembership", "type" : "record",
          "fields" : [
            {"name" : "channel_id", "type" : "string"},
            {"name" : "created_at", "type" : ["null", "long"]},
            {"name" : "created_source", "type" : ["null", "string"]},
            {"name" : "deleted_at", "type" : ["null", "long"]},
            {"name" : "sequence_id", "type" : ["null", "int"]}
          ]
        }
      }
    }
  ]
}
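As a quick sanity check, a minimal sketch (assuming the /data/lists path used below) to confirm that spark-avro exposes the nested array-of-record field as an array of structs:
import com.databricks.spark.avro._

val df = spark.read.avro("/data/lists")
// The nested field should show up as an array whose element is a struct,
// e.g. list_memberships: array<struct<channel_id, created_at, ...>>
df.printSchema()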
On Spark 2 (I tested with both spark-avro_2.11-4.0.0.jar and spark-avro_2.11-3.2.0.jar on Spark 2.2.0; both show the issue), the simple case is:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.StringType
import com.databricks.spark.avro._

// Read the Avro dataset
val rawlists = spark.read.avro("/data/lists")
// Derive a partitioning key from the last two characters of account_id cast to a string
val lists = rawlists.withColumn("partitioner", $"account_id".cast(StringType).substr(-2,2))
// Write the result as Parquet, partitioned by the derived key
lists.write.partitionBy("partitioner").mode(SaveMode.Overwrite).parquet("/pdata/lists")
It took 4.5 hours to finish on Spark 2, and the executors showed no status updates for hours (the input size grew only very slowly).
The following code runs in about 15 minutes on Spark 1.6.3 with spark-avro_2.10-2.0.1.jar:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.StringType
import com.databricks.spark.avro._
val rawlists = sqlContext.read.avro("/data/lists")
val lists = rawlists.withColumn("partitioner", $"account_id".cast(StringType).substr(-2,2))
lists.write.partitionBy("partitioner").mode(SaveMode.Overwrite).parquet("/pdata/lists")
I am not sure where the problem is, but in Spark 2 performance is fine for other Avro files (different datasets/schemas). I think the problem is in how the spark-avro library reads this particular kind of Avro file, since it is very slow during the scan/read stage. Any ideas?
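To narrow this down, a minimal diagnostic sketch (assuming the same /data/lists path as above) that separates the Avro scan cost from the partitioned Parquet write by timing two read-only actions:
import com.databricks.spark.avro._
import org.apache.spark.sql.functions._

val df = spark.read.avro("/data/lists")
// Scan that touches only top-level scalar columns
spark.time(df.select("account_id", "list_id").count())
// Scan that forces deserialization of the nested array of structs
spark.time(df.select(explode($"list_memberships")).count())
If the second count is disproportionately slow on Spark 2, the regression is in reading the array-of-record column rather than in the partitionBy write.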
Thanks
Our case is a little different. We have two datasets, both in Avro and both with an array of nested structs in the schema.
Querying the datasets themselves looks fine, e.g. df.count. But in the partitionBy example above, one dataset is very slow while the other is fine. And in a more complex query plan in Spark 2, where we need to explode the array of structs in both datasets (see the sketch below), the stage that scans one dataset runs fine while the stage that scans the other hangs almost forever.
Can some shape of array of structs cause this issue? The performance difference between these two datasets reaches 100x. Can issue #267 cause such a huge difference?
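For concreteness, a minimal sketch of the kind of explode query described above, using the Lists schema and path from the original report (the selected columns are illustrative):
import com.databricks.spark.avro._
import org.apache.spark.sql.functions._

val lists = spark.read.avro("/data/lists")
// Flatten the nested array: one output row per ListMembership element
val memberships = lists
  .select($"account_id", $"list_id", explode($"list_memberships").as("membership"))
  .select($"account_id", $"list_id", $"membership.channel_id", $"membership.created_at")
memberships.count()
In our case it is the stage that scans the second dataset for a query like this that appears to hang.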