To deepen my understanding of Spark RDDs, this article walks through the usage of the main RDD APIs.
Operation | Signature | Description |
---|---|---|
map(f: T=>U) | RDD[T] => RDD[U] | one-to-one transformation |
flatMap(f: T=>Seq[U]) | RDD[T] => RDD[U] | one-to-many transformation |
filter(f: T=>Bool) | RDD[T] => RDD[T] | filtering |
distinct() | RDD[T] => RDD[T] | deduplication |
sample(fract: Float) | RDD[T] => RDD[T] | sampling |
groupByKey() | RDD[(K,V)] => RDD[(K,Seq[V])] | group values by key |
combineByKey() | RDD[(K,V)] => RDD[(K,C)] | combine values by key |
reduceByKey(f: (V,V)=>V) | RDD[(K,V)] => RDD[(K,V)] | reduce values by key |
union() | (RDD[T],RDD[T]) => RDD[T] | merge two RDDs |
join() | (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(V,W))] | join two RDDs by key |
cogroup() | (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(Seq[V],Seq[W]))] | group two RDDs by key |
mapValues(f: V=>W) | RDD[(K,V)] => RDD[(K,W)] | transform values only |
sort(c: Comparator[K]) | RDD[(K,V)] => RDD[(K,V)] | sorting |
cartesian() | (RDD[T],RDD[U]) => RDD[(T,U)] | Cartesian product |
count() | RDD[T] => Long | count elements |
collect() | RDD[T] => Seq[T] | trigger computation, fetch results |
reduce(f: (T,T)=>T) | RDD[T] => T | aggregation |
lookup(k: K) | RDD[(K,V)] => Seq[V] | look up values for a key |
aggregate() | RDD[T] => U | aggregation |
fold() | RDD[T] => T | aggregation |
zip() | (RDD[V],RDD[W]) => RDD[(V,W)] | pairwise merge of two RDDs |
zipWithIndex() | RDD[T] => RDD[(T,Long)] | pair each element with its index |
zipWithUniqueId() | RDD[T] => RDD[(T,Long)] | pair each element with a unique ID |
intersection() | (RDD[T],RDD[T]) => RDD[T] | set intersection |
subtract() | (RDD[T],RDD[T]) => RDD[T] | set difference |
coalesce()/repartition() | RDD[T] => RDD[T] | repartitioning |
Examples for each method follow.
map
map transforms the dataset: the function f is applied to every element of the RDD to produce a new RDD.

```scala
def map[U: ClassTag](f: T => U): RDD[U]
```

```scala
val arr = Array((1,"a"),(2,"b"),(3,"c"),(4,"d"),(5,"e"),(6,"f"),(7,"g"))
```
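The original demo output was not preserved here; as a minimal sketch, the same one-to-one semantics can be reproduced on a plain Scala collection (in a real Spark job you would first wrap the array with sc.parallelize):

```scala
// The article's sample data; a local Scala Array stands in for the RDD here.
val arr = Array((1,"a"),(2,"b"),(3,"c"),(4,"d"),(5,"e"),(6,"f"),(7,"g"))

// map: exactly one output element per input element.
val upper = arr.map { case (k, v) => (k, v.toUpperCase) }

println(upper.toList)
// List((1,A), (2,B), (3,C), (4,D), (5,E), (6,F), (7,G))
```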
flatMap
Like map, but each input element may produce zero or more output elements.

```scala
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
```

```scala
val arr = Array("1#2#3","4#5","6")
```
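A local sketch of the one-to-many behavior, splitting the article's sample strings on "#" (Scala's collection flatMap mirrors the RDD method):

```scala
val arr = Array("1#2#3","4#5","6")

// flatMap: each string yields several elements, and the results are flattened.
val flat = arr.flatMap(_.split("#"))

println(flat.toList)
// List(1, 2, 3, 4, 5, 6)
```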
filter
Keeps only the elements for which f returns true.

```scala
def filter(f: T => Boolean): RDD[T]
```

```scala
val arr = Array("1#2#3","4#5","6")
```
distinct
Removes duplicate elements.

```scala
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
```

```scala
val arr = Array(1,2,3,2,3,4,5,4,6)
```
sample
Draws a random sample.

```scala
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T]
```

```scala
val arr = 1 to 20
```

If the first argument is true, the sample may contain repeated elements; if false, every element appears at most once.
The second argument lies in [0, 1]; the resulting sample size is roughly fraction times the total count.
The third argument is the random seed.
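With withReplacement = false, sampling effectively runs one random trial per element. A local simulation of that behavior (the fraction 0.3 and the seed value are arbitrary choices for illustration):

```scala
val data = (1 to 20).toList
val fraction = 0.3
val rng = new scala.util.Random(12345L)  // plays the role of the seed argument

// Keep each element independently with probability `fraction`.
val sampled = data.filter(_ => rng.nextDouble() < fraction)

// Without replacement there are no duplicates, and the expected size is
// fraction * data.size = 6, though the actual count varies with the seed.
println(sampled)
```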
groupByKey
Collects all values that share a key into one group.

```scala
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
```

```scala
val arr = Array((1,"a"),(2,"b"),(2,"c"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
```
combineByKey
Processes and aggregates the values that share a key.

```scala
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]
```

```scala
val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
```

combineByKey's first three parameters:
createCombiner: V => C, called when the first value for a key is encountered in a partition
mergeValue: (C, V) => C, called for every subsequent value (the second onwards) of that key in the same partition, merging it into the combiner built so far
mergeCombiners: (C, C) => C, merges the combiners produced for the same key in different partitions
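The three functions can be simulated locally. Below, the article's data is split into two hypothetical partitions to show when each function fires:

```scala
// The article's data, split into two made-up partitions.
val p1 = List((1,"a"),(2,"c"),(2,"b"))
val p2 = List((1,"b"),(3,"c"),(4,"d"),(2,"d"))

val createCombiner: String => List[String] = v => List(v)                  // first value of a key in a partition
val mergeValue: (List[String], String) => List[String] = _ :+ _            // later values in that partition
val mergeCombiners: (List[String], List[String]) => List[String] = _ ++ _  // combine across partitions

// What one partition does: createCombiner on a key's first value, mergeValue afterwards.
def combineLocal(part: List[(Int, String)]): Map[Int, List[String]] =
  part.foldLeft(Map.empty[Int, List[String]]) { case (acc, (k, v)) =>
    acc.get(k) match {
      case None    => acc + (k -> createCombiner(v))
      case Some(c) => acc + (k -> mergeValue(c, v))
    }
  }

// mergeCombiners then joins the per-partition results for each key.
val merged = (combineLocal(p1).toSeq ++ combineLocal(p2).toSeq)
  .groupBy(_._1)
  .map { case (k, cs) => k -> cs.map(_._2).reduce(mergeCombiners) }

println(merged.toList.sortBy(_._1))
// List((1,List(a, b)), (2,List(c, b, d)), (3,List(c)), (4,List(d)))
```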
reduceByKey
Runs a reduce over the values that share each key.

```scala
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
```

```scala
val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
```
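As a rough local equivalent, reduceByKey behaves like grouping by key and reducing each group of values, here concatenating the strings:

```scala
val arr = List((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))

// Group values by key and reduce each group with the same function
// that reduceByKey(_ + _) would use.
val reduced = arr.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(_ + _) }

println(reduced.toList.sortBy(_._1))
// List((1,ab), (2,cbd), (3,c), (4,d))
```

Note that in Spark the reduce function must be commutative and associative, since the order in which values arrive is not guaranteed.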
union
Merges two RDDs into one; duplicates are kept.

```scala
def union(other: RDD[T]): RDD[T]
```

```scala
val arr = Array((1,"a"),(2,"c"),(2,"b"))
```
join
Joins two RDDs by key.
join has several variants (join, fullOuterJoin, leftOuterJoin, rightOuterJoin) that behave like the corresponding SQL joins.

```scala
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def fullOuterJoin[W](
    other: RDD[(K, W)],
    numPartitions: Int): RDD[(K, (Option[V], Option[W]))]

def leftOuterJoin[W](
    other: RDD[(K, W)],
    numPartitions: Int): RDD[(K, (V, Option[W]))]

def rightOuterJoin[W](
    other: RDD[(K, W)],
    numPartitions: Int): RDD[(K, (Option[V], W))]
```

```scala
val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
```
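A local sketch of the join semantics, using a second dataset b invented for the example (the real RDD methods likewise return one row per matching pair):

```scala
val a = List((1,"a"),(2,"c"),(2,"b"),(5,"g"))
val b = List((1,"x"),(2,"y"))  // hypothetical second dataset

// Inner join: only keys present on both sides survive.
val inner = for ((k, v) <- a; (k2, w) <- b if k == k2) yield (k, (v, w))
println(inner)
// List((1,(a,x)), (2,(c,y)), (2,(b,y)))

// Left outer join: a key unmatched on the right yields None
// (b has unique keys here, so one lookup per row suffices).
val left = for ((k, v) <- a) yield (k, (v, b.collectFirst { case (`k`, w) => w }))
println(left)
// List((1,(a,Some(x))), (2,(c,Some(y))), (2,(b,Some(y))), (5,(g,None)))
```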
cogroup
Groups multiple RDDs by key.

```scala
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
```

```scala
val arr = Array((1,"a"),(2,"c"))
```

As the demo shows, a key that is missing from one of the RDDs gets an empty collection for that side.
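The empty-value behavior is easy to see in a local simulation, with a second dataset b made up for the example:

```scala
val a = List((1,"a"),(2,"c"))
val b = List((1,"x"),(3,"y"))  // hypothetical second dataset

// For every key seen on either side, collect all values from each side;
// a key absent from one side gets an empty collection there.
val keys = (a.map(_._1) ++ b.map(_._1)).distinct
val cogrouped = keys.map { k =>
  k -> (a.collect { case (`k`, v) => v }, b.collect { case (`k`, w) => w })
}

println(cogrouped)
// List((1,(List(a),List(x))), (2,(List(c),List())), (3,(List(),List(y))))
```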
mapValues

Applies f to the value of every key-value pair (a 2-tuple), leaving the keys unchanged.

```scala
def mapValues[U](f: V => U): RDD[(K, U)]
```

```scala
val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
```
sort
Sorts an RDD. sort comes as sortBy, sortByKey and related methods.

```scala
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

def sortByKey(ascending: Boolean = true,
    numPartitions: Int = self.partitions.length)
    : RDD[(K, V)]
```

```scala
val arr = Array((1,"a"),(2,"c"),(3,"b"),(5,"g"))
```

The ascending flag controls the sort direction; pass false for descending order.
cartesian
cartesian computes the Cartesian product of two RDDs.

```scala
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

```scala
val arr = Array((1,"a"),(2,"c"))
```
count
count returns the number of elements in the RDD.

```scala
def count(): Long
```

```scala
val arr = Array((1,"a"),(2,"c"),(2,"c"))
```
collect
collect triggers the computation and returns the result set to the driver.

```scala
def collect(): Array[T]
```
reduce
reduce aggregates all elements of the RDD pairwise with f.

```scala
def reduce(f: (T, T) => T): T
```

```scala
val arr = Array((1,"a"),(2,"c"),(2,"D"))
```
lookup
lookup returns all values whose key equals the given argument.

```scala
def lookup(key: K): Seq[V]
```

```scala
val arr = Array((1,"a"),(2,"c"),(2,"D"))
```
aggregate
aggregate is used to aggregate the elements of an RDD. seqOp first folds the T-typed elements of each partition into a value of type U; combOp then merges those per-partition U values into a single U. Note that both seqOp and combOp start from zeroValue, whose type is U.

```scala
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
```

```scala
val a = Array(1,2,3,4,5,6,7,8,9)
```

The three partitions hold (1,2,3), (4,5,6) and (7,8,9).

res: the first aggregate concatenates all the data into a string. It runs as follows:
Step 1: each partition applies seqOp; with zeroValue "a", the appends produce a123, a456 and a789.
Step 2: combOp appends again, also starting from "a". The order in which the partition results are combined is nondeterministic (run it a few times to verify), so the final string starts with a, followed by the step 1 results in some order.

res2: the second aggregate does addition within partitions and multiplication across them:
Step 1:
3 + 1 + 2 + 3 = 9
3 + 4 + 5 + 6 = 18
3 + 7 + 8 + 9 = 27
Step 2:
3 × 9 × 18 × 27 = 13122
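The numeric run can be checked with a local fold per partition followed by a fold across the partition results, using the partition split described above:

```scala
val partitions = List(List(1,2,3), List(4,5,6), List(7,8,9))
val zero = 3

// seqOp (addition) inside each partition, starting from zeroValue:
val perPartition = partitions.map(_.foldLeft(zero)(_ + _))

// combOp (multiplication) across the partition results, again from zeroValue:
val result = perPartition.foldLeft(zero)(_ * _)

println(perPartition)  // List(9, 18, 27)
println(result)        // 13122
```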
fold
fold is a simplified aggregate in which seqOp and combOp are the same function op.

```scala
def fold(zeroValue: T)(op: (T, T) => T): T
```

```scala
val a = Array(1,2,3,4,5,6,7,8,9)
```

Each of the three partitions folds with addition:
3 + 1 + 2 + 3 = 9
3 + 4 + 5 + 6 = 18
3 + 7 + 8 + 9 = 27
and finally 3 + 9 + 18 + 27 = 57.
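The same kind of local simulation confirms the fold arithmetic:

```scala
val partitions = List(List(1,2,3), List(4,5,6), List(7,8,9))
val zero = 3

// The single op (addition) runs inside each partition and then across them.
val perPartition = partitions.map(_.foldLeft(zero)(_ + _))
val total = perPartition.foldLeft(zero)(_ + _)

println(total)  // 3 + 9 + 18 + 27 = 57
```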
zip
zip pairs up two RDDs element by element.

```scala
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

```scala
val a = Array(1,2,3,4,5,6,7,8,9)
```

Note: the two RDDs must have the same number of partitions and the same length (in fact, the same number of elements in every partition).
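Scala collections show the pairing, with the caveat that a local zip silently truncates to the shorter side, whereas RDD.zip fails if the lengths differ:

```scala
val a = List(1,2,3)
val b = List("x","y","z")

println(a.zip(b))        // List((1,x), (2,y), (3,z))
println(a.zipWithIndex)  // List((1,0), (2,1), (3,2))
```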
intersection
Computes the intersection of two RDDs.

```scala
def intersection(other: RDD[T]): RDD[T]
```

```scala
val a = Array(1,2,3,4,5,6,7,8,9)
```

Note: the two RDDs must have the same element type.
subtract
Computes the set difference: the elements of this RDD that do not appear in the other RDD.

```scala
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
```

```scala
val a = Array(1,2,3,4,5,6,7,8,9)
```
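Locally, subtract corresponds to filtering out the elements present in the other dataset (the second dataset b is made up for the example):

```scala
val a = List(1,2,3,4,5,6,7,8,9)
val b = List(2,4,6)  // hypothetical second dataset

// Keep the elements of a that do not appear in b.
val diff = a.filterNot(b.contains)

println(diff)  // List(1, 3, 5, 7, 9)
```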
coalesce/repartition
Repartitions the RDD.

```scala
def coalesce(numPartitions: Int, shuffle: Boolean = false)
    (implicit ord: Ordering[T] = null): RDD[T]

def repartition(numPartitions: Int)
    (implicit ord: Ordering[T] = null): RDD[T]
```

```scala
val rdd = sc.parallelize(Array(1,2,3,4,5,6), 3)
```

repartition(n) is equivalent to coalesce(n, shuffle = true); coalesce with shuffle = false can reduce the partition count cheaply without a shuffle.