While the Kryo serializer can handle basic Scala Maps:
import org.apache.spark.{SparkContext, SparkConf}
val aMap = Map[String, Long]("foo" -> 1).withDefaultValue(0L)
aMap("bar")
// Long = 0
val conf = new SparkConf()
  .setAppName("bar")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(aMap))
rdd.map(_("foo")).first
// Long = 1
unfortunately it doesn't serialize the map's default value:
rdd.map(_("bar")).first
// 16/09/30 15:20:39 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
// java.util.NoSuchElementException: key not found: bar
// at scala.collection.MapLike$class.default(MapLike.scala:228)
// ...
As pointed out by Jakob Odersky, this happens due to optimizations introduced in the Chill library.
In general, to ensure correct behavior it is better to use safe methods (get, getOrElse) instead of depending on defaults (withDefault, withDefaultValue).
In general, PySpark provides a two-tier serialization mechanism defined by the actual serialization engine, such as PickleSerializer, and a batching mechanism. An additional specialized serializer is used to serialize closures. It is important to note that Python class definitions are not serialized: pickle stores only a reference to the class (its module and name). This means that modules containing class definitions have to be accessible on every machine in the cluster.
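A minimal sketch of what this means in practice, using only the standard library. Here fractions.Fraction is just a convenient stand-in for a class defined in some importable module (an assumption of this example, not something PySpark uses). Pickling the class produces a short reference to its module and name rather than its code, so the receiving side must be able to import that module.

```python
import pickle
from fractions import Fraction

# Pickling the class itself stores only the module name and class name,
# not the source code of the class.
class_payload = pickle.dumps(Fraction)
stores_module_name = b"fractions" in class_payload
payload_size = len(class_payload)

# Unpickling resolves the reference by importing the module again,
# which is why the module has to be importable on the receiving side.
restored_class = pickle.loads(class_payload)
same_class = restored_class is Fraction

# Instances are stored as a class reference plus the instance state.
restored_value = pickle.loads(pickle.dumps(Fraction(3, 4)))
print(payload_size, same_class, restored_value)
```

If the module were missing on a worker, the equivalent of the pickle.loads call would fail there with an import-related error.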
PickleSerializer is the default serialization engine used to serialize data in PySpark. It wraps Python's pickle module.
There is a subtle difference in pickle imports between Python 2 and Python 3. While the former imports cPickle directly, the latter depends on the built-in fallback mechanism. This means that in Python 3 it is possible, although highly unlikely, that PySpark will use the pure Python implementation. See also: What difference between pickle and _pickle in python 3?
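One hedged way to check which implementation a given Python 3 interpreter ended up with is sketched below. It relies on pickle._Pickler, a private name for the pure Python class in CPython's pickle module, so treat it as a diagnostic under that assumption rather than a supported API.

```python
import pickle

# _pickle is the optional C accelerator module that pickle tries to import.
try:
    import _pickle  # noqa: F401
    has_accelerator = True
except ImportError:
    has_accelerator = False

# When the accelerator is available, pickle.Pickler is the C class;
# otherwise it falls back to the pure Python pickle._Pickler.
using_c_pickler = pickle.Pickler is not pickle._Pickler
print(has_accelerator, using_c_pickler)
```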
MarshalSerializer is a wrapper around the marshal module, which implements Python's internal serialization mechanism. While in some cases it can be slightly faster than the default pickle, it is significantly more limited, and the performance gain is probably too small to make it useful in practice.
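The limitation can be illustrated with the standard library alone. In this sketch, the sample rows and the use of fractions.Fraction are arbitrary choices for the example: marshal round-trips plain built-in values, but rejects an object of a non-builtin type that pickle handles without trouble.

```python
import marshal
import pickle
from fractions import Fraction

# Built-in types (ints, strings, floats, None, tuples, lists) are supported.
rows = [(1, "a", 2.5), (2, "b", None)]
marshal_roundtrip = marshal.loads(marshal.dumps(rows))

# Instances of ordinary classes are not supported by marshal.
value = Fraction(1, 2)
try:
    marshal.dumps(value)
    marshal_handles_fraction = True
except ValueError:
    marshal_handles_fraction = False

# pickle serializes the same object without any extra work.
pickle_roundtrip = pickle.loads(pickle.dumps(value))
print(marshal_roundtrip == rows, marshal_handles_fraction, pickle_roundtrip)
```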
CloudPickle is an extended version of the original cloudpickle project. It is used in PySpark to serialize closures. Among other interesting properties, it can serialize functions defined in the __main__ module. See for example Passing Python functions as objects to Spark.
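To see why a dedicated closure serializer is needed at all, the sketch below shows the standard pickle module failing on a lambda. It deliberately avoids importing cloudpickle so that it runs without extra packages; the name add_one is only illustrative.

```python
import pickle

# A typical function one would pass to rdd.map(...).
add_one = lambda x: x + 1

# Plain pickle stores functions by reference (module plus name).
# A lambda has no importable name, so pickling it fails.
try:
    pickle.dumps(add_one)
    plain_pickle_works = True
except (pickle.PicklingError, AttributeError) as exc:
    plain_pickle_works = False
    failure = type(exc).__name__

print(plain_pickle_works)
```

A closure serializer works around this by serializing the function's code and captured variables by value instead of by reference.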
AutoBatchedSerializer is used to serialize a stream of objects in batches whose size is dynamically adjusted at runtime to keep the size in bytes of an individual batch close to the bestSize parameter, which defaults to 65536 bytes. The general algorithm can be described as follows:
- Set batchSize to 1
- While input stream is not empty
  - read batchSize elements and serialize,
  - compute size in bytes of the serialized batch,
  - if size is less than bestSize set batchSize to batchSize * 2
  - else if size is greater than bestSize set batchSize to batchSize / 2
  - else keep current value
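The steps above can be sketched in plain Python as follows. This is an illustration of the described algorithm, not PySpark's code: the function name auto_batched_dumps is hypothetical, pickle stands in for the wrapped serializer, and the guard that keeps the batch size at 1 or more is an assumption added for the sketch. The real implementation may use different thresholds.

```python
import itertools
import pickle


def auto_batched_dumps(iterable, best_size=65536):
    """Yield (number_of_items, payload) pairs with an adaptive batch size."""
    batch_size = 1
    iterator = iter(iterable)
    while True:
        # Read up to batch_size elements from the stream.
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            break
        payload = pickle.dumps(batch)
        yield len(batch), payload

        # Grow the batch while payloads are small, shrink when too large.
        size = len(payload)
        if size < best_size:
            batch_size *= 2
        elif size > best_size and batch_size > 1:
            batch_size //= 2


# A small best_size makes the adaptation visible on a short stream.
batches = list(auto_batched_dumps(range(1000), best_size=256))
recovered = [x for _, payload in batches for x in pickle.loads(payload)]
print([count for count, _ in batches][:8])
```

Each payload is an independent pickle of a list, so the receiving side can deserialize batches one at a time without knowing the batch size in advance.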
BatchedSerializer is used to serialize a stream of objects using either fixed size batches or unlimited batches.
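A minimal sketch of the two modes, again with pickle standing in for the wrapped serializer. The function name batched_dumps and the UNLIMITED constant are hypothetical names for this example; using -1 to mean "one unlimited batch" is an assumption chosen to mirror the batchSize convention of SparkContext.

```python
import itertools
import pickle

UNLIMITED = -1  # assumed marker for "put the whole stream in one batch"


def batched_dumps(iterable, batch_size):
    """Yield pickled batches of a fixed size, or a single unlimited batch."""
    iterator = iter(iterable)
    if batch_size == UNLIMITED:
        yield pickle.dumps(list(iterator))
        return
    if batch_size < 1:
        raise ValueError("batch_size must be positive or UNLIMITED")
    while True:
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            return
        yield pickle.dumps(batch)


fixed = [pickle.loads(p) for p in batched_dumps(range(7), 3)]
unlimited = [pickle.loads(p) for p in batched_dumps(range(7), UNLIMITED)]
print(fixed)
print(unlimited)
```

With a fixed size of 3 the last batch is simply shorter than the others, while the unlimited mode materializes the whole stream in memory before serializing it.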
By default, PySpark uses AutoBatchedSerializer with PickleSerializer. The global serialization mechanism can be configured during SparkContext initialization through two properties: batchSize and serializer.
- batchSize argument is used to configure the batching strategy and takes an integer with the following interpretation:
  - 0 - AutoBatchedSerializer.
  - -1 - BatchedSerializer with unlimited batch size.
  - positive integer - BatchedSerializer with fixed batch size.
- serializer accepts an instance of a serializer from pyspark.serializers, for example:
  from pyspark import SparkContext
  from pyspark.serializers import MarshalSerializer
  sc = SparkContext(serializer=MarshalSerializer())
In Spark 2.0.0+, you will need to create the SparkContext before the SparkSession if you use the session builder; otherwise, pass it explicitly to the SparkSession constructor.
It is also possible, although for obvious reasons not recommended, to use the internal API to modify the serialization mechanism for a specific RDD:
from pyspark import SparkContext
from pyspark.serializers import BatchedSerializer, MarshalSerializer
sc = SparkContext()
rdd = sc.parallelize(range(10))
# _reserialize is internal; it returns an RDD that uses the given serializer
reserialized = rdd._reserialize(BatchedSerializer(MarshalSerializer(), 1))