RDD设计背景与概念
在实际应用中,有很多迭代算法(如机器学习、图算法等)和交互式数据挖掘工具。这些应用场景的共同点是中间结果,即某一阶段的输出,在不同的计算阶段之间被复用。结果作为下一阶段的输入。然而目前的MapReduce框架将中间结果写入HDFS,这带来了大量的数据复制、磁盘IO和序列化开销。尽管Pregel等图计算框架也将结果存储在内存中,但这些框架只能支持一些特定的计算模式,并不能提供通用的数据抽象。 RDD的出现就是为了满足这种需求。它提供了一个抽象的数据架构。我们不必担心底层数据的分布式特性。我们只需要将具体的应用逻辑表达为一系列的转换过程,以及不同RDD之间的转换。操作形成依赖关系,可以流水线化,从而避免了中间结果的存储,大大减少了数据复制、磁盘IO和序列化开销。
RDD 是分布式对象的集合。它本质上是一个只读分区记录集合。每个RDD可以分为多个分区。每个分区都是一个数据集碎片,一个RDD的不同分区可以保存到集群中。可以在不同的节点上进行并行计算,从而可以在集群中的不同节点上进行并行计算。 RDD提供了高度受限的共享内存模型,即RDD是不能直接修改的只读记录分区的集合。 RDD只能基于稳定物理存储中的数据集创建,或者通过在其他RDD上执行来创建。新的 RDD 是通过某些转换操作(例如 map、join 和 groupBy)创建的。 RDD提供了丰富的操作集来支持常见的数据操作,分为“Action”和“Transformation”两种类型。前者用于执行计算并指定输出的形式,后者指定RDD。之间的相互依赖关系。两类操作的主要区别在于,转换操作(如map、filter、groupBy、join等)接受RDD并返回RDD,而操作操作(如count、collect等)接受RDD但返回RDD非RDD(即输出一个值或结果)。 RDD提供的转换接口非常简单。它们是map、filter、groupBy、join等粗粒度的数据转换操作,而不是对某个数据项的细粒度修改。因此,RDD更适合对数据集中的元素执行相同操作的批处理应用,但不适合需要异步且细粒度状态的应用,例如Web应用系统、增量网络爬虫等。其中,这种粗粒度的转换接口设计会让人直观地认为RDD的功能非常有限,不够强大。然而,RDD实际上已被实践证明可以很好地应用于许多并行计算应用中。它可以具备很多现有计算框架(如MapReduce、SQL、Pregel等)的表达能力,可以应用于这些框架无法处理的任务。交互式数据挖掘应用程序。
资料来源。1
RDD处理流程
Spark用Scala语言实现了RDD API,程序员可以通过调用API来实现对RDD的各种操作。 RDD的典型执行流程如下:
- RDD是通过读取外部数据源(或内存中的集合)创建的;
- RDD经过一系列“转换”操作,每次都会生成不同的RDD供下一次“转换”使用;
- 最后一个RDD经“行动”操作进行处理,并输出到外部数据源(或者变成Scala集合或标量)。
需要说明的是,RDD采用了惰性调用,即在RDD的执行过程中(如图9-8所示),真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。
如图1所示,创建了word.txt文件,内容如上;图2加载文件和视频统计逻辑;图3打印并生成RDD内容;图 5 显示每个生成的对象都是一个 RDD。
可以看出,Spark应用基本上就是一系列基于RDD的计算操作。 RDD按顺序使用并且相互依赖。最后一个操作是持久化 RDD,将其保存在内存或磁盘中。 (内存是从scala程序继承的)。
总结:
Spark的核心构建在统一的抽象RDD之上,使得Spark的各个组件可以无缝集成,在同一个应用程序中完成大数据计算任务。
计算一个RDD集合的执行流程如下:
- 创建这个Spark程序的执行上下文,即创建一个SparkContext对象; (用于操作 Spark 的 Scala 上下文连接)。
- 通过SparkContext从外部数据源读取数据创建RDD对象;
- RDD的转换(构建RDD之间的依赖关系,形成DAG图。此时没有发生真正的计算,只记录转换的轨迹;)
- 动作类型的操作会触发实际计算并将结果保留在内存中。
RDD分区
分区的概念
分区是RDD内部并行计算的一个计算单元,RDD的数据集在逻辑上被划分为多个分片,每一个分片称为分区,分区的格式决定了并行计算的粒度,而每个分区的数值计算都是在一个任务中进行的,因此任务的个数,也是由RDD(准确来说是作业最后一个RDD)的分区数决定。
分区优势
RDD 是一种分布式的数据集,数据源多种多样,而且数据量也很大,在存储这些海量数据时,也是按照块来存的,当RDD读取这些数据进行操作时,实际上是对每个分区中的数据进行操作,每一个分区的数据可以并行操作,分区可以提高并行度。
RDD特性
-
数据集的基本单位是一组分区或分区列表。每个分片将由一个计算任务处理,分片的数量决定了并行度。用户在创建RDD时可以指定RDD分片的数量。如果未指定,将使用默认值(默认值 2)。
-
一个函数将应用于每个分区。 Spark中RDD的计算是基于分区的,函数会应用到每个分区。
-
一个 RDD 依赖于多个其他 RDD。 RDD的每次转换都会生成一个新的RDD,因此RDD之间会形成管道式的依赖关系。当部分分区数据丢失时,Spark可以通过这种依赖关系重新计算丢失的分区数据,而不是重新计算RDD的所有分区。 (Spark的容错机制)
-
K-V 类型的 RDD 将具有 Partitioner 功能。非键值RDD的Parititioner的值为None。 Partitioner函数决定了RDD本身的分区数量,也决定了父RDD Shuffle输出时的分区数量。
-
每个RDD维护一个列表,每个Partition的位置(首选位置)都存储在列表中。
RDD转换(transformation)和行动(action)
RDD操作分为两类:transformation和action
转换:通过操作将一个RDD转换为另一个RDD
操作:评估或输出 RDD
所有这些操作主要针对两种类型的 RDD:
1)数值RDD(value型)
2)键值对RDD (K-V型)
RDD 上的所有转换操作都是延迟执行的,Spark 仅在发生操作操作时才会执行它们。
转换运算符是一个对 RDD 进行操作的接口函数。其作用是将一个或多个RDD转化为新的RDD。使用Spark进行数据计算。使用创建算子生成RDD后,数据处理算法设计和程序编写中最关键的部分就是使用变换算子对原始数据生成的RDD进行一步一步的变换,最终得到想要的计算结果。结果。
动作是触发调度的操作符,它返回结果或将数据写入外部存储器。
RDD API
创建RDD
- 从外部存储系统中的数据集创建
//本地加载数据创建RDD
val baseRdd = sc.textFile("file:///wordcount/input/words.txt")
//加载hdfs文件
val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
- 通过使用运算符转换现有 RDD 来生成新的 RDD
- 创建现有的 Scala 集合
-
方法1 :
sc.parallelize(Array(1,2,3,4,5,6,7,8))
-
方法2 :
sc.makeRDD(List(1,2,3,4,5,6,7,8))
makeRDD方法底层调用了parallelize方法,调用parallelize()方法的时候,不指定分区数的时候,使用系统给出的分区数,而调用makeRDD()方法的时候,会为每个集合对象创建最佳分区
- 通过消息队列(如kafka、rabbitMQ)创建RDD
主要用于流处理应用(我还没学过)。
RDD转换
图片来源2
RDD行动
其他参考资料:
Spark的RDD运算符-转换运算符
RDD持久化
RDD数据持久化的作用是什么?
1、将多次使用的RDD缓存起来,缓存在内存中。当频繁使用时,会直接从内存中读取缓存的数据,无需重新计算。
2、将RDD结果写入硬盘(容错机制)。当RDD丢失数据,或者从属RDD丢失数据时,可以使用持久化到硬盘的数据来恢复。
除了将 RDD 的最终目的地作为集合标量返回之外,RDD 还可以存储在外部文件系统或数据库中。 Spark与Hadoop完全兼容,因此Spark也支持MapReduce支持的文件或数据库类型的读写。
写入HDFS
写缓存
RDD通过cache()
或者persist()
将前面的计算结果缓存,默认情况下会把数据缓存在JVM的堆内存中。cache() 不需要传参,persist()需要设置持久化级别。
持久性级别为(首先在此列出,稍后将详细讨论):
- 仅内存
- 内存和磁盘
- MEMORY_ONLY_SER
- 内存_和_磁盘_SER;
- 仅磁盘
- MEMORY_ONLY_2
- MEMORY_AND_DISK_2
cache()底层调用persist()并将持久化级别设置为MEMORY_ONLY,这意味着cache()和persist(StorageLevel.MEMORY_ONLY)是相同的。
可以使用 unpersist() 方法手动从缓存中删除持久 RDD。
写入HDFS(Spark自带的方法)
将数据写入HDFS并使用HDFS永久存储。
操作流程:
-
设置持久存储路径
-
调用checkpoint()进行数据的保存
SparkContext.setCheckpointDir("HDFS的目录")
-
调用持久化方法
RDD.checkpoint()
写入本地目录
wordCount1.saveAsTextFile("file:///home/master/hadoop/files/...")
总结:
-
Persist和Cache将数据保存在内存中
-
Checkpoint将数据保存在HDFS中
-
程序结束或者手动调用unpersist方法后,Persist和Cache都会被清除。
-
检查点永久存储不会被删除。
相关参考:Spark和RDD的知识整理和总结