欧美亚洲综合图区在线|天天射天天干国产成卜|99久久免费国产精精品|国产的欧美一区二区三区|日韩中文字幕无码不卡专区|亚麻成人aV极品一区二区|国产成人AV区一区二区三|成人免费一区二区三区视频网站

當(dāng)前位置:首頁 > 軟件開放 > 正文內(nèi)容

如何閱讀spark源碼(spark源碼看什么書)

軟件開放2年前 (2023-02-24)1258

本篇文章給大家談?wù)勅绾伍喿xspark源碼,以及spark源碼看什么書對應(yīng)的知識點(diǎn),希望對各位有所幫助,不要忘了收藏本站喔。

本文目錄一覽:

如何成為Spark高手

  第一階段:熟練掌握Scala語言

1,spark框架是采用scala語言寫的,精致優(yōu)雅。想要成為spark高手,你就必須閱讀spark源碼,就必須掌握scala。?

2,雖然現(xiàn)在的spark可以使用多種語言開發(fā),java,python,但是最快速和支持最好的API依然并將永遠(yuǎn)是Scala的API,所以必須掌握scala來編寫復(fù)雜的和高性能的spark分布式程序。?

3尤其是熟練掌握Scala的trait,apply,函數(shù)式編程,泛型,逆變,與協(xié)變等。

第二階段:精通spark平臺本身提供給開發(fā)折的API

1,掌握spark中面向RDD的開發(fā)模式,掌握各種transformation和action函數(shù)的使用。?

2,掌握Spark中的款依賴和窄依賴,lineage機(jī)制。?

3,掌握RDD的計(jì)算流程,如Stage的劃分,spark應(yīng)用程序提交給集群的基礎(chǔ)過程和Work節(jié)點(diǎn)基礎(chǔ)的工作原理。

  第三階段:深入Spark內(nèi)核

此階段主要是通過Spark框架的源碼研讀來深入Spark內(nèi)核部分:?

1,通過源碼掌握Spark的任務(wù)提交,?

2,通過源碼掌握Spark的集群的任務(wù)調(diào)度,?

3,尤其要精通DAGScheduler,TaskScheduler和Worker節(jié)點(diǎn)內(nèi)部的工作的每一步細(xì)節(jié)。

第四階段:掌握Spark上的核心框架的使用

Spark作為云計(jì)算大數(shù)據(jù)時(shí)代的集大成者,在實(shí)時(shí)流式處理,圖技術(shù),機(jī)器學(xué)習(xí),nosql查詢等方面具有明顯的優(yōu)勢,我們使用Spark的時(shí)候大部分時(shí)間都是在使用其框架:?

sparksql,spark streaming等?

1,spark streaming是出色的實(shí)時(shí)流失處理框架,要掌握,DStream,transformation和checkpoint等。?

2,spark sql是離線統(tǒng)計(jì)分析工具,shark已經(jīng)沒落。?

3,對于spark中的機(jī)器學(xué)習(xí)和Graphx等要掌握其原理和用法。

  第五階段:做商業(yè)級的spark項(xiàng)目

通過一個完整的具有代表性的spark項(xiàng)目來貫穿spark的方方面面,包括項(xiàng)目的框架設(shè)計(jì),用到的技術(shù)的剖析,開始實(shí)現(xiàn),運(yùn)維等,完善掌握其中的每一個階段和細(xì)節(jié),以后你就可以從容的面對絕大多數(shù)spark項(xiàng)目。

  第六階段:提供spark解決方案

1,徹底掌握spark框架源碼的每一個細(xì)節(jié),?

2,根據(jù)步同的業(yè)務(wù)場景的需要提供spark在不同場景的解決方案,?

3,根據(jù)實(shí)際需要,在spark框架基礎(chǔ)上經(jīng)行2次開發(fā),打造自己的spark框架。

可能是全網(wǎng)最詳細(xì)的 Spark Sql Aggregate 源碼剖析

縱觀 Spark Sql 源碼,聚合的實(shí)現(xiàn)是其中較為復(fù)雜的部分,本文希望能以例子結(jié)合流程圖的方式來說清楚整個過程。這里僅關(guān)注 Aggregate 在物理執(zhí)行計(jì)劃相關(guān)的內(nèi)容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個專門的 Aggregation strategy 用來處理聚合,我們先來看看這個策略。

