文档库 最新最全的文档下载
当前位置:文档库 › Apache_Spark源码走读系列篇二

Apache_Spark源码走读系列篇二

Apache_Spark源码走读系列篇二
Apache_Spark源码走读系列篇二

超人学院—Apache Spark源码走读之Task运行期之函数调用关系分析

欢迎转载,转载请注明出处,超人学院。

概要

本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理

的结果返回到哪里,如何返回。

准备

1.spark已经安装完毕

2.spark运行在local mode或local-cluster mode

local-cluster mode

local-cluster模式也称为伪分布式,可以使用如下指令运行

MASTER=local[1,2,1024] bin/spark-shell

[1,2,1024]分别表示,executor number, core number和内存大小,其中内存大小不应小于默认的512M

Driver Programme的初始化过程分析

初始化过程的涉及的主要源文件

1.SparkContext.scala 整个初始化过程的入口

2.SparkEnv.scala 创建BlockManager,

MapOutputTrackerMaster, ConnectionManager, CacheManager

3.DAGScheduler.scala 任务提交的入口,即将Job

划分成各个stage的关键

4.TaskSchedulerImpl.scala 决定每个stage可以运行几个task,

每个task分别在哪个executor上运行

5.SchedulerBackend

1.最简单的单机运行模式的话,看LocalBackend.scala

2.如果是集群模式,看源文件

SparkDeploySchedulerBackend

初始化过程步骤详解

步骤1:根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv, SparkEnv中主要包含以下关键性组件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager

private[spark] val env = SparkEnv.create(

conf,

"",

conf.get("spark.driver.host"),

conf.get("spark.driver.port").toInt,

isDriver = true,

isLocal = isLocal)

SparkEnv.set(env)

步骤2:创建TaskScheduler,根据Spark的运行模式来选择相应的SchedulerBackend,同时启动taskscheduler,这一步至为关键

private[spark] var taskScheduler =

SparkContext.createTaskScheduler(this, master, appName)

taskScheduler.start()

TaskScheduler.start目的是启动相应的SchedulerBackend,并启动定时器进行检测

overridedef start() {

backend.start()

if (!isLocal && conf.getBoolean("spark.speculation", false)) {

logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher

sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,

SPECULATION_INTERVAL milliseconds) {

checkSpeculatableTasks()

}

}

}

步骤3:以上一步中创建的TaskScheduler实例为入参创建DAGScheduler并启动运行

@volatileprivate[spark] var dagScheduler = new DAGScheduler(taskScheduler)

dagScheduler.start()

步骤4:启动WEB UI

ui.start()

RDD的转换过程

还是以最简单的wordcount为例说明rdd的转换过程

sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

上述一行简短的代码其实发生了很复杂的RDD转换,下面仔细解释每一步的转换过程和转换结果

步骤1:val rawFile = sc.textFile("README.md")

textFile先是生成hadoopRDD,然后再通过map操作生成MappedRDD,

如果在spark-shell中执行上述语句,得到的结果可以证明所做的分

scala> sc.textFile("README.md")

14/04/2313:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes

14/04/2313:11:48 INFO MemoryStore: ensureFreeSpace(119741)

called with curMem=0, maxMem=311387750

14/04/2313:11:48 INFO MemoryStore: Block broadcast_0 stored as

values to memory (estimated size 116.9 KB, free 296.8 MB)

14/04/2313:11:48 DEBUG BlockManager: Put block broadcast_0

locally took 277 ms

14/04/2313:11:48 DEBUG BlockManager: Put for block broadcast_0

without replication took 281 ms

res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at

textFile at :13

步骤2: val splittedText = rawFile.flatMap(line =>

line.split(" "))

flatMap将原来的MappedRDD转换成为FlatMappedRDD

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

= new FlatMappedRDD(this, sc.clean(f))

步骤3:val wordCount = splittedText.map(word => (word, 1))

利用word生成相应的键值对,上一步的FlatMappedRDD被转换成为

MappedRDD

步骤4:val reduceJob = wordCount.reduceByKey(_ + _),这一步最复杂

步骤2,3中使用到的operation全部定义在RDD.scala中,而这里使用到的reduceByKey却在RDD.scala中见不到踪迹。reduceByKey 的定义出现在源文件PairRDDFunctions.scala

细心的你一定会问reduceByKey不是MappedRDD的属性和方法啊,怎么能被MappedRDD调用呢?其实这背后发生了一个隐式的转换,该转换将MappedRDD转换成为PairRDDFunctions

implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =

new PairRDDFunctions(rdd)

