Storm Study Notes
Author: 王宇
2016-10-20
Storm Concepts
Let us now have a closer look at the components of Apache Storm:

| Component | Description |
| --- | --- |
| Tuple | The main data structure in Storm: an ordered, named list of elements. By default a tuple supports all primitive data types. It is modelled as a set of comma-separated values and passed around the Storm cluster. |
| Stream | An unordered sequence of tuples. |
| Spout | The source of a stream. Storm typically accepts input from raw data sources such as the Twitter Streaming API, an Apache Kafka queue, a Kestrel queue, etc. You can also write your own spout to read from a custom data source. "ISpout" is the core interface for implementing spouts; more specific variants include IRichSpout, BaseRichSpout, KafkaSpout, etc. |
| Bolt | A logical processing unit. Spouts pass data to bolts, which process it and produce new output streams. Bolts can perform filtering, aggregation, joins, and interaction with data sources and databases. A bolt receives data and can emit to one or more downstream bolts. "IBolt" is the core interface for implementing bolts; common variants include IRichBolt, IBasicBolt, etc. |
| Topology | Spouts and bolts connected together form a topology. Real-time application logic is specified inside a topology. In simple terms, a topology is a directed graph whose vertices are computations and whose edges are streams of data. |
| Task | The execution of a spout or a bolt; a task performs the actual data processing (see the parallelism sketch after this table). |
| Nimbus | The master node of a Storm cluster. All other nodes in the cluster are worker nodes. The master node is responsible for distributing work among the worker nodes, assigning tasks to them, and monitoring for failures. |
| Supervisor | A node that follows the instructions given by the nimbus. A supervisor hosts multiple worker processes and governs them to complete the tasks assigned by the nimbus. |
| Worker process | A worker process executes tasks belonging to a specific topology. It does not run tasks itself; instead it creates executors and asks them to perform the tasks. A worker process can have multiple executors. |
| Executor | A single thread spawned by a worker process. An executor runs one or more tasks, but only for a single spout or bolt. |
| ZooKeeper framework | Apache ZooKeeper is a service that a cluster (a group of nodes) uses to coordinate and to maintain shared data with robust synchronization. Nimbus is stateless, so it depends on ZooKeeper to track the state of the worker nodes. ZooKeeper also helps the supervisors interact with the nimbus, and it maintains the state of both. |
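The relationship between worker processes, executors, and tasks is easiest to see in code. Below is a minimal sketch using the spout and bolt classes from the call-log example later in these notes; the parallelism numbers are made up for illustration. The parallelism hint on setSpout/setBolt sets the executor (thread) count, setNumTasks sets the task count, and Config.setNumWorkers sets the number of worker processes.

import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

public class ParallelismSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        //2 executors (threads) for the spout; each runs 1 task by default.
        builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout(), 2);
        //2 executors for the bolt, sharing 4 tasks (2 tasks per executor).
        builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("call-log-reader-spout");
        Config config = new Config();
        //Spread the executors across 2 worker processes (JVMs).
        config.setNumWorkers(2);
    }
}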
- Stream Grouping (message distribution strategy): determines how tuples are distributed among the tasks of the receiving bolt. Each grouping is declared on the TopologyBuilder, as in the sketch after this list.
  - Shuffle Grouping: tuples are distributed to the bolt's tasks at random.
  - Fields Grouping: tuples are partitioned by the values of the named fields; equal values always reach the same task.
  - All Grouping: broadcast; every task of the bolt receives a copy of every tuple.
  - Global Grouping: the entire stream goes to a single task of the bolt.
  - None Grouping: currently behaves the same as shuffle grouping.
  - Direct Grouping: the producer of the tuple decides which task of the consumer receives it.
  - Local or Shuffle Grouping: prefers bolt tasks in the same worker process when available, otherwise behaves like shuffle grouping.
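A minimal sketch of how these groupings are declared in code. The TopologyBuilder/InputDeclarer methods used here are the real Storm API; the component ids and the MySpout/MyBolt classes are placeholders, not part of the example later in these notes.

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class GroupingSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        //Placeholder spout emitting tuples with a "word" field.
        builder.setSpout("words", new MySpout());
        //Random distribution across b1's tasks.
        builder.setBolt("b1", new MyBolt()).shuffleGrouping("words");
        //The same "word" value always reaches the same task of b2.
        builder.setBolt("b2", new MyBolt()).fieldsGrouping("words", new Fields("word"));
        //Every task of b3 receives every tuple.
        builder.setBolt("b3", new MyBolt()).allGrouping("words");
        //The whole stream goes to a single task of b4.
        builder.setBolt("b4", new MyBolt()).globalGrouping("words");
        //Currently equivalent to shuffleGrouping.
        builder.setBolt("b5", new MyBolt()).noneGrouping("words");
        //The emitter chooses the receiving task via emitDirect.
        builder.setBolt("b6", new MyBolt()).directGrouping("words");
        //Prefer tasks in the same worker process, else shuffle.
        builder.setBolt("b7", new MyBolt()).localOrShuffleGrouping("words");
    }
}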
Storm Workflow
- Local Mode: the topology runs in-process inside a LocalCluster, which is convenient for development and testing; the example below uses this mode.
- Production Mode: the packaged topology is submitted to a running cluster via StormSubmitter; see the remote-mode sketch near the end of these notes.
Storm Setup
- Step 1: Install the JDK and set the JAVA_HOME and CLASSPATH environment variables.
- Step 2: Install ZooKeeper.
  Download ZooKeeper, then unpack it:
  $ tar xzvf zookeeper-3.5.2-alpha.tar.gz
  $ mv ./zookeeper-3.5.2-alpha /opt/zookeeper
  $ cd /opt/zookeeper
  $ mkdir data
  Create the configuration file:
  $ cd /opt/zookeeper
  $ vim conf/zoo.cfg
  # Basic time unit (ms) used for heartbeats and timeouts.
  tickTime=2000
  # Directory for snapshots and transaction logs (the data dir created above).
  dataDir=/opt/zookeeper/data
  # Port on which clients (here, Storm) connect.
  clientPort=2181
  # Ticks a follower may take to connect and sync to the leader (ensemble setups).
  initLimit=5
  # Ticks a follower may lag behind the leader (ensemble setups).
  syncLimit=2
  Start the ZooKeeper server:
  $ bin/zkServer.sh start
- Step 3: Install and configure Storm.
  Download Storm, then unpack it:
  $ tar xvfz apache-storm-1.0.2.tar.gz
  $ mv apache-storm-1.0.2 /opt/storm
  $ cd /opt/storm
  $ mkdir data
  Edit the Storm configuration:
  $ cd /opt/storm
  $ vim conf/storm.yaml
  # ZooKeeper ensemble that Storm coordinates through.
  storm.zookeeper.servers:
      - "localhost"
  # Local directory Storm uses for its state (the data dir created above).
  storm.local.dir: "/opt/storm/data"
  # Host running the Nimbus daemon (Storm 1.x also accepts nimbus.seeds: ["localhost"]).
  nimbus.host: "localhost"
  # Ports available to this supervisor; one port per worker process slot.
  supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703
  # Port for the Storm web UI.
  ui.port: 6969
  Start Nimbus:
  $ cd /opt/storm
  $ ./bin/storm nimbus
  Start the Supervisor:
  $ cd /opt/storm
  $ ./bin/storm supervisor
  Start the UI (then browse to http://localhost:6969):
  $ cd /opt/storm
  $ ./bin/storm ui
Developing a Call-Counting Task on Storm
- Scenario: count the number of calls between mobile phone numbers.
  In the spout, prepare 4 phone numbers and emit randomly generated calls between them.
  Create separate bolts for processing and for counting.
  Use a topology to wire the spout and bolts together.
  The program below compiles and runs on Ubuntu 16.04 (64-bit) with JDK 1.8.
- Create the Spout component
  A spout implements the IRichSpout interface. Its methods are:
  open − Provides the spout with an environment in which to execute. The executors run this method to initialize the spout.
  nextTuple − Emits the generated data through the collector.
  close − Called when the spout is going to shut down.
  declareOutputFields − Declares the output schema of the tuple.
  ack − Acknowledges that a specific tuple has been fully processed.
  fail − Signals that a specific tuple was not fully processed; the spout typically replays it.

import java.util.*;
//import storm tuple packages
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
//import Spout interface packages
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
//Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities
publicclassFakeCallLogReaderSpoutimplementsIRichSpout{
//Create instance for SpoutOutputCollector which passes tuples to bolt.
privateSpoutOutputCollector collector;
privateboolean completed =false;
//Create instance for TopologyContext which contains topology data.
privateTopologyContext context;
//Create instance for Random class.
privateRandom randomGenerator =newRandom();
privateInteger idx =0;
@Override
publicvoid open(Map conf,TopologyContext context,SpoutOutputCollector collector){
this.context = context;
this.collector = collector;
}
@Override
publicvoid nextTuple(){
if(this.idx <=1000){
List<String> mobileNumbers =newArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx =0;
while(localIdx++<100&&this.idx++<1000){
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while(fromMobileNumber == toMobileNumber){
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(newValues(fromMobileNumber, toMobileNumber, duration));
}
}
}
@Override
publicvoid declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(newFields("from","to","duration"));
}
//Override all the interface methods
@Override
publicvoid close(){}
publicboolean isDistributed(){
returnfalse;
}
@Override
publicvoid activate(){}
@Override
publicvoid deactivate(){}
@Override
publicvoid ack(Object msgId){}
@Override
publicvoid fail(Object msgId){}
@Override
publicMap<String,Object> getComponentConfiguration(){
returnnull;
}
}
- Create the Bolt components
  A bolt implements the IRichBolt interface. Its methods are:
  prepare − Provides the bolt with an environment in which to execute. The executors run this method to initialize the bolt.
  execute − Processes a single tuple of input.
  cleanup − Called when the bolt is going to shut down.
  declareOutputFields − Declares the output schema of the tuple.
  The first bolt, CallLogCreatorBolt, combines the caller and callee into a single "call" field:

//import util packages
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
//import Storm IRichBolt package
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
publicclassCallLogCreatorBoltimplementsIRichBolt{
//Create instance for OutputCollector which collects and emits tuples to produce output
privateOutputCollector collector;
@Override
publicvoid prepare(Map conf,TopologyContext context,OutputCollector collector){
this.collector = collector;
}
@Override
publicvoid execute(Tuple tuple){
Stringfrom= tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(newValues(from+" - "+ to, duration));
}
@Override
publicvoid cleanup(){}
@Override
publicvoid declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(newFields("call","duration"));
}
@Override
publicMap<String,Object> getComponentConfiguration(){
returnnull;
}
}
  The second bolt, CallLogCounterBolt, counts the occurrences of each call pair and prints the totals on shutdown:

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.tuple.Fields;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
    //Running count per "from - to" call pair.
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String call = tuple.getString(0);
        Integer duration = tuple.getInteger(1);
        if (!counterMap.containsKey(call)) {
            counterMap.put(call, 1);
        } else {
            Integer c = counterMap.get(call) + 1;
            counterMap.put(call, c);
        }
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        //Print the final counts when the local cluster shuts down.
        for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("call"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
- Create the Topology and a Local Cluster

import org.apache.storm.tuple.Fields;
//import storm configuration packages
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

//Main class LogAnalyserStorm: builds and submits the topology.
public class LogAnalyserStorm {
    public static void main(String[] args) throws Exception {
        //Create a Config instance for cluster configuration
        Config config = new Config();
        config.setDebug(true);

        //Wire the spout and the two bolts together.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
        builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
               .shuffleGrouping("call-log-reader-spout");
        builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
               .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

        //Run in-process for 10 seconds, then stop the topology.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
- Remote (production) mode
  In production the topology is submitted to a running cluster rather than run in a LocalCluster; a sketch follows. See also the Distributed RPC documentation: http://storm.apache.org/releases/current/Distributed-RPC.html
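A minimal sketch of production-mode submission, assuming the classes from this example are packaged into a jar; StormSubmitter replaces LocalCluster and sends the topology to the cluster configured in conf/storm.yaml. The class name LogAnalyserStormRemote and the jar name below are illustrative, not from the original notes.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class LogAnalyserStormRemote {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setNumWorkers(2); //request 2 worker slots on the cluster

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
        builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
               .shuffleGrouping("call-log-reader-spout");
        builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
               .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

        //Submit to the remote cluster instead of running in-process.
        StormSubmitter.submitTopology("LogAnalyserStorm", config, builder.createTopology());
    }
}

The packaged jar would then be submitted with the storm CLI, e.g. $ storm jar log-analyser.jar LogAnalyserStormRemote (the jar name is hypothetical).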
- Compile and run the application (the Storm jars from /opt/storm/lib must be on the classpath):
  $ cd /opt/storm/my-example
  $ javac -cp "/opt/storm/lib/*" *.java
  $ java -cp ".:/opt/storm/lib/*" LogAnalyserStorm
- Sample output
  1234123402 - 1234123401 : 78
  1234123402 - 1234123404 : 88
  1234123402 - 1234123403 : 105
  1234123401 - 1234123404 : 74
  1234123401 - 1234123403 : 81
  1234123401 - 1234123402 : 81
  1234123403 - 1234123404 : 86
  1234123404 - 1234123401 : 63
  1234123404 - 1234123402 : 82
  1234123403 - 1234123402 : 83
  1234123404 - 1234123403 : 86
  1234123403 - 1234123401 : 93
References
- Storm official site: http://storm.apache.org
- Tutorial: https://www.tutorialspoint.com/apache_storm/index.htm
- Storm Javadoc: http://storm.apache.org/releases/current/javadocs/index.html
- Books (PDF):
  - Storm Applied
  - Getting Started with Storm
  - Storm Real-time Processing Cookbook
  - Learning Storm
  - Storm Blueprints: Patterns for Distributed Real-time Computation
  - Hadoop: The Definitive Guide