Spark RDD API

To deepen my understanding of Spark RDDs, this article walks through how each of the RDD APIs is used.

Operation Signature Description
map(f: T=>U) RDD[T] => RDD[U] one-to-one transformation
flatMap(f: T=>Seq[U]) RDD[T] => RDD[U] one-to-many transformation
filter(f: T=>Bool) RDD[T] => RDD[T] filtering
distinct() RDD[T] => RDD[T] deduplication
sample(fraction: Double) RDD[T] => RDD[T] sampling
groupByKey() RDD[(K,V)] => RDD[(K,Seq[V])] group values by key
combineByKey() RDD[(K,V)] => RDD[(K,C)] combine values by key
reduceByKey(f: (V,V)=>V) RDD[(K,V)] => RDD[(K,V)] reduce values by key
union() (RDD[T],RDD[T]) => RDD[T] concatenate two RDDs
join() (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(V,W))] join two RDDs by key
cogroup() (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(Seq[V],Seq[W]))] group multiple RDDs by key
mapValues(f: V=>W) RDD[(K,V)] => RDD[(K,W)] transform values only
sortByKey() RDD[(K,V)] => RDD[(K,V)] sorting
cartesian() (RDD[T],RDD[U]) => RDD[(T,U)] Cartesian product
count() RDD[T] => Long number of elements
collect() RDD[T] => Seq[T] trigger computation, return results to the driver
reduce(f:(T,T)=>T) RDD[T] => T aggregation
lookup(k:K) RDD[(K,V)] => Seq[V] look up values for a key
aggregate() RDD[T] => U aggregation with a zero value
fold() RDD[T] => T aggregation with a zero value
zip() (RDD[V],RDD[W]) => RDD[(V,W)] pair up two RDDs element by element
zipWithIndex() RDD[T] => RDD[(T,Long)] attach an index to each element
zipWithUniqueId() RDD[T] => RDD[(T,Long)] attach a unique ID to each element
intersection() (RDD[T],RDD[T]) => RDD[T] intersection
subtract() (RDD[T],RDD[T]) => RDD[T] set difference
coalesce()/repartition() RDD[T] => RDD[T] repartitioning

Examples for each method follow.
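
All of the snippets below assume an existing SparkContext named sc, as provided automatically by spark-shell. Outside the shell, a minimal local-mode setup looks like the following sketch (the app name is arbitrary, and spark-core is assumed to be on the classpath):

import org.apache.spark.{SparkConf, SparkContext}

// local[*] runs Spark in-process with one worker thread per CPU core;
// spark-shell creates an equivalent `sc` for you.
val conf = new SparkConf().setAppName("rdd-api-examples").setMaster("local[*]")
val sc = new SparkContext(conf)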

map

map transforms the dataset: it applies the function f to every element of the RDD to produce a new RDD.

def map[U: ClassTag](f: T => U): RDD[U]

val arr = Array((1,"a"),(2,"b"),(3,"c"),(4,"d"),(5,"e"),(6,"f"),(7,"g"))
val rdd = sc.parallelize(arr).map(f=>("A"+f._1*10,f._2+"#"))

println(rdd.collect().mkString(","))
//(A10,a#),(A20,b#),(A30,c#),(A40,d#),(A50,e#),(A60,f#),(A70,g#)

flatMap

Behaves like map, except that each input element may produce multiple output elements.

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

val arr = Array("1#2#3","4#5","6")
val rdd = sc.parallelize(arr).flatMap(f=>f.split("#"))

println(rdd.collect().mkString(","))
//1,2,3,4,5,6

filter

Filtering: returns the elements for which f is true.

def filter(f: T => Boolean): RDD[T]

val arr = Array("1#2#3","4#5","6")
val rdd = sc.parallelize(arr).filter(f=>f.length>=3)

println(rdd.collect().mkString(","))
//1#2#3,4#5

distinct

Removes duplicate elements.

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

val arr = Array(1,2,3,2,3,4,5,4,6)
val rdd = sc.parallelize(arr).distinct()

println(rdd.collect().mkString(","))
//4,1,6,3,5,2

sample

Draws a random sample.

def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T]

val arr = 1 to 20
val rdd = sc.parallelize(arr,3)
val a = rdd.sample(true,0.5,10)
val b = rdd.sample(false,0.5,10)

println("a:"+a.collect().mkString(","))
println("b:"+b.collect().mkString(","))
//a:2,7,11,12,12,15,15,18
//b:1,4,7,8,10,11,15,16,17,18,20

If the first parameter is true, the sample may contain duplicate elements; if false, it will not.
The second parameter takes a value in [0,1]; the expected sample size is roughly this fraction times the total count.
The third parameter is the random seed.

groupByKey

Groups values that share the same key.

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

val arr = Array((1,"a"),(2,"b"),(2,"c"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
val rdd = sc.parallelize(arr,3)
val a = rdd.groupByKey()

println(a.collect().mkString(","))
//(3,CompactBuffer(c)),(4,CompactBuffer(d)),(1,CompactBuffer(a, b)),(2,CompactBuffer(b, c, d))

combineByKey

Processes and aggregates elements that share the same key.

def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]

val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
val rdd = sc.parallelize(arr,3)
val a = rdd.combineByKey(f => new StringBuffer(f),
    (a: StringBuffer, b: String) => a.append(b),
    (c: StringBuffer, d: StringBuffer) => c.append(d))

println(a.collect().mkString(","))
//(3,c),(4,d),(1,ab),(2,cbd)

combineByKey's three function parameters:
createCombiner: V => C, called when a key is seen for the first time in a partition, to build the initial combiner
mergeValue: (C, V) => C, called for each subsequent value of that key (the Nth, N ≥ 2) in the same partition, merging it into the combiner
mergeCombiners: (C, C) => C, merges the combiners produced for the same key in different partitions
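
To make the three functions concrete, here is the classic per-key average, a minimal sketch not taken from the examples above (all names are illustrative):

val scores = sc.parallelize(Array(("a", 1.0), ("a", 3.0), ("b", 4.0)))
// The combiner C is a (sum, count) pair per key.
val sumCount = scores.combineByKey(
    (v: Double) => (v, 1),                                               // createCombiner
    (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),         // mergeValue
    (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners
val avg = sumCount.mapValues { case (sum, count) => sum / count }

println(avg.collect().mkString(","))
//(a,2.0),(b,4.0)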

reduceByKey

Runs a reduce over the values of each key.

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
val rdd = sc.parallelize(arr,3)
val a = rdd.reduceByKey((a,b)=>a+"|"+b)

println(a.collect().mkString(","))
//(3,c),(4,d),(1,a|b),(2,c|b|d)

union

Merges two RDDs into one (duplicates are kept).

def union(other: RDD[T]): RDD[T]

val arr = Array((1,"a"),(2,"c"),(2,"b"))
val arr2 = Array((1,"b"),(3,"c"),(4,"d"),(2,"d"))
val rdd1 = sc.parallelize(arr,3)
val rdd2 = sc.parallelize(arr2)
val a = rdd1.union(rdd2)

println(a.collect().mkString(","))
//(1,a),(2,c),(2,b),(1,b),(3,c),(4,d),(2,d)

join

Joins two RDDs by key.
join has several variants (join, fullOuterJoin, leftOuterJoin, rightOuterJoin) whose semantics mirror the corresponding SQL joins.

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def fullOuterJoin[W](
    other: RDD[(K, W)],
    numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
def leftOuterJoin[W](
    other: RDD[(K, W)],
    numPartitions: Int): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](
    other: RDD[(K, W)],
    numPartitions: Int): RDD[(K, (Option[V], W))]

val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
val arr2 = Array((1,"B"),(3,"C"),(4,"D"),(2,"D"),(2,"E"))
val rdd1 = sc.parallelize(arr,3)
val rdd2 = sc.parallelize(arr2)

val a = rdd1.join(rdd2)
val b = rdd1.fullOuterJoin(rdd2)
val c = rdd1.leftOuterJoin(rdd2)
val d = rdd1.rightOuterJoin(rdd2)

println("join:"+a.collect().mkString(","))
println("fullOuterJoin:"+b.collect().mkString(","))
println("leftOuterJoin:"+c.collect().mkString(","))
println("rightOuterJoin:"+d.collect().mkString(","))
//join:(1,(a,B)),(2,(c,D)),(2,(c,E)),(2,(b,D)),(2,(b,E))

//fullOuterJoin:(3,(None,Some(C))),(4,(None,Some(D))),(1,(Some(a),Some(B))),(5,(Some(g),None)),(2,(Some(c),Some(D))),(2,(Some(c),Some(E))),(2,(Some(b),Some(D))),(2,(Some(b),Some(E)))

//leftOuterJoin:(1,(a,Some(B))),(5,(g,None)),(2,(c,Some(D))),(2,(c,Some(E))),(2,(b,Some(D))),(2,(b,Some(E)))

//rightOuterJoin:(3,(None,C)),(4,(None,D)),(1,(Some(a),B)),(2,(Some(c),D)),(2,(Some(c),E)),(2,(Some(b),D)),(2,(Some(b),E))

cogroup

Groups multiple RDDs together by key.

def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

val arr = Array((1,"a"),(2,"c"))
val arr2 = Array((1,"B"),(3,"C"))
val arr3 = Array((1,"C"),(3,"D"))
val rdd1 = sc.parallelize(arr,3)
val rdd2 = sc.parallelize(arr2)
val rdd3 = sc.parallelize(arr3)

val a = rdd1.cogroup(rdd2,rdd3) // cogroup accepts one to three other RDDs

println(a.collect().mkString(","))
//(3,(CompactBuffer(),CompactBuffer(C),CompactBuffer(D))),(1,(CompactBuffer(a),CompactBuffer(B),CompactBuffer(C))),(2,(CompactBuffer(c),CompactBuffer(),CompactBuffer()))

As the output shows, a key that is missing from one of the RDDs gets an empty collection for that RDD.

mapValues

Runs map over the values of key-value (two-element tuple) data.

def mapValues[U](f: V => U): RDD[(K, U)]

val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.mapValues(f=>f.toUpperCase())

println(a.collect().mkString(","))
//(1,A),(2,C),(2,B),(5,G)

sort

Sorts an RDD. Sorting is provided by several methods, including sortBy and sortByKey.

def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
def sortByKey(ascending: Boolean = true,
    numPartitions: Int = self.partitions.length)
    : RDD[(K, V)]

val arr = Array((1,"a"),(2,"c"),(3,"b"),(5,"g"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.sortByKey(true)
val b = rdd1.sortByKey(false)
val c = rdd1.sortBy(f=>f._2,true)
val d = rdd1.sortBy(f=>f._2,false)

println(a.collect().mkString(","))
println(b.collect().mkString(","))
println(c.collect().mkString(","))
println(d.collect().mkString(","))
//(1,a),(2,c),(3,b),(5,g)
//(5,g),(3,b),(2,c),(1,a)
//(1,a),(3,b),(2,c),(5,g)
//(5,g),(2,c),(3,b),(1,a)

The ascending parameter controls the sort direction; pass false for descending order.

cartesian

cartesian computes the Cartesian product of two RDDs.

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

val arr = Array((1,"a"),(2,"c"))
val arr2 = Array((3,"C"),(4,"D"),(5,"E"))
val rdd1 = sc.parallelize(arr,3)
val rdd2 = sc.parallelize(arr2)
val a = rdd1.cartesian(rdd2)

println(a.collect().mkString("\n"))
//((1,a),(3,C))
//((1,a),(4,D))
//((1,a),(5,E))
//((2,c),(3,C))
//((2,c),(4,D))
//((2,c),(5,E))

count

count returns the number of elements in the RDD.

def count(): Long

val arr = Array((1,"a"),(2,"c"),(2,"c"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.count()

println(a)
//3

collect

collect is an action: it triggers execution of the RDD's lineage and returns the result set to the driver.

def collect(): Array[T]
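
A short example in the same style as the others (a minimal sketch):

val arr = Array(1,2,3)
val a = sc.parallelize(arr).collect()

println(a.mkString(","))
//1,2,3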

reduce

reduce aggregates all the elements of the RDD into a single value by repeatedly applying the function f.

def reduce(f: (T, T) => T): T

val arr = Array((1,"a"),(2,"c"),(2,"D"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.reduce((a,b)=>(a._1+b._1,a._2+b._2))

println(a)
//(5,acD)

lookup

lookup returns all values whose key matches the argument.

def lookup(key: K): Seq[V]

val arr = Array((1,"a"),(2,"c"),(2,"D"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.lookup(2)

println(a.mkString(","))
//c,D

aggregate

aggregate aggregates the elements of an RDD: seqOp first folds the T-typed elements of each partition into a value of type U, then combOp merges the per-partition U results into a single U. Note that both seqOp and combOp start from zeroValue, whose type is U.

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

val a = Array(1,2,3,4,5,6,7,8,9)
val m1 = sc.parallelize(a,3)
m1.foreachPartition(f=>println(f.toList))

val res = m1.aggregate[StringBuffer](new StringBuffer("a"))(
    (a, b) => a.append(b), (a, b) => a.append(b))
val res2 = m1.aggregate(3)((a,b)=>a+b,(a,b)=>a*b)

println(s"res:$res")
println(s"res2:$res2")
//List(1, 2, 3)
//List(4, 5, 6)
//List(7, 8, 9)
//res:aa456a123a789
//res2:13122

The three partitions hold (1,2,3), (4,5,6) and (7,8,9) respectively.
res: the first aggregate concatenates everything into one string, and runs as follows:
Step 1: each partition applies seqOp, starting from the zeroValue "a", so the successive appends produce a123, a456 and a789.
Step 2: combOp also starts from "a" and appends the per-partition results. The order in which partition results arrive is nondeterministic (run it several times to verify), so the final string begins with a, followed by the step-1 results in some order.

res2: the second aggregate sums within partitions and multiplies across them:
Step 1:
3+1+2+3 = 9
3+4+5+6 = 18
3+7+8+9 = 27
Step 2:
3 × 9 × 18 × 27 = 13122

fold

fold is a simplified aggregate that uses the same function op for both seqOp and combOp.

def fold(zeroValue: T)(op: (T, T) => T): T

val a = Array(1,2,3,4,5,6,7,8,9)
val m1 = sc.parallelize(a,3)
m1.foreachPartition(f=>println(f.toList))


val res = m1.fold(3)((a,b)=>a+b)

println(s"res:$res")
//List(1, 2, 3)
//List(4, 5, 6)
//List(7, 8, 9)
//res:57

Each of the three partitions performs the addition first:
3+1+2+3 = 9
3+4+5+6 = 18
3+7+8+9 = 27
and the final accumulation gives 3+9+18+27 = 57.

zip

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
Zips two RDDs together, pairing elements positionally.
def zipWithIndex(): RDD[(T, Long)]
Assigns each element an index, starting from 0.
def zipWithUniqueId(): RDD[(T, Long)]
Assigns each element a unique (but not necessarily consecutive) ID.

val a = Array(1,2,3,4,5,6,7,8,9)
val b = Array("a","b","c","d","e","f","g","h","i")
val m1 = sc.parallelize(a,3)
val m2 = sc.parallelize(b,3)

val m3 = m1.zip(m2)
val m4 = m1.zipWithIndex()
val m5 = m1.zipWithUniqueId()
println("m3:"+m3.collect().mkString(","))
println("m4:"+m4.collect().mkString(","))
println("m5:"+m5.collect().mkString(","))
//m3:(1,a),(2,b),(3,c),(4,d),(5,e),(6,f),(7,g),(8,h),(9,i)
//m4:(1,0),(2,1),(3,2),(4,3),(5,4),(6,5),(7,6),(8,7),(9,8)
//m5:(1,0),(2,3),(3,6),(4,1),(5,4),(6,7),(7,2),(8,5),(9,8)

Note:
when zipping, the two RDDs must have the same number of partitions, and each pair of corresponding partitions must contain the same number of elements.
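
A sketch of what happens when the constraint is violated (continuing the example above; the exact exception types are my expectation for recent Spark versions, so treat them as illustrative):

val c = Array("x","y","z","w","v") // 5 elements versus 9 in m1
// Same partition count (3) but unequal elements per partition:
// fails only when the job actually runs.
//   m1.zip(sc.parallelize(c, 3)).collect() // SparkException at runtime
// Different partition counts: fails as soon as zip is called.
//   m1.zip(sc.parallelize(c, 2))           // IllegalArgumentException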

intersection

Computes the intersection of two RDDs.

def intersection(other: RDD[T]): RDD[T]

val a = Array(1,2,3,4,5,6,7,8,9)
val b = Array(3,4)
val m1 = sc.parallelize(a,3)
val m2 = sc.parallelize(b,3)

val m3 = m1.intersection(m2)
println("m3:"+m3.collect().mkString(","))
//m3:3,4

Note: the two RDDs must have the same element type.

subtract

Computes the difference of two RDDs: the elements of this RDD that do not appear in the other.

def subtract(other: RDD[T], numPartitions: Int): RDD[T]

val a = Array(1,2,3,4,5,6,7,8,9)
val b = Array(3,4)
val m1 = sc.parallelize(a,3)
val m2 = sc.parallelize(b,2)

val m3 = m1.subtract(m2)
println("m3:"+m3.collect().mkString(","))
//m3:6,9,1,7,2,5,8

coalesce/repartition

Changes the number of partitions of an RDD.

def coalesce(numPartitions: Int, shuffle: Boolean = false)
    (implicit ord: Ordering[T] = null): RDD[T]
def repartition(numPartitions: Int)
    (implicit ord: Ordering[T] = null): RDD[T]

val rdd = sc.parallelize(Array(1,2,3,4,5,6),3)
val rdd1 = rdd.coalesce(1,false)
val rdd2 = rdd.repartition(1)

println("rdd:"+rdd.partitions.length)
println("rdd1:"+rdd1.partitions.length)
println("rdd2:"+rdd2.partitions.length)
//rdd:3
//rdd1:1
//rdd2:1
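
Note that repartition(n) is implemented as coalesce(n, shuffle = true), and without a shuffle coalesce can only reduce the partition count. A minimal sketch continuing the example above:

// Asking for more partitions without a shuffle is a no-op:
val rdd3 = rdd.coalesce(6, false)
// With shuffle = true (which is what repartition does), the count can grow:
val rdd4 = rdd.coalesce(6, true)

println("rdd3:"+rdd3.partitions.length)
println("rdd4:"+rdd4.partitions.length)
//rdd3:3
//rdd4:6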
