Learn about the aggregate function in Spark

Aggregate the elements of each partition, and then the results for all the partitions, using the given combine functions and a neutral “zero value”.


aggregate(zeroValue, seqOp, combOp)

Parameters

  • `zeroValue`: The initial accumulator value, in the desired result format (here, a `(sum, count)` tuple).
  • `seqOp`: The operation to apply to RDD records. Runs once for every record in a partition.
  • `combOp`: Defines how the per-partition results are combined.

The first function, `seqOp`, can return a result type U that differs from the RDD's element type T. Thus we need one operation for merging a T into a U (`seqOp`) and one operation for merging two Us (`combOp`).


>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1, 2, 3, 4], 2).aggregate((0, 0), seqOp, combOp)
(10, 4)
>>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
(0, 0)

For the first partition:

`x` is initialized to the `zeroValue` parameter, `(0, 0)`, and `y` is the first element of the partition, `1`. `seqOp` then computes:


0 + 1 = 1   (x[0] + y: running sum)
0 + 1 = 1   (x[1] + 1: running count)

`x` gets updated from (0, 0) to (1, 1); next, with `y = 2`:


1 + 2 = 3   (x[0] + y)
1 + 1 = 2   (x[1] + 1)

and now the local result is (3, 2), which is the final result from the first partition.

For the second partition (elements 3 and 4), the same steps run (0, 0) → (3, 1) → (7, 2), so its local result is (7, 2).
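You can inspect how Spark actually split the data with `glom()`, which collects each partition's elements into a list; with four elements and two partitions, the even split assumed in this walkthrough is what the default partitioning produces:

>>> sc.parallelize([1, 2, 3, 4], 2).glom().collect()
[[1, 2], [3, 4]]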

Now `combOp` is applied to the per-partition results to form the final answer:


(3, 2) + (7, 2) = (10, 4)
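To make the mechanics concrete, here is a driver-side sketch that simulates `aggregate` with plain Python using `functools.reduce`. The hard-coded partition layout is an assumption matching the walkthrough above; real Spark runs `seqOp` distributed on the executors:

>>> from functools import reduce
>>> partitions = [[1, 2], [3, 4]]  # assumed layout; Spark decides this itself
>>> local_results = [reduce(seqOp, part, (0, 0)) for part in partitions]
>>> local_results
[(3, 2), (7, 2)]
>>> reduce(combOp, local_results, (0, 0))  # zeroValue also seeds combOp
(10, 4)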

Refer to:

1. https://spark.apache.org/docs/1.2.0/api/python/pyspark.html?highlight=aggregate#pyspark.RDD.aggregate
2. https://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark/38949457
