Broadcasting a variable is useful for read-only data that an application uses repeatedly, such as large lookup tables. Spark automatically sends all variables referenced in your closures to the worker nodes. While this is convenient, it can also be inefficient because (1) the default task launching mechanism is optimized for small task sizes, and (2) you might, in fact, use the same variable in multiple parallel operations, but Spark will send it separately for each operation [1, page 104]. A broadcast variable is an object of type spark.broadcast.Broadcast[T] that wraps a Serializable value of type T. The value can be accessed by calling the value property. A broadcast variable is distributed efficiently, only once per node, and should be treated as read-only (updates will not be propagated to other nodes) [1, pages 104-106].
Example by [2]: if you have a huge array that is accessed from Spark closures, for example some reference data, this array will be shipped to each Spark node with the closure. If you have a cluster with 10 nodes and 100 partitions (10 partitions per node), the array will be shipped at least 100 times (10 times to each node).
val array: Array[Int] = ??? // some huge array
val broadcasted = sc.broadcast(array)
And some RDD
val rdd: RDD[Int] = ???
In this case the array will be shipped with the closure each time:
rdd.map(i => array.contains(i))
With a broadcast you get a huge performance benefit:
rdd.map(i => broadcasted.value.contains(i))
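The size difference per task is easy to make concrete outside Spark. The sketch below is plain Python and uses pickle as a stand-in for Spark's closure serializer; the array size and the broadcast_id handle are illustrative assumptions, not Spark internals. It compares the payload a task would carry when the data itself is captured versus when only a small handle to broadcast data is captured:

```python
import pickle

# A stand-in for the "huge array" of reference data from the example above.
lookup = list(range(200_000))

# Payload when the data itself is captured by the task closure: the closure
# is serialized for every task, so each task carries the full array.
per_task_with_data = len(pickle.dumps(lookup))

# Payload when the task only captures a small handle (here a made-up
# broadcast_id): a few bytes. The array itself would travel separately,
# once per node.
per_task_with_handle = len(pickle.dumps({"broadcast_id": 0}))

print(per_task_with_data, per_task_with_handle)
```

Multiplied by 100 tasks in the cluster above, the first variant ships the array 100 times, while the broadcast variant ships it once per node.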
In the example below, df is a pyspark.sql.DataFrame and the broadcast variable is used inside a UDF.
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Row
from sklearn.neighbors import NearestNeighbors
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

training = [[0, 0, 2], [1, 0, 0], [0, 0, 1]]
validation = [[0, 2, 2], [1, 0, 5], [1, 0, 1]]

neigh = NearestNeighbors(n_neighbors=2, radius=0.4)
neigh.fit(training)
bc_neigh = sc.broadcast(neigh)

def kneighbors(x):
    """Returns the indices of the two closest neighbours."""
    return bc_neigh.value.kneighbors([x], 2, return_distance=False)[0].tolist()

udf_kneighbors = F.udf(kneighbors, T.ArrayType(T.IntegerType()))

df = (
    sc.parallelize([Row(x=x) for x in validation])
    .toDF()
    .withColumn("knn", udf_kneighbors("x"))
)
In this small example we fit a NearestNeighbors model to the training data on the driver node and then broadcast it to each worker node, where it is used to find the nearest neighbours of the samples in validation.
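As a sanity check of what the "knn" column should contain, the same neighbour search can be reproduced without Spark or scikit-learn. This is a plain-Python brute-force sketch over the same training and validation lists (two_nearest is a hypothetical helper written for this post, not part of any library); ties are broken by the lower training index:

```python
import math

training = [[0, 0, 2], [1, 0, 0], [0, 0, 1]]
validation = [[0, 2, 2], [1, 0, 5], [1, 0, 1]]

def two_nearest(x, data):
    # Indices of the two rows in `data` closest to x (Euclidean distance),
    # ties broken by the lower index.
    ranked = sorted((math.dist(x, row), i) for i, row in enumerate(data))
    return [i for _, i in ranked[:2]]

knn = [two_nearest(v, training) for v in validation]
print(knn)  # [[0, 2], [0, 2], [1, 2]]
```

The broadcast only changes where the model lives, not what it computes: each worker reads the same fitted model through bc_neigh.value.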
[1] Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia, Learning Spark: Lightning-Fast Big Data Analysis. O'Reilly Media, 2015.
[2] Ramana, "What are broadcast variables? What problems do they solve?". Stack Overflow answer, 2014.
[3] Umberto Griffo, "When to use Broadcast variable". Blog post.
Feel free to comment below. A GitHub account is required.