本文暫不討論 distinct Aggregate 的實(shí)現(xiàn)(有興趣的可以看看另一篇博文 ),我們來看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執(zhí)行計(jì)劃的

創(chuàng)建聚合分為兩個階段:

AggregateExpression 共有以下幾種 mode:

Q:是否支持使用 hash based agg 是如何判斷的?

摘自我另一篇文章:

為了說明最常用也是最復(fù)雜的的 hash based agg,本小節(jié)暫時(shí)將示例 sql 改為

這樣就能進(jìn)入 HashAggregateExec 的分支

構(gòu)造函數(shù)主要工作就是對 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進(jìn)行了初始化

在 enable code gen 的情況下,會調(diào)用 HashAggregateExec#inputRDDs 來生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們設(shè)置 spark.sql.codegen.wholeStage 為 false 來 disable code gen,這樣就會調(diào)用 HashAggregateExec#doExecute 來生成 RDD,如下:

可以看到,關(guān)鍵的部分就是根據(jù) child.execute() 生成的 RDD 的每一個 partition 的迭代器轉(zhuǎn)化生成一個新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個 partition。由于 TungstenAggregationIterator 涉及內(nèi)容非常多,我們單開一大節(jié)來進(jìn)行介紹。

此迭代器:

注:UnsafeKVExternalSorter 的實(shí)現(xiàn)可以參考:

UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實(shí)現(xiàn),由原始內(nèi)存(byte array)而不是 Java 對象支持,由三個區(qū)域組成:

使用 UnsafeRow 的收益:

構(gòu)造函數(shù)的主要流程已在上圖中說明,需要注意的是:當(dāng)內(nèi)存不足時(shí)(畢竟每個 grouping 對應(yīng)的 agg buffer 直接占用內(nèi)存,如果 grouping 非常多,或者 agg buffer 較大,容易出現(xiàn)內(nèi)存用盡)會從 hash based aggregate 切換為 sort based aggregate(會 spill 數(shù)據(jù)到磁盤),后文會進(jìn)行詳述。先來看看最關(guān)鍵的 processInputs 方法的實(shí)現(xiàn)

上圖中,需要注意的是:hashMap 中 get 一個 groupingKey 對應(yīng)的 agg buffer 時(shí),若已經(jīng)存在該 buffer 則直接返回;若不存在,嘗試申請內(nèi)存新建一個:

上圖中,用于真正處理一條 row 的 AggregationIterator#processRow 還需進(jìn)一步展開分析。在此之前,我們先來看看 AggregateFunction 的分類

AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類,具體的聚合函數(shù)均為這兩類的子類。

DeclarativeAggregate 是一類直接由 Catalyst 中的 Expressions 構(gòu)成的聚合函數(shù),主要邏輯通過調(diào)用 4 個表達(dá)式完成,分別是:

我們再次以容易理解的 Count 來舉例說明:

通常來講,實(shí)現(xiàn)一個基于 Expressions 的 DeclarativeAggregate 函數(shù)包含以下幾個重要的組成部分:

再來看看 AggregationIterator#processRow

AggregationIterator#processRow 會調(diào)用

生成用于處理一行數(shù)據(jù)(row)的函數(shù)

說白了 processRow 生成了函數(shù)才是直接用來接受一條 input row 來更新對應(yīng)的 agg buffer,具體是根據(jù) mode 及 aggExpression 中的 aggFunction 的類型調(diào)用其 updateExpressions 或 mergeExpressions 方法:

比如,對于 aggFunction 為 DeclarativeAggregate 類型的 Partial 下的 Count 來說就是調(diào)用其 updateExpressions 方法,即:

對于 Final 的 Count 來說就是調(diào)用其 mergeExpressions 方法,即:

對于 aggFunction 為 ImperativeAggregate 類型的 Partial 下的 Collect 來說就是調(diào)用其 update 方法,即:

對于 Final 的 Collect 來說就是調(diào)用其 merge 方法,即:

我們都知道,讀取一個迭代器的數(shù)據(jù),是要不斷調(diào)用 hasNext 方法進(jìn)行 check 是否還有數(shù)據(jù),當(dāng)該方法返回 true 的時(shí)候再調(diào)用 next 方法取得下一條數(shù)據(jù)。所以要知道如何讀取 TungstenAggregationIterator 的數(shù)據(jù),就得分析其這兩個方法。

分為兩種情況,分別是:

Agg 的實(shí)現(xiàn)確實(shí)復(fù)雜,本文雖然篇幅已經(jīng)很長,但還有很多方面沒有 cover 到,但基本最核心、最復(fù)雜的點(diǎn)都詳細(xì)介紹了,如果對于未 cover 的部分有興趣,請自行閱讀源碼進(jìn)行分析~

spark sql 2.3 源碼解讀 - Execute (7)

終于到了最后一步執(zhí)行了:

最關(guān)鍵的兩個函數(shù)便是 doPrepare和 doExecute了。

還是以上一章的sql語句為例,其最終生成的sparkplan為:

看一下SortExec的doPrepare 和 doExecute方法:

下面看child也就是ShuffleExchangeExec:

先看沒有exchangeCoordinator的情況,

首先執(zhí)行:

上面的方法會返回一個ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它決定了每一條InternalRow shuffle后的partition id:

接下來:

返回結(jié)果是ShuffledRowRDD:

CoalescedPartitioner的邏輯:

再看有exchangeCoordinator的情況:

同樣返回的是ShuffledRowRDD:

再看doEstimationIfNecessary:

estimatePartitionStartIndices 函數(shù)得到了 partitionStartIndices:

有exchangeCoordinator的情況就生成了partitionStartIndices,從而對分區(qū)進(jìn)行了調(diào)整。

最后來一個例子:

未開啟exchangeCoordinator的plan:

開啟exchangeCoordinator的plan:

不同之處是 兩個Exchange都帶了coordinator,且都是同一個coordinator。

執(zhí)行withExchangeCoordinator前:

執(zhí)行withExchangeCoordinator后:

生成了coordinator,且執(zhí)行了 doPrepare后,可以看到兩個exchange都向其注冊了。

doExecute后:

原先的numPartitions是200,經(jīng)過執(zhí)行后,生成的partitionStartIndices為[1],也就是只有1個partition,顯然在測試數(shù)據(jù)量很小的情況下,1個partition是更為合理的。這就是ExchangeCoordinator的功勞。

execute 最終的輸出是rdd,剩下的結(jié)果便是spark對rdd的運(yùn)算了。其實(shí) spark sql 最終的目標(biāo)便也是生成rdd,交給spark core來運(yùn)算。

spark sql的介紹到這里就結(jié)束了。

怎么用Eclipse搭建Spark源碼閱讀環(huán)境

應(yīng)該說這個和是不是Spark項(xiàng)目沒什么關(guān)系。

建議你使用intellij idea,在spark目錄下執(zhí)行"sbt/sbt gen-idea",會自動生成.idea項(xiàng)目,導(dǎo)入即可。

idea我不熟,還需要做一些其他的插件配置(python, sbt等)和環(huán)境設(shè)置。

你也可以使用Eclipse看,Eclipse有scala IDE,把Spark項(xiàng)目當(dāng)maven工程導(dǎo)入。但是子項(xiàng)目之間的依賴會有點(diǎn)問題,會報(bào)錯。

推薦使用前者,向Databricks的開發(fā)者看齊;我使用的是后者,我直接依賴了編譯好的包就不會報(bào)錯了,純讀源碼的話也勉強(qiáng)可以跟蹤和調(diào)試。

另外,我也看有的Committer用vim看spark代碼的,所以怎么看源碼都無所謂,你熟悉就好,而且這和是不是Spark項(xiàng)目也沒什么關(guān)系。:)

怎么在Idea IDE里面打開Spark源碼而不報(bào)錯

首先我們先點(diǎn)擊一個工程的Project Structure菜單,這時(shí)候會彈出一個對話框,仔細(xì)的用戶肯定會發(fā)現(xiàn)里面列出來的模塊(Module)居然沒有yarn!就是這個原因?qū)е聐arn模塊相關(guān)的代碼老是報(bào)錯!只需要將yarn模塊加入到這里即可。