这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scala implicit method"进行查询,会有不少的文章对此进行详尽的介绍。

接下来再看一看reduceByKey的定义

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {

reduceByKey(defaultPartitioner(self), func)

}

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {

combineByKey[V]((v: V) => v, func, func, partitioner)

}

def combineByKey[C](createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C,

partitioner: Partitioner,

mapSideCombine: Boolean = true,

serializerClass: String = null): RDD[(K, C)] = {

if (getKeyClass().isArray) {

if (mapSideCombine) {

thrownew SparkException("Cannot use map-side combining with array keys.")

}

if (partitioner.isInstanceOf[HashPartitioner]) {

thrownew SparkException("Default partitioner cannot partition array keys.")

}

}

val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)

if (self.partitioner == Some(partitioner)) {

self.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context,

https://www.wendangku.net/doc/157958934.html,bineValuesByKey(iter, context))

}, preservesPartitioning = true)

} elseif (mapSideCombine) {

val combined = self.mapPartitionsWithContext((context, iter) => {

https://www.wendangku.net/doc/157958934.html,bineValuesByKey(iter, context)

}, preservesPartitioning = true)

val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)

.setSerializer(serializerClass)

partitioned.mapPartitionsWithContext((context, iter) => {

new InterruptibleIterator(context,

https://www.wendangku.net/doc/157958934.html,bineCombinersByKey(iter, context))

}, preservesPartitioning = true)

} else {

// Don't apply map-side combiner.

val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)

values.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context,

https://www.wendangku.net/doc/157958934.html,bineValuesByKey(iter, context))

}, preservesPartitioning = true)

}

}

reduceByKey最终会调用combineByKey, 在这个函数中PairedRDDFunctions会被转换成为ShuffleRDD,当调用mapPartitionsWithContext之后,shuffleRDD被转换成为MapPartitionsRDD

Log输出能证明我们的分析

res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13

RDD转换小结

小结一下整个RDD转换过程

HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunc tions->ShuffleRDD->MapPartitionsRDD

整个转换过程好长啊,这一切的转换都发生在任务提交之前。

运行过程分析

数据集操作分类

在对任务运行过程中的函数调用关系进行分析之前,我们也来探讨一个偏理论的东西,作用于RDD之上的Transformantion为什么会是这个样子?

对这个问题的解答和数学搭上关系了,从理论抽象的角度来说,任务

处理都可归结为“input->processing->output"。input和output

对应于数据集dataset.

在此基础上作一下简单的分类

1.one-one 一个dataset在转换之后还是一个dataset,而且

dataset的size不变,如map

2.one-one 一个dataset在转换之后还是一个dataset,但size

发生更改,这种更改有两种可能:扩大或缩小,如flatMap是

size增大的操作,而subtract是size变小的操作

3.many-one 多个dataset合并为一个dataset,如combine, join

4.one-many 一个dataset分裂为多个dataset, 如groupBy

Task运行期的函数调用

task的提交过程参考本系列中的第二篇文章。本节主要讲解当task

在运行期间是如何一步步调用到作用于RDD上的各个operation

TaskRunner.run

o Task.run

?Task.runTask (Task是一个基类,有两个子类,分

别为ShuffleMapTask和ResultTask)

?RDD.iterator

?https://www.wendangku.net/doc/157958934.html,puteOrReadCheckpoint

?https://www.wendangku.net/doc/157958934.html,pute

或许当看到https://www.wendangku.net/doc/157958934.html,pute函数定义时,还是觉着f没有被调用,以

MappedRDD的compute定义为例

overridedef compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f)

注意,这里最容易产生错觉的地方就是map函数,这里的map不是

RDD中的map,而是scala中定义的iterator的成员函数map, 请自

行参考

https://www.wendangku.net/doc/157958934.html,/api/2.10.4/index.html#scala.colle ction.Iterator

堆栈输出

80 at

org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:1 11)

81 at

org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154 )

82 at

https://www.wendangku.net/doc/157958934.html,pute(HadoopRDD.scala:149) 83 at

https://www.wendangku.net/doc/157958934.html,pute(HadoopRDD.scala:64) 84 at

https://www.wendangku.net/doc/157958934.html,puteOrReadCheckpoint(RDD.scala: 241)

85 at

org.apache.spark.rdd.RDD.iterator(RDD.scala:232)

86 at

https://www.wendangku.net/doc/157958934.html,pute(MappedRDD.scala:31) 87 at

https://www.wendangku.net/doc/157958934.html,puteOrReadCheckpoint(RDD.scala: 241)

