spark中sql和rdd的区别与联系(Spark七)

RDD的持久化1. RDD Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

使用

1.rdd.cache() 等价于rdd.persist(MEMORY_ONLY)

2.rdd.persist() 可以设置级别

object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

示例

应用中运行两个job任务,缓存rdd3

/** * @description: RDD的Cache缓存 * @author: HaoWu * @create: 2020年08月04日 */ object DependeciedTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]") val sc = new SparkContext(conf) val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = rdd1.map(x=>x) val rdd3 = rdd2.groupBy(x=>x) println("*******缓存前rdd3的血缘依赖**********") println(rdd3.toDebugString) //将rdd3缓存 rdd3.cache() //任务1 rdd3.collect() println("*******缓存后rdd3的血缘依赖**********") println(rdd3.toDebugString) //任务2 rdd3.saveAsTextFile("output") Thread.sleep(10000000) } }

小说明:Spark会将Shuffle的RDD缓存进cache,上面写的有些小问题,但不影响演示。

缓存前后RDD的血缘依赖:

spark中sql和rdd的区别与联系(Spark七)(1)

任务1的执行WEB界面

spark中sql和rdd的区别与联系(Spark七)(2)

任务2的执行WEB界面

spark中sql和rdd的区别与联系(Spark七)(3)

总结

① cache操作会增加血缘关系,不改变原有的血缘关系

②RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行

③丢失的数据会被重算,RDD的各个Partition是相对独立,只需要计算丢失的部分即可

④Spark会自动对一些Shuffle操作的中间数据做cache操作

⑤实际使用的时候,如果想重用数据,建议调用persist或cache。

2. RDD CheckPoint检查点

​ 对cache的补充。 cache一般将数据缓存到内存,不可靠,checkpoint选择将数据写入到磁盘!

什么是checkpoint检查点:通过将RDD中间结果写入磁盘由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

使用

//设置checkpoint目录,集群环境checkpoint目录必须是HDFS的路径 sc.setCheckpointDir("./checkpointDir1") //rdd3设置checkpoint rdd3.checkpoint()

示例

/** * @description: RDD的CheckPoint持久化 * @author: HaoWu * @create: 2020年08月04日 */ object DependeciedTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]") val sc = new SparkContext(conf) //设置checkpoint目录,集群环境checkpoint目录必须是HDFS的路径 sc.setCheckpointDir("./checkpointDir1") val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = rdd1.groupBy(x=>x) val rdd3 = rdd2.map(x=>x) //增加cache缓存 rdd3.cache() println("********设置checkpoint前,rdd3的血缘依赖**********") println(rdd3.toDebugString) //增加数据检查点 rdd3.checkpoint() //任务1 rdd3.collect() println("********设置checkpoint后,rdd3的血缘依赖**********") println(rdd3.toDebugString) //任务2 rdd3.saveAsTextFile("output12") Thread.sleep(10000000) } }

查看检查点前后的血缘依赖

spark中sql和rdd的区别与联系(Spark七)(4)

spark中sql和rdd的区别与联系(Spark七)(5)

spark中sql和rdd的区别与联系(Spark七)(6)

总结

①在集群环境运行时,checkpoint目录必须是HDFS的路径

②一旦RDD被checkpoint,当前RDD的血缘关系会被切断

③ checkpoint在第一个行动算子被调用时,触发,依然会生成一个Job,执行checkpoint任务

3. cache和check的区别

①Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

②Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

③建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页