步驟依次選擇 Add-Import Module-選擇pom.xml,然后一步一步點(diǎn)擊確定,這時(shí)候會在對話框里面多了spark-yarn_2.10模塊,

然后點(diǎn)擊Maven Projects里面的Reimport All Maven Projects,等yarn模塊里面的所有依賴全部下載完的時(shí)候,我們就可以看到這個模塊里面的代碼終于不再報(bào)錯了?。?/p>

Spark源碼分析之SparkSubmit的流程

本文主要對SparkSubmit的任務(wù)提交流程源碼進(jìn)行分析。 Spark源碼版本為2.3.1。

首先閱讀一下啟動腳本,看看首先加載的是哪個類,我們看一下 spark-submit 啟動腳本中的具體內(nèi)容。

可以看到這里加載的類是org.apache.spark.deploy.SparkSubmit,并且把啟動相關(guān)的參數(shù)也帶過去了。下面我們跟一下源碼看看整個流程是如何運(yùn)作的...

SparkSubmit的main方法如下

這里我們由于我們是提交作業(yè),所有會走上面的submit(appArgs, uninitLog)方法

可以看到submit方法首先會準(zhǔn)備任務(wù)提交的環(huán)境,調(diào)用了prepareSubmitEnvironment,該方法會返回四元組,該方法中會調(diào)用doPrepareSubmitEnvironment,這里我們重點(diǎn)注意 childMainClass類具體是什么 ,因?yàn)檫@里涉及到后面啟動我們主類的過程。

以下是doPrepareSubmitEnvironment方法的源碼...

可以看到該方法首先是解析相關(guān)的參數(shù),如jar包,mainClass的全限定名,系統(tǒng)配置,校驗(yàn)一些參數(shù),等等,之后的關(guān)鍵點(diǎn)就是根據(jù)我們 deploy-mode 參數(shù)來判斷是如何運(yùn)行我們的mainClass,這里主要是通過childMainClass這個參數(shù)來決定下一步首先啟動哪個類。

childMainClass根據(jù)部署模型有不同的值:

之后該方法會把準(zhǔn)備好的四元組返回,我們接著看之前的submit方法

可以看到這里最終會調(diào)用doRunMain()方法去進(jìn)行下一步。

doRunMain的實(shí)現(xiàn)如下...

doRunMain方法中會判斷是否需要一個代理用戶,然后無論需不需要都會執(zhí)行runMain方法,我們接下來看看runMain方法是如何實(shí)現(xiàn)的。

這里我們只假設(shè)以集群模式啟動,首先會加載類,將我們的childMainClass加載為字節(jié)碼對象mainClass ,然后將mainClass 映射成SparkApplication對象,因?yàn)槲覀円约耗J絾?,那么上一步返回四元組中的childMainClass的參數(shù)為ClientApp的全限定名,而這里會調(diào)用app實(shí)例的start方法因此,這里最終調(diào)用的是ClientApp的start方法。

ClientApp的start方法如下...

可以看到這里和之前我們的master啟動流程有些相似。

可以參考我上一篇文章 Spark源碼分析之Master的啟動流程 對這一流程加深理解。

首先是準(zhǔn)備rpcEnv環(huán)境,之后通過master的地址獲取masterEndpoints端點(diǎn)相關(guān)信息,因?yàn)檫@里運(yùn)行start方法時(shí)會將之前配置的相關(guān)參數(shù)都傳進(jìn)來,之后就會通過rpcEnv注冊相關(guān)clientEndPoint端點(diǎn)信息,同時(shí)需要注意,這里會把masterEndpoints端點(diǎn)信息也作為構(gòu)造ClientEndpoint端點(diǎn)的參數(shù),也就是說這個ClientEndpoint會和masterEndpoints通信。

而在我上一篇文章中說過,只要是setupEndpoint方法被調(diào)用,一定會調(diào)用相關(guān)端點(diǎn)的的onStart方法,而這會調(diào)用clientEndPoint的onStart方法。

ClientEndPoint類中的onStart方法會匹配launch事件。源碼如下

