Spark Learning Note Part 3

  • Pair RDD 是什么及其常见应用 ?

键值对RDD, 通常用于聚合操作。

  • 创建Pair RDD

使用第一个单词作为键,行内容作为值,创建一个Pair RDD。


# pairs = lines.map(lambda line: (line.split(' ')[0], line))

  • 常见Pair RDD的转化操作,以{(1,2), (3,4), (3,6)}为例

(1) reduceByKey(func) [累加具有相同键的值]

{(1,2), (3,10)}

 

(2) groupByKey() [将具有相同键的值进行分组]

{(1, [2]), (3, [4,6])}

 

(3) mapValues(func) [接收一个函数,不改变键]

mapValues(x => x+1)
{(1,3), (3,5), (3,7)}

 

(4) keys() [返回仅包含键的RDD]

{1,3,3}

 

(5) values() [返回仅包含值的RDD]

{2, 4, 6}

 

(6) sortBykey() [返回一个根据键排序的RDD]

{(1,2), (3,4), (3,6)}

 

  • 多个Pair RDD的操作,以{RDD = (1,2), (3,4), (3,6)}, other = {(3,9)}为例

(1) substractByKey() [删除RDD中键与other相同的元素]


# RDD.substractByKey(other)

{(1,2)}

 

(2) join() [对两个RDD进行内连接]


# RDD.join(other)

{(3, (4, 9)), (3, (6, 9))}

 

(3) rightOuterJoin() [对两个RDD进行内连接, 右边RDD的键必须存在]


# RDD.rightOuterJoin(other)

{(3,(4,9)), (3,(6,9))}

 

(4) leftOuterJoin() [对两个RDD进行内连接, 左边RDD的键必须存在]


# RDD.lefttOuterJoin(other)

{(1, (2, None)), (3,(4,9)), (3,(6,9))}

 

数据分区

在分布式程序中,通信的代价是很大的,因而控制数据分布以获得最少的网络传输可以极大地提升整体性能。

Spark中所有的键值对RDD都可以进行分区。系统会根据一个针对键的函数对元素进行分组,Spark可以确保同一组的键出现在同一个节点上。比如,哈希分区将一个RDD分成了100个分区,此时键的哈希值对100取模的结果相同的记录会被放在一个节点上。

分区实例

  • RDD1[userData] (userId, userInfo) 数据量很大
  • RDD2[events] (userId, linkInfo) 数据量较小
  • 根据userId将RDD1, RDD2进行join操作

// userData中的元素会根据它们被读取时的来源,即HDFS块所在的节点来分布,SparkCore此时无法获知某个特定的UserID对应的记录位于哪个节点上。
val sc = new SparkContext(...)
val userData = sc.sequenceFile[userId, userInfo]("hdfs://...").persist()

def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[userId, linkInfo](logFileName)
val joined = userData.join(events)
...
}

 

join-shuffle

以上这段代码的效率不高,因为在每次调用processNewLogs()时都会用到join()操作,默认情况下,连接操作会将两个数据集的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,在那台机器上对所有键相同的记录进行连接操作。在每次调用processNewLogs时都对RDD1进行哈希值计算和跨节点数据混洗,要注意的是这部分数据及数据的节点分布并不会因为重复计算而改变节点分布。
那么我们可以想一下,如何可以保证只要一次哈希计算,并且可以避免shuffle的过程 ?

解决方案: 在程序开始时,对RDD1使用partitionBy()转化操作,依据HashPartitioner对象哈希分区。


val sc = new SparkContext(...)
val userData = sc.sequenceFile[userId, userInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100))
.persist()

join-after-partitionby-shuffle

我们可以这样理解这个过程:

我们在获取userData对象的时候就告知spark core这批数据是有分区概念的,且是按某个标准进行分区的,所以后面再有shuffle的过程时,不用对我重新shuffle了,让更小的数据集shuffle到我所在的分区就可以了。

分区之后,如果使用persist(),可以避免后续的RDD操作对partitioned谱系重新求值,避免重复的分区操作。

 

可以从分区中获益的操作

  • cogroup()
  • groupWith()
  • join()
  • leftOuterJoin()
  • rightOuterJoin()
  • groupByKey()
  • reduceByKey()
  • combineByKey()
  • lookup()

 

影响分区方式的操作

Spark core 可以获知各种操作如何影响分区,对这类数据的分区集中点自动设置分区器。

比如,在调用join()连接两个RDD时,键相同的元素被哈希到同一台机器上,Spark Core或知道该操作join是有分区概念的,那么再基于此分区做reduceByKey()就会快很多。

有一些transformation会破坏分区,比如join()之后,使用了map()函数。(替代的可以考虑引入mapValues(), flatMapValues())

对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一致,若其中一个父RDD已经设置分区方式,就会采用那种分区方式。如果两个父RDD都设置了分区方式,就采用第一个父RDD的分区方式。

关于以上,我们在开发中可以尽量形成分区传承的意识,即不要破坏了已经形成的分区,例如我们已经使用了join(), 其结果已经是携带分区的数据集,我们要利用好这个既定的分区,避免不必要的transformation对分区的损耗。

 

自定义分区方式

Spark提供的HashPartitioner与RangePartitioner已经能够满足大多数用例,仍然可以通过自定义的Partitioner对象来控制RDD的分区方式。

 

Refer to:

Learning Spark: Lightning-fast Data Analysis

 

 

 

 

 

 

 

 

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>