Spark笔记:RDD基本操作(下)
上一篇里我提到可以把RDD当作一个数组,这样我们在学习spark的API时候很多问题就能很好理解了。上篇文章里的API也都是基于RDD是数组的数据模型而进行操作的。
Spark是一个计算框架,是对mapreduce计算框架的改进,mapreduce计算框架是基于键值对也就是map的形式,之所以使用键值对是人们发现世界上大部分计算都可以使用map这样的简单计算模型进行计算。但是Spark里的计算模型却是数组形式,RDD如何处理Map的数据格式了?本篇文章就主要讲解RDD是如何处理Map的数据格式。
Pair RDD及键值对RDD,Spark里创建Pair RDD也是可以通过两种途径,一种是从内存里读取,一种是从文件读取。
首先是从文件读取,上篇里我们看到使用textFile方法读取文件,读取的文件是按行组织成一个数组,要让其变成map格式就的进行转化,代码如下所示:
/*<br/> * 测试文件数据:<br/> * x01,1,4<br/> x02,11,1<br/> x01,3,9<br/> x01,2,6<br/> x02,18,12<br/> x03,7,9<br/> *<br/> * */<br/> val rddFile:RDD[(String,String)] = sc.textFile("file:///F:/sparkdata01.txt", 1).map { x => (x.split(",")(0),x.split(",")(1) + "," + x.split(",")(2)) }<br/> val rFile:RDD[String] = rddFile.keys<br/> println("=========createPairMap File=========")<br/> println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03<br/> println("=========createPairMap File=========")
我们由此可以看到以读取文件方式构造RDD,我们需要使用map函数进行转化,让其变成map的形式。
下面是通过内存方式进行创建,代码如下:
val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))<br/> val r:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)<br/> println("=========createPairMap=========")<br/> println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6)<br/> println("=========createPairMap=========")
RDD任然是数组形式,只不过数组的元素是(“k01”,3)格式是scala里面特有的Tuple2及二元组,元组可以当作一个集合,这个集合可以是各种不同数据类型组合而成,二元组就是只包含两个元素的元组。
由此可见Pair RDD也是数组,只不过是一个元素为二元组的数组而已,上篇里对RDD的操作也是同样适用于Pair RDD的。
下面是Pair RDD的API讲解,同样我们先说转化操作的API:
reduceByKey:合并具有相同键的值;<br/> groupByKey:对具有相同键的值进行分组;<br/> keys:返回一个仅包含键值的RDD;<br/> values:返回一个仅包含值的RDD;<br/> sortByKey:返回一个根据键值排序的RDD;<br/> flatMapValues:针对Pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录;<br/> mapValues:对Pair RDD里每一个值应用一个函数,但是不会对键值进行操作;<br/> combineByKey:使用不同的返回类型合并具有相同键的值;<br/> subtractByKey:操作的RDD我们命名为RDD1,参数RDD命名为参数RDD,剔除掉RDD1里和参数RDD中键相同的元素;<br/> join:对两个RDD进行内连接;<br/> rightOuterJoin:对两个RDD进行连接操作,第一个RDD的键必须存在,第二个RDD的键不再第一个RDD里面有那么就会被剔除掉,相同键的值会被合并;<br/> leftOuterJoin:对两个RDD进行连接操作,第二个RDD的键必须存在,第一个RDD的键不再第二个RDD里面有那么就会被剔除掉,相同键的值会被合并;<br/> cogroup:将两个RDD里相同键的数据分组在一起
下面就是行动操作的API了,具体如下:
countByKey:对每个键的元素进行分别计数;<br/> collectAsMap:将结果变成一个map;<br/> lookup:在RDD里使用键值查找数据
接下来我再提提那些不是很常用的RDD操作,具体如下:
转化操作的:
sample:对RDD采样;
行动操作:
take(num):返回RDD里num个元素,随机的;<br/> top(num):返回RDD里最前面的num个元素,这个方法实用性还比较高;<br/> takeSample:从RDD里返回任意一些元素;<br/> sample:对RDD里的数据采样;<br/> takeOrdered:从RDD里按照提供的顺序返回最前面的num个元素
接下来就是示例代码了,如下所示:
package cn.com.sparktest import org.apache.spark.SparkConf<br/> import org.apache.spark.SparkConf<br/> import org.apache.spark.SparkContext<br/> import org.apache.spark.SparkContext._<br/> import org.apache.spark.rdd.RDD<br/> import org.apache.spark.util.collection.CompactBuffer object SparkPairMap { val conf:SparkConf = new SparkConf().setAppName("spark pair map").setMaster("local[2]")<br/> val sc:SparkContext = new SparkContext(conf) /**<br/> * 构建Pair RDD<br/> */<br/> def createPairMap():Unit = {<br/> val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))<br/> val r:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)<br/> println("=========createPairMap=========")<br/> println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6)<br/> println("=========createPairMap=========") /*<br/> * 测试文件数据:<br/> * x01,1,4<br/> x02,11,1<br/> x01,3,9<br/> x01,2,6<br/> x02,18,12<br/> x03,7,9<br/> *<br/> * */<br/> val rddFile:RDD[(String,String)] = sc.textFile("file:///F:/sparkdata01.txt", 1).map { x => (x.split(",")(0),x.split(",")(1) + "," + x.split(",")(2)) }<br/> val rFile:RDD[String] = rddFile.keys<br/> println("=========createPairMap File=========")<br/> println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03<br/> println("=========createPairMap File=========")<br/> } /**<br/> * 关于Pair RDD的转化操作和行动操作<br/> */<br/> def pairMapRDD(path:String):Unit = {<br/> val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))<br/> val other:RDD[(String,Int)] = sc.parallelize(List(("k01",29)), 1) // 转化操作<br/> val rddReduce:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)<br/> println("====reduceByKey===:" + rddReduce.collect().mkString(","))// (k01,29),(k03,2),(k02,6)<br/> val rddGroup:RDD[(String,Iterable[Int])] = rdd.groupByKey()<br/> println("====groupByKey===:" + rddGroup.collect().mkString(","))// (k01,CompactBuffer(3, 26)),(k03,CompactBuffer(2)),(k02,CompactBuffer(6))<br/> val rddKeys:RDD[String] = rdd.keys<br/> println("====keys=====:" + rddKeys.collect().mkString(","))// k01,k02,k03,k01<br/> val rddVals:RDD[Int] = rdd.values<br/> println("======values===:" + rddVals.collect().mkString(","))// 3,6,2,26<br/> val rddSortAsc:RDD[(String,Int)] = rdd.sortByKey(true, 1)<br/> val rddSortDes:RDD[(String,Int)] = rdd.sortByKey(false, 1)<br/> println("====rddSortAsc=====:" + rddSortAsc.collect().mkString(","))// (k01,3),(k01,26),(k02,6),(k03,2)<br/> println("======rddSortDes=====:" + rddSortDes.collect().mkString(","))// (k03,2),(k02,6),(k01,3),(k01,26)<br/> val rddFmVal:RDD[(String,Int)] = rdd.flatMapValues { x => List(x + 10) }<br/> println("====flatMapValues===:" + rddFmVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)<br/> val rddMapVal:RDD[(String,Int)] = rdd.mapValues { x => x + 10 }<br/> println("====mapValues====:" + rddMapVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)<br/> val rddCombine:RDD[(String,(Int,Int))] = rdd.combineByKey(x => (x,1), (param:(Int,Int),x) => (param._1 + x,param._2 + 1), (p1:(Int,Int),p2:(Int,Int)) => (p1._1 + p2._1,p1._2 + p2._2))<br/> println("====combineByKey====:" + rddCombine.collect().mkString(","))//(k01,(29,2)),(k03,(2,1)),(k02,(6,1))<br/> val rddSubtract:RDD[(String,Int)] = rdd.subtractByKey(other);<br/> println("====subtractByKey====:" + rddSubtract.collect().mkString(","))// (k03,2),(k02,6)<br/> val rddJoin:RDD[(String,(Int,Int))] = rdd.join(other)<br/> println("=====rddJoin====:" + rddJoin.collect().mkString(","))// (k01,(3,29)),(k01,(26,29))<br/> val rddRight:RDD[(String,(Option[Int],Int))] = rdd.rightOuterJoin(other)<br/> println("====rightOuterJoin=====:" + rddRight.collect().mkString(","))// (k01,(Some(3),29)),(k01,(Some(26),29))<br/> val rddLeft:RDD[(String,(Int,Option[Int]))] = rdd.leftOuterJoin(other)<br/> println("=====rddLeft=====:" + rddLeft.collect().mkString(","))// (k01,(3,Some(29))),(k01,(26,Some(29))),(k03,(2,None)),(k02,(6,None))<br/> val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other)<br/> println("=====cogroup=====:" + rddCogroup.collect().mkString(","))// (k01,(CompactBuffer(3, 26),CompactBuffer(29))),(k03,(CompactBuffer(2),CompactBuffer())),(k02,(CompactBuffer(6),CompactBuffer())) // 行动操作<br/> val resCountByKey = rdd.countByKey()<br/> println("=====countByKey=====:" + resCountByKey)// Map(k01 -> 2, k03 -> 1, k02 -> 1)<br/> val resColMap = rdd.collectAsMap()<br/> println("=====resColMap=====:" + resColMap)//Map(k02 -> 6, k01 -> 26, k03 -> 2)<br/> val resLookup = rdd.lookup("k01")<br/> println("====lookup===:" + resLookup) // WrappedArray(3, 26)<br/> } /**<br/> * 其他一些不常用的RDD操作<br/> */<br/> def otherRDDOperate(){<br/> val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26))) println("=====first=====:" + rdd.first())//(k01,3)<br/> val resTop = rdd.top(2).map(x => x._1 + ";" + x._2)<br/> println("=====top=====:" + resTop.mkString(","))// k03;2,k02;6<br/> val resTake = rdd.take(2).map(x => x._1 + ";" + x._2)<br/> println("=======take====:" + resTake.mkString(","))// k01;3,k02;6<br/> val resTakeSample = rdd.takeSample(false, 2).map(x => x._1 + ";" + x._2)<br/> println("=====takeSample====:" + resTakeSample.mkString(","))// k01;26,k03;2<br/> val resSample1 = rdd.sample(false, 0.25)<br/> val resSample2 = rdd.sample(false, 0.75)<br/> val resSample3 = rdd.sample(false, 0.5)<br/> println("=====sample======:" + resSample1.collect().mkString(","))// 无<br/> println("=====sample======:" + resSample2.collect().mkString(","))// (k01,3),(k02,6),(k01,26)<br/> println("=====sample======:" + resSample3.collect().mkString(","))// (k01,3),(k01,26)<br/> } def main(args: Array[String]): Unit = {<br/> createPairMap()<br/> pairMapRDD("file:///F:/sparkdata01.txt")<br/> otherRDDOperate()<br/> } }
本篇到此就将我知道的spark的API全部讲完了,两篇文章里的示例代码都是经过测试的,可以直接运行,大家在阅读代码时候最好注意这个特点:我在写RDD转化代码时候都是很明确的写上了转化后的RDD的数据类型,这样做的目的就是让读者更加清晰的认识不同RDD转化后的数据类型,这点在实际开发里非常重要,在实际的计算里我们经常会不同的计算算法不停的转化RDD的数据类型,而使用scala开发spark程序时候,我发现scala和javascript很类似,我们不去指定返回值数据类型,scala编译器也会自动推算结果的数据类型,因此编码时候我们可以不指定具体数据类型。这个特点就会让我们在实际开发里碰到种种问题,因此我在示例代码里明确了RDD转化后的数据类型。
在使用Pair RDD时候,我们要引入:
import org.apache.spark.SparkContext._
否则代码就有可能报错,说找不到对应的方法,这个引入就是scala里导入的隐世类型转化的功能,原理和上段文字说到的内容差不多。
转发申明:
本文转自互联网,由小站整理并发布,在于分享相关技术和知识。版权归原作者所有,如有侵权,请联系本站 top8488@163.com,将在24小时内删除。谢谢