Jstorm源码分析--kill、rebanlance、activate、deactivate方法流程
发布时间:2016-12-08 18:16
本文关键词:Storm源码分析,由笔耕文化传播整理发布。
JStorm源码分析--kill、rebanlance、activate、deactivate方法流程
2015-05-19 浏览()
[摘要:那几个kill、rebanlance、activate、deactivate方式放正在一路剖析,是由于他们皆依附于nimbus界说的通用的 状况转移方式: transitionName ********************************************那里只详细剖析kill方式]
这几个kill、rebanlance、activate、deactivate方法放在一起分析,是因为他们都依赖于nimbus定义的通用的 状态转移方法:
transitionName********************************************这里只具体分析kill方法*********************************一:初始化LocalCluster同拓扑提交一样,详见大神;
二:LocalCluster完成构造之后,即可使用killTopology/ killTopologyWithOpts杀死拓扑。
跟踪killTopologyWithOpts方法,方法定义如下:
public void killTopologyWithOpts(String topologyName, KillOptions options)可以看到需要两个参数,,分别是拓扑的名称和kill的操作。 @Override public void killTopologyWithOpts(String topologyName, KillOptions options) throws NotAliveException, TException { try { checkTopologyActive(data, topologyName, true); Integer wait_amt = null; //查看是否设置了杀死拓扑的等待时间,如果设置了就等待 if (options.is_set_wait_secs()) { wait_amt = options.get_wait_secs(); } // NimbusUtils.transitionName(data, topologyName, true, StatusType.kill, wait_amt); } catch (NotAliveException e) { String errMsg = "KillTopology Error, no this topology " + topologyName; LOG.error(errMsg, e); throw new NotAliveException(errMsg); } catch (Exception e) { String errMsg = "Failed to kill topology " + topologyName; LOG.error(errMsg, e); throw new TException(errMsg); } }流程:1,先查看拓扑是否Active,
public void checkTopologyActive(NimbusData nimbus, String topologyName, boolean bActive) throws Exception { if (isTopologyActive(nimbus.getStormClusterState(), topologyName) != bActive) { if (bActive) { throw new NotAliveException(topologyName + " is not alive"); } else { throw new AlreadyAliveException(topologyName + " is already active"); } } }跟下去 public boolean isTopologyActive(StormClusterState stormClusterState, String topologyName) throws Exception { boolean rtn = false; if (Cluster.get_topology_id(stormClusterState, topologyName) != null) { rtn = true; } return rtn; }
2:查看是否设置了kill操作是否设置了等待时间,如果设置了就等待后再执行
3:transitionName:这是在nimbus定义的通用的 状态转移方法,,他们会根据传入的转移事件做相应的处理。
public static <T> void transitionName(NimbusData data, String topologyName, boolean errorOnNoTransition, StatusType transition_status, T... args) throws Exception { //获得nimbus的信息,通过getStormClusterState查找对应的storm-id(实际上就是将storm-name 转换为storm-id的状态),如果找到了,就调用transition方法。 StormClusterState stormClusterState = data.getStormClusterState(); String topologyId = Cluster.get_topology_id(stormClusterState, topologyName); if (topologyId == null) { throw new NotAliveException(topologyName); } //开始执行kill操作,转变拓扑状态。 transition(data, topologyId, errorOnNoTransition, transition_status, args); }本文关键词:Storm源码分析,由笔耕文化传播整理发布。
本文编号:208466
本文链接:https://www.wllwen.com/wenshubaike/mishujinen/208466.html
最近更新
教材专著