Storm - Source Code Analysis - Topology Submit - Task
Published: 2019-06-27

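mk-task is fairly simple, because a task is only a conceptual structure. Unlike a worker or an executor, it does not need a process or a thread to be created.

So its core is really mk-task-data, which does three things:
1. Creates the TopologyContext objects. This essentially combines the topology object built earlier with worker-data, so that the task can get the topology information it needs while executing.
2. Creates the task object, either a spout object or a bolt object, which encapsulates the corresponding logic such as nextTuple or execute.
3. Generates tasks-fn. The name is unfortunate, because it suggests the function carries out the task's work. In fact it only does the preparation before an emit, the most important part being the call to the groupers to produce the target tasks. It also invokes some metrics and hooks.

Put plainly, mk-task itself does not do much.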

(defn mk-task [executor-data task-id]
  (let [task-data (mk-task-data executor-data task-id)  ;; 1 mk-task-data
        storm-conf (:storm-conf executor-data)]
    (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]  ;; register the predefined hooks
      (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
    ;; when this is called, the threads for the executor haven't been started yet,
    ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
    (send-unanchored task-data SYSTEM-STREAM-ID ["startup"])  ;; send a startup notification to SYSTEM-STREAM; who receives SYSTEM-STREAM…?
    task-data))

 

1 mk-task-data

(defn mk-task-data [executor-data task-id]
  (recursive-map
    :executor-data executor-data
    :task-id task-id
    :system-context (system-topology-context (:worker executor-data) executor-data task-id)
    :user-context (user-topology-context (:worker executor-data) executor-data task-id)
    :builtin-metrics (builtin-metrics/make-data (:type executor-data))
    :tasks-fn (mk-tasks-fn <>)
    :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
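
The `<>` symbol inside recursive-map can be puzzling at first. The sketch below is not Storm's macro. It is a minimal illustration, assuming that `<>` stands for the map built so far, so that a later entry such as :tasks-fn can read earlier entries such as :task-id. The names build-recursive and task-data-sketch are hypothetical.

```clojure
;; Hypothetical helper, not part of Storm: builds a map from key/function
;; pairs in order; each function receives the map built so far.
(defn build-recursive [& kfs]
  (reduce (fn [m [k f]] (assoc m k (f m)))
          {}
          (partition 2 kfs)))

;; Later entries read earlier ones, much like (mk-tasks-fn <>) reads
;; :task-id and :executor-data from the partially built task-data.
(def task-data-sketch
  (build-recursive
   :task-id        (fn [_] 7)
   :system-context (fn [m] {:task-id (:task-id m) :system? true})
   :tasks-fn       (fn [m] (fn [stream] [(:task-id m) stream]))))

((:tasks-fn task-data-sketch) "default")
;; => [7 "default"]
```

Under this reading the order of the entries matters: :tasks-fn and :object come last in mk-task-data because they depend on the entries above them.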

1.1 TopologyContext

:system-context and :user-context differ only in the topology object held by the context: the system context holds the system-topology.

1.2 builtin-metrics/make-data

The builtin-metrics here are the metrics that record how the spout or bolt is executing.

1.3 mk-tasks-fn

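Returns tasks-fn. This function does the preparation before an emit and returns the list of target tasks:

1. Calls the groupers to produce the target tasks
2. Runs the emit hooks
3. When the sampler condition is met, updates stats and builtin-metrics

tasks-fn comes in two arities.

[^String stream ^List values] is the easier one to understand. It collects the target tasks of every component subscribed to the stream (a stream may have several downstream components, so one piece of data may need to be sent to several bolts).

[^Integer out-task-id ^String stream ^List values] takes an explicit out-task-id, i.e. direct grouping.

This version validates out-task-id.
The expression out-task-id (if grouping out-task-id) requires that the lookup out-task-id -> component -> grouper is not nil, which verifies that the stream really goes to the component owning out-task-id. A non-nil grouper other than :direct raises IllegalArgumentException.
If the lookup returns nil, out-task-id is set to nil.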

(defn mk-tasks-fn [task-data]
  (let [task-id (:task-id task-data)
        executor-data (:executor-data task-data)
        component-id (:component-id executor-data)
        ^WorkerTopologyContext worker-context (:worker-context executor-data)
        storm-conf (:storm-conf executor-data)
        emit-sampler (mk-stats-sampler storm-conf)
        stream->component->grouper (:stream->component->grouper executor-data)
        user-context (:user-context task-data)
        executor-stats (:stats executor-data)
        debug? (= true (storm-conf TOPOLOGY-DEBUG))]
    (fn ([^Integer out-task-id ^String stream ^List values]
          (when debug?
            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
          (let [target-component (.getComponentId worker-context out-task-id)
                component->grouping (get stream->component->grouper stream)
                grouping (get component->grouping target-component)
                out-task-id (if grouping out-task-id)]
            (when (and (not-nil? grouping) (not= :direct grouping))
              (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
            (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
            (when (emit-sampler)
              (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
              (stats/emitted-tuple! executor-stats stream)
              ;; as quoted, this `if` updates only one of the two transferred counters
              (if out-task-id
                (stats/transferred-tuples! executor-stats stream 1)
                (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
            (if out-task-id [out-task-id])))
        ([^String stream ^List values]
          (when debug?
            (log-message "Emitting: " component-id " " stream " " values))
          (let [out-tasks (ArrayList.)]
            (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
              (when (= :direct grouper)
                ;;  TODO: this is wrong, need to check how the stream was declared
                (throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
              (let [comp-tasks (grouper task-id values)]  ;; run the grouper to produce the target tasks
                (if (or (sequential? comp-tasks) (instance? Collection comp-tasks))
                  (.addAll out-tasks comp-tasks)
                  (.add out-tasks comp-tasks))))
            (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))  ;; run the emit hooks registered earlier
            (when (emit-sampler)  ;; when the sampling condition is met, update the emitted and transferred metrics in stats and builtin-metrics
              (stats/emitted-tuple! executor-stats stream)
              (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
              (stats/transferred-tuples! executor-stats stream (count out-tasks))
              (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream (count out-tasks)))
            out-tasks)))))
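
To see the routing logic without the hooks, sampling and metrics around it, here is a stripped-down sketch that can be run in a plain REPL. It is a simplification under stated assumptions, not Storm's implementation: mk-tasks-fn-sketch, task->component and the sample groupers are hypothetical stand-ins, and a grouper is modelled as either :direct or a function of [task-id values].

```clojure
;; Hypothetical, simplified version of mk-tasks-fn: routing only.
;; stream->component->grouper: stream id -> component id -> grouper
;; task->component: stand-in for (.getComponentId worker-context task-id)
(defn mk-tasks-fn-sketch [task-id stream->component->grouper task->component]
  (fn
    ;; direct emit: the caller names the target task
    ([out-task-id stream values]
     (let [target-component (get task->component out-task-id)
           grouping (get-in stream->component->grouper [stream target-component])]
       (when (and (some? grouping) (not= :direct grouping))
         (throw (IllegalArgumentException.
                 "Cannot emitDirect to a task expecting a regular grouping")))
       ;; nil grouping means the target component does not consume this stream
       (when grouping [out-task-id])))
    ;; regular emit: ask every subscribed component's grouper for its tasks
    ([stream values]
     (vec
      (mapcat (fn [[_ grouper]]
                (when (= :direct grouper)
                  (throw (IllegalArgumentException.
                          "Cannot do regular emit to direct stream")))
                (let [tasks (grouper task-id values)]
                  (if (sequential? tasks) tasks [tasks])))
              (get stream->component->grouper stream))))))

(def tasks-fn
  (mk-tasks-fn-sketch
   1
   {"default"       {"count-bolt" (fn [_ _] [3 4])   ; grouper returning a list
                     "log-bolt"   (fn [_ _] 5)}      ; grouper returning one task
    "direct-stream" {"sink-bolt" :direct}}
   {3 "count-bolt", 4 "count-bolt", 5 "log-bolt", 6 "sink-bolt"}))

(tasks-fn "default" ["hello"])     ; target tasks 3, 4 and 5
(tasks-fn 6 "direct-stream" ["x"]) ; => [6]
(tasks-fn 3 "direct-stream" ["x"]) ; => nil, count-bolt does not consume this stream
```

Calling (tasks-fn 3 "default" ["x"]) throws IllegalArgumentException, because task 3 belongs to a component that subscribed with a regular grouping.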

1.4 get-task-object

Retrieves the component's object.

For a spout, for example, it takes the ComponentObject spout_object out of the SpoutSpec. That object contains the spout's logic, such as nextTuple().

(defn- get-task-object [^TopologyContext topology component-id]
  (let [spouts (.get_spouts topology)
        bolts (.get_bolts topology)
        state-spouts (.get_state_spouts topology)
        obj (Utils/getSetComponentObject
              (cond
                (contains? spouts component-id) (.get_spout_object ^SpoutSpec (get spouts component-id))
                (contains? bolts component-id) (.get_bolt_object ^Bolt (get bolts component-id))
                (contains? state-spouts component-id) (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id))
                true (throw-runtime "Could not find " component-id " in " topology)))
        obj (if (instance? ShellComponent obj)
              (if (contains? spouts component-id)
                (ShellSpout. obj)
                (ShellBolt. obj))
              obj)
        obj (if (instance? JavaObject obj)
              (thrift/instantiate-java-object obj)
              obj)]
    obj))

Reposted from: http://cvhnl.baihongyu.com/
