91欧美超碰AV自拍|国产成年人性爱视频免费看|亚洲 日韩 欧美一厂二区入|人人看人人爽人人操aV|丝袜美腿视频一区二区在线看|人人操人人爽人人爱|婷婷五月天超碰|97色色欧美亚州A√|另类A√无码精品一级av|欧美特级日韩特级

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

關(guān)于Spark的從0實現(xiàn)30s內(nèi)實時監(jiān)控指標(biāo)計算

佳佳 ? 來源:jf_36786605 ? 作者:jf_36786605 ? 2024-06-14 15:52 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

前言

說起Spark,大家就會自然而然地想到Flink,而且會不自覺地將這兩種主流的大數(shù)據(jù)實時處理技術(shù)進行比較。然后最終得出結(jié)論:Flink實時性大于Spark。

的確,F(xiàn)link中的數(shù)據(jù)計算是以事件為驅(qū)動的,所以來一條數(shù)據(jù)就會觸發(fā)一次計算,而Spark基于數(shù)據(jù)集RDD計算,RDD最小生成間隔就是50毫秒,所以Spark就被定義為亞實時計算。

窗口Window

這里的RDD就是“天然的窗口”,將RDD生成的時間間隔設(shè)置成1min,那么這個RDD就可以理解為“1min窗口”。所以如果想要窗口計算,首選Spark。

但當(dāng)需要對即臨近時間窗口進行計算時,必須借助滑動窗口的算子來實現(xiàn)。

臨近時間如何理解

例如“3分鐘內(nèi)”這種時間范圍描述。這種時間范圍的計算,需要計算歷史的數(shù)據(jù)。例如1 ~ 3是3min,2 ~ 4也是3min,這里就重復(fù)使用了2和3的數(shù)據(jù),依次類推,3 ~ 5也是3min,同樣也重復(fù)使用了3和4。

如果使用普通窗口,就無法滿足“最近3分鐘內(nèi)”這種時間概念。

很多窗口都丟失了臨近時間,例如第3個RDD的臨近時間其實是第二個RDD,但是他們就沒法在一起計算,這就是為什么不用普通窗口的原因。

滑動窗口

滑動窗口三要素:RDD的生成時間、窗口的長度、滑動的步長。

我在本次實踐中,將RDD的時間間隔設(shè)置為10s,窗口長度為30s、滑動步長為10s。也就是說每10s就會生成一個窗口,計算最近30s內(nèi)的數(shù)據(jù),每個窗口由3個RDD組成。

數(shù)據(jù)源構(gòu)建

1. 數(shù)據(jù)規(guī)范

假設(shè)我們采集了設(shè)備的指標(biāo)信息,這里我們只關(guān)注吞吐量和響應(yīng)時間,在采集之前定義數(shù)據(jù)字段和規(guī)范[throughput, response_time],這里都定義成int類型,響應(yīng)時間單位這里定義成毫秒ms。

實際情況中,我們不可能只采集一臺設(shè)備,如果我們想要得出每臺或者每個種類設(shè)備的指標(biāo)監(jiān)控,就要在采集數(shù)據(jù)的時候?qū)γ總€設(shè)備加上唯一ID或者TypeID。

我這里的想法是對每臺設(shè)備的指標(biāo)進行分析,所以我給每個設(shè)備都增加了一個唯一ID,最終字段[id, throughput, response_time],所以我們就按照這個數(shù)據(jù)格式,在SparkStreaming中構(gòu)建數(shù)據(jù)源讀取部分。

2. 讀取kafka

代碼語言:scala

復(fù)制

val conf = new SparkConf().setAppName("aqi").setMaster("local[1]")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "121.91.168.193:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "aqi",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

val topics = Array("evt_monitor")
val stream: DStream[String] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map(_.value)

這里我們將一個RDD時間間隔設(shè)置為10S,因為使用的是筆記本跑,所以這里要將Master設(shè)置為local,表示本地運行模式,1代表使用1個線程。

我們使用Kafka作為數(shù)據(jù)源,在讀取時就要構(gòu)建Consumer的config,像bootstrap.servers這些基本配置沒有什么好說的,關(guān)鍵的是auto.offset.reset和enable.auto.commit,

