一个专注于大数据技术架构与应用分享的技术博客

Spark Streaming updateStateByKey案例实战和内幕源码解密

Spark Streaming updateStateByKey案例实战和内幕源码解密

前言

Spark 是当下最受欢迎的分布式计算框架之一。Spark StreamingSpark 中的一个核心模块,在数据处理领域中具有非常广泛的用途。其中,updateStateByKeySpark Streaming 中最为常用的算子之一。本篇文章将结合updateStateByKey案例,在实战中探究其内幕源码。

1. updateStateByKey 介绍

updateStateByKey 是一个非常重要的算子,可以用于更新状态。其原理是,它会对以前批处理中的 RDD 中的每个键的状态进行聚合,并将其与当前批处理中的数据进行聚合。这里的状态是指任何类型的数据,可以是自定义的对象,函数,甚至是 RDD。利用这个算子,可以很方便地计算某个时间窗口内的数据的统计数据,比如平均值,最大/最小值等。

2. updateStateByKey 应用场景

  • 用户的今日访问次数;
  • 用户的某个操作是否完成;
  • 实时交易排名的更新;
  • 统计窗口内新增或取消订单数量等等。

3. updateStateByKey 算子使用案例

val stateSpec =
  StateSpec
    .function(mappingFunction _)
    .timeout(Minutes(30))

val processedStream =
  stream
    .map(x => (x.department, x.salary))
    .reduceByKey(_ + _)
    .mapWithState(stateSpec)

以上代码是一个经典的 updateStateByKey 实例。首先,数据从 stream 中获取,在 stream 中,利用 map 的函数将数据由 (String, Double) 形式的元组转换为 (String, Double) 形式的键值对。然后,利用 reduceByKey 算子进行累加操作。最后,调用 mapWithState 来对状态进行处理。

4. updateStateByKey 算子内幕源码解密

为了更好地理解 updateStateByKey 的内幕,我们分析一下其底层源码。在 Spark 中,updateStateByKey 是通过 flatMapWithState 实现的。

def updateStateByKey[S: ClassTag, T: ClassTag](
      updateFunc: (Seq[T], Option[S]) => Option[S]
    ): DStream[(K, S)] = {

    val mappingFunc = (key: K, value: Option[T], state: State[S]) => {
      /**
       * The contract for state updating is that anything returned from `updateStateByKey` is
       * guaranteed to be returned in a subsequent pairing of the key-value pair. Returns of `None`
       * are handled delicately. If the state was already previously defined, then an `Option(state)`
       * value will be passed as the second argument to `mappingFunc`. If it wasn't, then `None`
       * *might* be. Seems iffy, no?
       */
      val (newValue, outputState) = value match {
        case Some(value) =>
          val seq = state.getOption().map(Seq(_)).getOrElse(Seq()) :+ value
          val updatedState = updateFunc(seq, state.getStateOption)
          (updatedState, updatedState)
        case None =>
          val updatedState = updateFunc(Seq(), state.getStateOption)
          (None, updatedState)
      }

      state.update(outputState)

      if (newValue.isDefined || state.hasTimedOut) {
        Some(key, newValue.getOrElse(outputState))
      } else {
        None
      }
    }

    flatMapWithState(StateSpec.function(mappingFunc))
  }

解析以上源码,我们发现 updateStateByKey 算子可以接收函数类型的参数 updateFunc,它用于对现有状态进行更新。

updateFunc 中,某一个键上每次汇总时,传入 updateStateByKey 函数之前的累计结果以及旧状态。在 updateFunc 函数中,我们将状态与累计结果相加,更新状态,并返回新的状态。最后,flatMapWithState 函数会将更新过的状态重新打回 DStream,供下一次累计使用。

5. 总结

本文针对 Spark Streaming updateStateByKey 算子的实战和内幕源码进行了解析。提到了 updateStateByKey 的基本概念和算子底层实现原理,希望能够对大家的学习有所帮助。最后,提醒大家注意实践的同时,也要多思考源码的实现原理,以便于更好地应用到自己的项目中。

赞(0)
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《Spark Streaming updateStateByKey案例实战和内幕源码解密》
文章链接:https://macsishu.com/spark-streaming-updatestatebykey-case-of-actual
本站资源仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。