Spark常用函数讲解之Action操作
在进行 Spark 编程时,常常需要使用到 Action 操作。Action 操作是触发 Spark 作业执行的操作,当执行 Action 操作时,Spark 会根据依赖关系生成一个任务 DAG,然后对任务 DAG 进行调度,执行具体的计算,并返回结果。本文基于 Spark 2.4.0 版本,对 Spark 常用的 Action 操作进行了讲解。
collect
collect 操作会将 RDD 的所有分区数据拉取到 Spark Driver 中,数据类型为一个数组,返回值类型为 Array。该操作适用于 RDD 数据量较小的情况,否则容易导致 Driver 内存溢出。
例子:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val res = data.collect()
res.foreach(println)
输出结果:
1
2
3
4
5
count
count 操作会统计 RDD 中元素的个数,并返回一个 Long 类型的结果。
例子:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val res = data.count()
println(res)
输出结果:
5
first
first 操作会返回 RDD 中第一个元素,相当于 take(1) 操作。
例子:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val res = data.first()
println(res)
输出结果:
1
take
take 操作会返回 RDD 中前 n 个元素,返回值类型为 Array。
例子:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val res = data.take(3)
res.foreach(println)
输出结果:
1
2
3
foreach
foreach 操作用于遍历 RDD 中的每一个元素,并对每个元素执行传入的函数,因为该操作是在分布式计算节点上执行的,因此函数需要具备序列化能力。
例子:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
data.foreach(println)
输出结果:
1
2
3
4
5
reduce
reduce 操作接收一个函数作为参数,对 RDD 中的元素进行累加操作,返回一个累加后的结果。
例子:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val res = data.reduce(_ + _)
println(res)
输出结果:
15
fold
fold 操作与 reduce 类似,接收一个“零值”作为参数,并将“零值”作为累加器的初始值。该操作对 RDD 中的元素进行累加操作,并返回一个累加后的结果。
例子:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val res = data.fold(0)(_ + _)
println(res)
输出结果:
15
aggregate
aggregate 操作与 fold 类似,接收一个“零值”作为参数,并将“零值”作为累加器的初始值。该操作对 RDD 中的元素进行累加操作,并返回一个累加后的结果。与 fold 不同的是,aggregate 操作接收两个函数作为参数,第一个函数用于将 RDD 中的元素分组,第二个函数用于对每组元素进行累加操作。
例子:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val res = data.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val avg = res._1 / res._2.toDouble
println(avg)
输出结果:
3.0
max / min
max / min 操作用于求 RDD 中的最大 / 最小值,返回相应的结果。
例子:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val max = data.max()
val min = data.min()
println(max + ", " + min)
输出结果:
5, 1
countByValue
countByValue 操作用于统计 RDD 中每个元素出现的次数,并返回一个 Map 集合。其中 key 为元素值,value 为元素出现的次数。
例子:
val data = sc.parallelize(Seq(1, 2, 1, 3, 4, 4, 5))
val res = data.countByValue()
res.foreach(println)
输出结果:
(4,2)
(1,2)
(5,1)
(2,1)
(3,1)
综上所述,本文介绍了 Spark 常用的 Action 操作,包括 collect、count、first、take、foreach、reduce、fold、aggregate、max / min 和 countByValue 等操作。熟练掌握这些操作能够帮助我们更好的编写 Spark 作业,实现复杂的数据分析和处理任务。