這兩個參數(shù)分表控制讀取topic消費策略和是否提交offset。這里的earliest會從topic中現(xiàn)存最早的數(shù)據(jù)開始消費,latest是最新的位置開始消費。

當(dāng)重啟程序時,這兩種消費模式又被enable.auto.commit控制,設(shè)置true提交offset時,earliest和latest不再生效,都是從消費組記錄的offset進行消費。設(shè)置為false不提交offset,offset不被提交記錄earliest還是從topic中現(xiàn)存最早的數(shù)據(jù)開始消費,latest還是從最新的數(shù)據(jù)消費。

最后就是設(shè)置要讀取的topic和創(chuàng)建Kafka的DStream數(shù)據(jù)流。至此,整個數(shù)據(jù)源的讀取就已經(jīng)完成了,下面就是對數(shù)據(jù)處理邏輯的開發(fā)。

3. 指標(biāo)聚合計算

代碼語言:scala

復(fù)制

stream.map(x => {
      val s = x.split(",")
      (s(0), (s(2).toInt, 1))
    }).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .reduceByKeyAndWindow((x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2), Seconds(30), Seconds(10))
    .foreachRDD(rdd => {
      rdd.foreach(x => {
        val id = x._1
        val responseTimes = x._2._1
        val num = x._2._2
        val responseTime_avg = responseTimes / num
        println(id, responseTime_avg)
      })
    })

我們從自身需求出發(fā),來構(gòu)思程序邏輯的開發(fā)。從需求看,關(guān)鍵字無非是最近一段時間內(nèi)、平均值。想要取一段時間內(nèi)的數(shù)據(jù),就要使用滑動窗口,以當(dāng)前時間為基準(zhǔn),向前圈定時間范圍。

而平均值,無非就是將時間范圍內(nèi),即窗口所有的響應(yīng)時間加起來,然后除以數(shù)據(jù)條數(shù)即可。想要把所有的響應(yīng)時間加起來,這里使用reduceByKey() 將窗口內(nèi)相同ID的設(shè)備時間相加,將數(shù)據(jù)條數(shù)進行相加。

所以我在第一步切分?jǐn)?shù)據(jù)的時候,就將數(shù)據(jù)切分成KV的元組形式,V有兩個字段,第一個是響應(yīng)時間,第二個1表示一條數(shù)據(jù)。reduceByKey一共分為兩步,第一是RDD內(nèi)的reduceByKey,這也算是數(shù)據(jù)的預(yù)處理,RDD的數(shù)據(jù)只會計算一次,當(dāng)這個RDD被多個窗口使用,就不會重復(fù)計算了。第二步是基于窗口的reduceByKey,將窗口所有RDD的數(shù)據(jù)再一次聚合,最后在foreachRDD中獲取輸出

4. 驗證結(jié)果

我們向kafka的evt_monitor這個topic中寫入數(shù)據(jù)。

備注:(最后11那個id是終端顯示問題,其實是1),然后可以輸出平均值。

驗證結(jié)果是沒有問題的,換個角度,我們也可以從DAG來看。

這個窗口一共計算了3個RDD,其中左側(cè)的兩個是灰色的,上面是skipped標(biāo)識,代表著這兩個RDD在上一個窗口已經(jīng)計算完成了,在這個窗口只需要計算當(dāng)前的RDD,然后再一起對RDD的結(jié)果數(shù)據(jù)進行窗口計算。

結(jié)語

本篇文章主要是利用Spark的滑動窗口,做了一個計算平均響應(yīng)時長的應(yīng)用場景,以Kafka作為數(shù)據(jù)源、通過滑動窗口和reduceByKey算子得以實現(xiàn)。同時,開發(fā)Spark還是強烈推薦scala,整個程序看起來沒有任何多余的部分。

最后對于Spark和Flink的選型看法,Spark的確是在實時性上比Flink差一些,但是Spark對于窗口計算還是有優(yōu)勢的。所以對于每種技術(shù),也不用人云亦云,適合自己的才是最好的。