onStart中匹配我們的launch的過程,這個過程是啟動driverWrapper的過程,可以看到上面源碼中封裝了mainClass ,該參數(shù)對應(yīng)DriverWrapper類的全限定名,之后將mainClass封裝到command中,然后封裝到driverDescription中,向Master申請啟動Driver。

這個過程會向Mster發(fā)送消息,是通過rpcEnv來實(shí)現(xiàn)發(fā)射消息的,而這里就涉及到outbox信箱,會調(diào)用postToOutbox方法,向outbox信箱中添加消息,然后通過TransportClient的send或sendRpc方法發(fā)送消息。發(fā)件箱以及發(fā)送過程是在同一個線程中進(jìn)行。

而細(xì)心的同學(xué)會注意到這里調(diào)用的方法名為SendToMasterAndForwardReply,見名之意,發(fā)送消息到master并且期待回應(yīng)。

下面是rpcEnv來實(shí)現(xiàn)向遠(yuǎn)端發(fā)送消息的一個調(diào)用流程,最終會通過netty中的TransportClient來寫出。

之后,Master端會觸發(fā)receiveAndReply函數(shù),匹配RequestSubmitDriver樣例類,完成模式匹配執(zhí)行后續(xù)流程。

可以看到這里首先將Driver信息封裝成DriverInfo,然后添加待調(diào)度列表waitingDrivers中,然后調(diào)用通用的schedule函數(shù)。

由于waitingDrivers不為空,則會走LaunchDriver的流程,當(dāng)前的application申請資源,這時(shí)會向worker發(fā)送消息,觸發(fā)Worker的receive方法。

Worker的receive方法中,當(dāng)Worker遇到LaunchDriver指令時(shí),創(chuàng)建并啟動一個DriverRunner,DriverRunner啟動一個線程,異步的處理Driver啟動工作。這里說啟動的Driver就是剛才說的org.apache.spark.deploy.worker.DriverWrapper

可以看到上面在DriverRunner中是開辟線程異步的處理Driver啟動工作,不會阻塞主進(jìn)程的執(zhí)行,而prepareAndRunDriver方法中最終調(diào)用 runDriver..

runDriver中主要先做了一些初始化工作,接著就開始啟動driver了。

上述Driver啟動工作主要分為以下幾步:

下面我們直接看DriverWrapper的實(shí)現(xiàn)

DriverWrapper,會創(chuàng)建了一個RpcEndpoint與RpcEnv,RpcEndpoint為WorkerWatcher,主要目的為監(jiān)控Worker節(jié)點(diǎn)是否正常,如果出現(xiàn)異常就直接退出,然后當(dāng)前的ClassLoader加載userJar,同時(shí)執(zhí)行userMainClass,在執(zhí)行用戶的main方法后關(guān)閉workerWatcher。

以上就是SparkSubmit的流程,下一篇我會對SparkContext的源碼進(jìn)行解析。

歡迎關(guān)注...

如何閱讀spark源碼的介紹就聊到這里吧,感謝你花時(shí)間閱讀本站內(nèi)容,更多關(guān)于spark源碼看什么書、如何閱讀spark源碼的信息別忘了在本站進(jìn)行查找喔。

掃描二維碼推送至手機(jī)訪問。

版權(quán)聲明:本文由飛速云SEO網(wǎng)絡(luò)優(yōu)化推廣發(fā)布,如需轉(zhuǎn)載請注明出處。

本文鏈接:http://www.landcheck.net/post/9796.html

標(biāo)簽: 如何閱讀spark源碼

“如何閱讀spark源碼(spark源碼看什么書)” 的相關(guān)文章

軟件開發(fā)培訓(xùn)學(xué)校(軟件開發(fā)培訓(xùn)學(xué)校排名)

軟件開發(fā)培訓(xùn)學(xué)校(軟件開發(fā)培訓(xùn)學(xué)校排名)