88 at

org.apache.spark.rdd.RDD.iterator(RDD.scala:232)

89 at

https://www.wendangku.net/doc/157958934.html,pute(FlatMappedRDD.sc ala:33)

90 at

https://www.wendangku.net/doc/157958934.html,puteOrReadCheckpoint(RDD.scala: 241)

91 at

org.apache.spark.rdd.RDD.iterator(RDD.scala:232)

92 at

https://www.wendangku.net/doc/157958934.html,pute(MappedRDD.scala:31) 93 at

https://www.wendangku.net/doc/157958934.html,puteOrReadCheckpoint(RDD.scala: 241)

94 at

org.apache.spark.rdd.RDD.iterator(RDD.scala:232)

95 at

https://www.wendangku.net/doc/157958934.html,pute(MapPartitions RDD.scala:34)

96 at

https://www.wendangku.net/doc/157958934.html,puteOrReadCheckpoint(RDD.scala: 241)

97 at

org.apache.spark.rdd.RDD.iterator(RDD.scala:232)

98 at

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMa pTask.scala:161)

99 at

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMa pTask.scala:102)

100 at

org.apache.spark.scheduler.Task.run(Task.scala:53)

101 at

org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$ 1.apply$mcV$sp(Executor.scala:211)

ResultTask

compute的计算过程对于ShuffleMapTask比较复杂,绕的圈圈比较多,对于ResultTask就直接许多。

overridedef runTask(context: TaskContext): U = {

metrics = Some(context.taskMetrics)

try {

func(context, rdd.iterator(split, context))

} finally {

context.executeOnCompleteCallbacks()

}

}

计算结果的传递

上面的分析知道,wordcount这个job在最终提交之后,被DAGScheduler分为两个stage,第一个Stage是shuffleMapTask,第二个Stage是ResultTask.

那么ShuffleMapTask的计算结果是如何被ResultTask取得的呢?这个过程简述如下

1.ShffuleMapTask将计算的状态(注意不是具体的数据)包装为

MapStatus返回给DAGScheduler

2.DAGScheduler将MapStatus保存到MapOutputTrackerMaster

3.ResultTask在执行到ShuffleRDD时会调用

BlockStoreShuffleFetcher的fetch方法去获取数据

1.第一件事就是咨询MapOutputTrackerMaster所要取的数

据的location

2.根据返回的结果调用BlockManager.getMultiple获取真

正的数据

BlockStoreShuffleFetcher的fetch函数伪码

val blockManager = SparkEnv.get.blockManager

val startTime = System.currentTimeMillis

val statuses =

SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)

logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(

shuffleId, reduceId, System.currentTimeMillis - startTime))

val blockFetcherItr =

blockManager.getMultiple(blocksByAddress, serializer)

val itr = blockFetcherItr.flatMap(unpackBlock)

注意上述代码中的getServerStatuses及getMultiple,一个是询问数据的位置,一个是去获取真正的数据。

Android Hotfix 新方案——Amigo 源码解读

Android Hotfix 新方案——Amigo 源码解读 首先我们先来看看如何使用这个库。 用法 在project 的build.gradle中 dependencies { classpath 'me.ele:amigo:0.0.3' } 在module 的build.gradle中 apply plugin: 'me.ele.amigo' 就这样轻松的集成了Amigo。 生效补丁包 补丁包生效有两种方式可以选择: ? 稍后生效补丁包 ? 如果不想立即生效而是用户第二次打开App 时才打入补丁包,则可以将新的Apk 放到/data/data/{your pkg}/files/amigo/demo.apk,第二次打开时就会自动生效。可以通过这个方法 ? File hotfixApk = Amigo.getHotfixApk(context); ?

获取到新的Apk。 同时,你也可以使用Amigo 提供的工具类将你的补丁包拷贝到指定的目录当中。 ? FileUtils.copyFile(yourApkFile, amigoApkFile); ? ? 立即生效补丁包 ? 如果想要补丁包立即生效,调用以下两个方法之一,App 会立即重启, 并且打入补丁包。 ? Amigo.work(context); ? Amigo.work(context, apkFile); ? 删除补丁包 如果需要删除掉已经下好的补丁包,可以通过这个方法 Amigo.clear(context); 提示:如果apk 发生了变化,Amigo 会自动清除之前的apk。 自定义界面 在热修复的过程中会有一些耗时的操作,这些操作会在一个新的进程中的Activity 中执行,所以你可以通过以下方式来自定义这个Activity。

安卓计算机源代码

Mxl代码

相关文档