審核編輯 黃宇

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • RDD
    RDD
    +關(guān)注

    關(guān)注

    0

    文章

    7

    瀏覽量

    8179
  • 實時監(jiān)控
    +關(guān)注

    關(guān)注

    1

    文章

    128

    瀏覽量

    14290
  • SPARK
    +關(guān)注

    關(guān)注

    1

    文章

    108

    瀏覽量

    21243
收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關(guān)推薦
    熱點推薦

    汽車級高精度隔離放大器AMC0x30S-Q1:特性、應(yīng)用與設(shè)計指南

    汽車級高精度隔離放大器AMC0x30S-Q1:特性、應(yīng)用與設(shè)計指南 在汽車電子領(lǐng)域,對于高精度、可靠的信號處理和隔離需求日益增長。德州儀器(TI)的AMC0x30S-Q1系列隔離放大器,包括
    的頭像 發(fā)表于 01-19 17:15 ?564次閱讀

    0到1搭建實時日志監(jiān)控系統(tǒng):基于WebSocket + Elasticsearch的實戰(zhàn)方案

    問題。 WebSocket斷連重試 :前端實現(xiàn)指數(shù)退避重連機制。 數(shù)據(jù)壓縮 :對大文本日志啟用Gzip壓縮,減少帶寬占用。 5. 最終效果 實時性 :日志產(chǎn)生到展示延遲 < 1秒 吞吐量
    發(fā)表于 01-09 16:43

    esp32s3多連接機BLE設(shè)備出現(xiàn)超時斷開連接的現(xiàn)象

    基于V5.5.1版本的gattc_gatts_cox例程修改;同時也修改了menuconfig里的配置;在都連接上四個機后;在數(shù)據(jù)傳輸過程一段時間(幾分鐘或者更久)會出現(xiàn)超時斷開連接的現(xiàn)象;有什么解決辦法;其中機在30s后連
    發(fā)表于 12-02 09:18

    NVIDIA DGX Spark助力構(gòu)建自己的AI模型

    2025 年 1 月 6 日,NVIDIA 正式宣布其 Project DIGITS 項目,并于 3 月 18 日更名為 NVIDIA DGX Spark,進一步公布了產(chǎn)品細節(jié)。DGX Spark
    的頭像 發(fā)表于 11-21 09:25 ?1189次閱讀
    NVIDIA DGX <b class='flag-5'>Spark</b>助力構(gòu)建自己的AI模型

    【CPKCOR-RA8D1】打造一個迷你系統(tǒng)監(jiān)控中心:ADC電壓與溫度實時顯示

    本文將帶領(lǐng)大家在CPKCOR-RA8D1開發(fā)板上,實現(xiàn)一個兼具實用與觀賞性的“迷你系統(tǒng)監(jiān)控中心”。項目基于MIPI顯示屏,實時可視化地展示ADC采集的電壓值以及MCU內(nèi)部溫度,讓您對系統(tǒng)狀態(tài)一目了然
    發(fā)表于 10-30 15:14

    【CPKCOR-RA8D1】+ 打造一個迷你系統(tǒng)監(jiān)控中心:ADC電壓與溫度實時顯示

    本文將帶領(lǐng)大家在CPKCOR-RA8D1開發(fā)板上,實現(xiàn)一個兼具實用與觀賞性的“迷你系統(tǒng)監(jiān)控中心”。項目基于MIPI顯示屏,實時可視化地展示ADC采集的電壓值以及MCU內(nèi)部溫度,讓您對系統(tǒng)狀態(tài)一目了然
    發(fā)表于 10-30 09:44

    怎樣確定實時校驗機制的驗證指標(biāo)?

    確定實時校驗機制的驗證指標(biāo),需遵循 “ 風(fēng)險導(dǎo)向 + 場景適配 + 標(biāo)準(zhǔn)量化 ” 原則,圍繞 “ 準(zhǔn)確性(防漏判 / 誤判)、抗干擾性(應(yīng)對復(fù)雜環(huán)境)、安全性(防篡改)、穩(wěn)定性(長期可靠) ” 四大
    的頭像 發(fā)表于 10-11 17:03 ?938次閱讀

    NVIDIA DGX Spark桌面AI計算機開啟預(yù)訂

    DGX Spark 現(xiàn)已開啟預(yù)訂!麗臺科技作為 NVIDIA 授權(quán)分銷商,提供產(chǎn)品到服務(wù)的一站式解決方案,助力輕松部署桌面 AI 計算機。
    的頭像 發(fā)表于 09-23 17:20 ?1336次閱讀
    NVIDIA DGX <b class='flag-5'>Spark</b>桌面AI<b class='flag-5'>計算</b>機開啟預(yù)訂

    DTU 30s后沒有指令,會自動斷開連接?

    DTU 30s后沒有指令,會自動斷開連接? 30s后沒有指令,DTU就會自動斷開連接,如果需要DTU不自動斷開,在30s內(nèi)發(fā)送查詢命令作為心跳包。
    發(fā)表于 08-07 07:38

    如何評估協(xié)議分析儀的性能指標(biāo)?

    評估協(xié)議分析儀的性能指標(biāo)硬件處理能力、協(xié)議解析精度、實時響應(yīng)效率、擴展性與兼容性、用戶體驗五大維度綜合考量。以下是具體指標(biāo)及評估方法,結(jié)合實際場景說明其重要性:一、硬件處理能力:決
    發(fā)表于 07-18 14:44

    網(wǎng)絡(luò)化多電機伺服系統(tǒng)監(jiān)控終端設(shè)計

    較少,只能實現(xiàn)基本的系統(tǒng)狀態(tài)監(jiān)控和報警等功能。同時,需要現(xiàn)場對每個電機驅(qū)動器參數(shù)逐一進行設(shè)定,不便于系統(tǒng)的使用和調(diào)試\"1。因此,針對基于CAN總線的多電機伺服系統(tǒng),設(shè)計一種實時性高、運行
    發(fā)表于 06-23 07:15

    邊緣計算網(wǎng)關(guān)在水產(chǎn)養(yǎng)殖尾水處理中的實時監(jiān)控應(yīng)用

    ,某大型水產(chǎn)養(yǎng)殖企業(yè)決定引入先進的 YC-GR90-S工業(yè)智能網(wǎng)關(guān) 技術(shù),對尾水處理過程進行遠程監(jiān)控和管理。 二、項目需求 設(shè)備遠程監(jiān)控: 需要實時
    的頭像 發(fā)表于 06-06 14:36 ?618次閱讀
    邊緣<b class='flag-5'>計算</b>網(wǎng)關(guān)在水產(chǎn)養(yǎng)殖尾水處理中的<b class='flag-5'>實時</b><b class='flag-5'>監(jiān)控</b>應(yīng)用

    自媒體推廣實時監(jiān)控服務(wù)器帶寬到用戶行為解決方法

    自媒體推廣的實時監(jiān)控需要從底層基礎(chǔ)設(shè)施到前端用戶行為進行全鏈路覆蓋,確保推廣活動的穩(wěn)定性和效果可追蹤。以下是系統(tǒng)性解決方案,主機推薦小編為您整理發(fā)布自媒體推廣實時監(jiān)控
    的頭像 發(fā)表于 04-09 10:47 ?630次閱讀

    邊緣計算網(wǎng)關(guān)的實時監(jiān)控與預(yù)測性維護都有哪些方面?適合哪些行業(yè)使用?

    邊緣計算網(wǎng)關(guān)的實時監(jiān)控與預(yù)測性維護都有哪些方面?適合哪些行業(yè)使用? 有實施過得案例的介紹嗎? 深控技術(shù)的不需要點表的邊緣計算網(wǎng)關(guān)如何?
    發(fā)表于 04-01 09:44

    NVIDIA 宣布推出 DGX Spark 個人 AI 計算

    的 DGX? 個人 AI 超級計算機。 ? DGX Spark(前身為 Project DIGITS)支持 AI 開發(fā)者、研究人員、數(shù)據(jù)科學(xué)家和學(xué)生,在臺式電腦上對大模型進行原型設(shè)計、微調(diào)和推理。用
    發(fā)表于 03-19 09:59 ?792次閱讀
       NVIDIA 宣布推出 DGX <b class='flag-5'>Spark</b> 個人 AI <b class='flag-5'>計算</b>機