本篇文章給大家談?wù)勡浖_發(fā)培訓(xùn)學(xué)校,以及軟件開發(fā)培訓(xùn)學(xué)校排名對應(yīng)的知識點(diǎn),希望對各位有所幫助,不要忘了收藏本站喔。 本文目錄一覽: 1、北京有哪些比較好的軟件開發(fā)培訓(xùn)學(xué)校 2、游戲軟件開發(fā)培訓(xùn)學(xué)校哪個比較有名? 3、軟件開發(fā)培訓(xùn)班好的有哪些? 北京有哪些比較好的軟件開發(fā)培訓(xùn)學(xué)校 北京的軟件...

oa軟件開發(fā)(oa軟件開發(fā)語言)

oa軟件開發(fā)(oa軟件開發(fā)語言)

今天給各位分享oa軟件開發(fā)的知識,其中也會對oa軟件開發(fā)語言進(jìn)行解釋,如果能碰巧解決你現(xiàn)在面臨的問題,別忘了關(guān)注本站,現(xiàn)在開始吧!本文目錄一覽: 1、淺談OA系統(tǒng)二次開發(fā)的優(yōu)劣 2、oa系統(tǒng)開發(fā)深圳哪家比較好?不知有沒有專業(yè)做OA軟件開發(fā)的公司? 3、淺談OA系統(tǒng)的二次開發(fā) 4、開發(fā)OA...

軟件開發(fā)外包交易平臺(個人開發(fā)外包平臺)

軟件開發(fā)外包交易平臺(個人開發(fā)外包平臺)

今天給各位分享軟件開發(fā)外包交易平臺的知識,其中也會對個人開發(fā)外包平臺進(jìn)行解釋,如果能碰巧解決你現(xiàn)在面臨的問題,別忘了關(guān)注本站,現(xiàn)在開始吧!本文目錄一覽: 1、軟件項(xiàng)目外包,哪個網(wǎng)站比較好? 2、軟件項(xiàng)目交易網(wǎng)怎么樣?用過的人說一下 3、軟件開發(fā)交易外包平臺有哪些 4、軟件開發(fā)外包平臺好用...

廈門思維軟件開發(fā)(廈門思維課程機(jī)構(gòu)有哪些)

廈門思維軟件開發(fā)(廈門思維課程機(jī)構(gòu)有哪些)

本篇文章給大家談?wù)剰B門思維軟件開發(fā),以及廈門思維課程機(jī)構(gòu)有哪些對應(yīng)的知識點(diǎn),希望對各位有所幫助,不要忘了收藏本站喔。 本文目錄一覽: 1、廈門軟件開發(fā)公司哪家好? 2、廈門艾孚思軟件開發(fā)有限公司怎么樣? 3、廈門好思維教育科技有限公司怎么樣? 4、廈門軟件開發(fā),廈門軟件開發(fā)多少錢 5...

免費(fèi)網(wǎng)站建設(shè)(免費(fèi)建站系統(tǒng)哪個好)

免費(fèi)網(wǎng)站建設(shè)(免費(fèi)建站系統(tǒng)哪個好)

本篇文章給大家談?wù)劽赓M(fèi)網(wǎng)站建設(shè),以及免費(fèi)建站系統(tǒng)哪個好對應(yīng)的知識點(diǎn),希望對各位有所幫助,不要忘了收藏本站喔。 本文目錄一覽: 1、如何給自己免費(fèi)建設(shè)一個網(wǎng)站 2、自己如何免費(fèi)做網(wǎng)站 3、自己手機(jī)怎么免費(fèi)做網(wǎng)站 如何給自己免費(fèi)建設(shè)一個網(wǎng)站 很多人都想建立一個自己的網(wǎng)站,但如果是用作商業(yè)的網(wǎng)...

軟件開發(fā)中心(海通證券軟件開發(fā)中心)

軟件開發(fā)中心(海通證券軟件開發(fā)中心)

今天給各位分享軟件開發(fā)中心的知識,其中也會對海通證券軟件開發(fā)中心進(jìn)行解釋,如果能碰巧解決你現(xiàn)在面臨的問題,別忘了關(guān)注本站,現(xiàn)在開始吧!本文目錄一覽: 1、工商銀行軟件研發(fā)中心提前批之后還要筆試嗎 2、工行軟件開發(fā)中心怎樣能提升 3、全國排名前十的軟件開發(fā)培訓(xùn)中心在哪里? 4、中國農(nóng)業(yè)銀行...