This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Spark 2.2 + spark-avro_2.11-4.0.0.jar is very slow for certain jobs compared to Spark 1.6.3 + spark-avro_2.10-2.0.1.jar #280

Open
java8964 opened this issue May 9, 2018 · 2 comments

Comments

@java8964

java8964 commented May 9, 2018

We have some jobs whose input source is Avro files; they run very slowly in Spark 2 but work fine in Spark 1. The slow behavior looks as if the Spark 2 Avro reader almost hangs when reading the files: from the Spark UI, 100 executors read only about 2 GB of input data in 1 hour. This slowness happens only with one particular set of Avro files; files with other schemas read very fast. And it happens only in Spark 2.

While we have this issue in production, I also happened to find a simple performance issue with this particular Avro schema in a simple test case: it is slow in Spark 2 but works fine in Spark 1.

Here is my test case:
The data is an Avro file of about 1.3 GB with the following schema, and we use 12 executors with 4 GB of executor memory for both Spark 2 and Spark 1:
{
  "namespace" : "com.xxx",
  "type" : "record",
  "name" : "Lists",
  "fields" : [
    {"name" : "account_id", "type" : "long"},
    {"name" : "list_id", "type" : "string"},
    {"name" : "sequence_id", "type" : ["null", "int"]},
    {"name" : "name", "type" : ["null", "string"]},
    {"name" : "state", "type" : ["null", "string"]},
    {"name" : "description", "type" : ["null", "string"]},
    {"name" : "dynamic_filtered_list", "type" : ["null", "int"]},
    {"name" : "filter_criteria", "type" : ["null", "string"]},
    {"name" : "created_at", "type" : ["null", "long"]},
    {"name" : "updated_at", "type" : ["null", "long"]},
    {"name" : "deleted_at", "type" : ["null", "long"]},
    {"name" : "favorite", "type" : ["null", "int"]},
    {"name" : "delta", "type" : ["null", "boolean"]},
    {
      "name" : "list_memberships", "type" : {
        "type" : "array", "items" : {
          "name" : "ListMembership", "type" : "record",
          "fields" : [
            {"name" : "channel_id", "type" : "string"},
            {"name" : "created_at", "type" : ["null", "long"]},
            {"name" : "created_source", "type" : ["null", "string"]},
            {"name" : "deleted_at", "type" : ["null", "long"]},
            {"name" : "sequence_id", "type" : ["null", "int"]}
          ]
        }
      }
    }
  ]
}
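When read through spark-avro, the nested record comes back as an array of structs. A quick way to confirm the shape (a minimal sketch, assuming the same path and spark-shell session as in the Spark 2 example below):

import com.databricks.spark.avro._
// Print the inferred Spark SQL schema; list_memberships should appear
// as an array of structs (channel_id, created_at, created_source,
// deleted_at, sequence_id).
spark.read.avro("/data/lists").printSchema()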
On Spark 2 (I tested with spark-avro_2.11-4.0.0.jar and spark-avro_2.11-3.2.0.jar on Spark 2.2.0; both have the issue), the simple case is:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.StringType
import com.databricks.spark.avro._
val rawlists = spark.read.avro("/data/lists")
val lists = rawlists.withColumn("partitioner", $"account_id".cast(StringType).substr(-2,2))
lists.write.partitionBy("partitioner").mode(SaveMode.Overwrite).parquet("/pdata/lists")

It took 4.5 hours to finish on Spark 2, and the executors showed no status update for hours (only the input size grew, very slowly).

The following code runs in about 15 minutes on Spark 1.6.3 + spark-avro_2.10-2.0.1.jar:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.StringType
import com.databricks.spark.avro._
val rawlists = sqlContext.read.avro("/data/lists")
val lists = rawlists.withColumn("partitioner", $"account_id".cast(StringType).substr(-2,2))
lists.write.partitionBy("partitioner").mode(SaveMode.Overwrite).parquet("/pdata/lists")

I am not sure where the problem is, but in Spark 2, performance is fine for other Avro files (different datasets/schemas). I think the problem is in how the spark-avro library reads this particular kind of Avro file, as it is very slow during the scan/read stage. Any idea?
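One way to check that it is really the scan, and not the Parquet write, that is slow is to time a read-only action; a minimal sketch, assuming the same spark-shell session and path as above:

import com.databricks.spark.avro._
// Force a full scan of the Avro data without writing anything out.
// spark.time prints the wall-clock time of the enclosed action.
val rawlists = spark.read.avro("/data/lists")
spark.time(rawlists.rdd.count())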

Thanks

@arthurdk

Yes, it was explained in the issue I opened for the same reason:

#267

@java8964
Author

@arthurdk Thanks for the information.

Our case is a little different. We have 2 datasets, both in Avro, and both with an array of nested structs in the schema.

But querying the datasets themselves looks fine for us, e.g. df.count. In the partitionBy example above, though, it is very slow for one dataset but fine with the other. And in a more complex query plan in Spark 2, where we need to explode the array of structs of both datasets, the stage scanning one dataset runs fine, but the stage scanning the other one hangs almost forever.
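The explode step looks roughly like this (a sketch only; the real query is more complex, the columns are taken from the schema in the first comment, and rawlists is reused from the Spark 2 example):

import org.apache.spark.sql.functions.explode
// One output row per element of the nested list_memberships array.
val memberships = rawlists
  .withColumn("m", explode($"list_memberships"))
  .select($"account_id", $"list_id", $"m.channel_id", $"m.created_at")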

Could some kinds of arrays of structs cause this issue? The performance difference between these 2 datasets reaches 100x. Can the issue described in #267 cause such a huge difference?
