Harbin University of Science and Technology
School of Software and Microelectronics
Laboratory Report
(2020–2021 Academic Year, Second Semester)
Course: Real-Time Data Processing
Class: Software 18-1
Student ID: 1814010130
Name: 张立辉
School of Software and Microelectronics, Harbin University of Science and Technology
| Experiment | Lab 1: Real-Time Word Frequency Counting with Storm | Major | Software Engineering | | |
|---|---|---|---|---|---|
| Name | 张立辉 | Student ID | 1814010130 | Class | Software 18-1 |
1. Objective:
Understand the basic ideas of streaming real-time data processing, and master the core techniques for implementing real-time processing with Storm, including its reliability guarantee mechanisms.
2. Tasks:
Task 1: word frequency counting over blog post titles
Key points:
(1) Define a spout component that reads the crawled blog post titles
(2) Define a first bolt that tokenizes each title
(3) Define a second bolt that counts word frequencies
Task 2: co-occurrence frequency counting over blog post titles
Key points:
(1) Define a third bolt that counts co-occurrence frequencies, i.e. how often two words appear in the same title (e.g. java+storm: frequency); the pair-key convention is sketched right after this list
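For illustration, a minimal sketch of the pair-key convention the code below relies on (the class and method names here are illustrative, not part of the submitted code): the two words are ordered lexicographically, so java+storm and storm+java update the same counter.

```java
public class PairKey {
    // Lexicographic ordering makes (a, b) and (b, a) map to the same key.
    static String of(String a, String b) {
        return a.compareTo(b) < 0 ? a + "-" + b : b + "-" + a;
    }

    public static void main(String[] args) {
        System.out.println(of("storm", "java")); // prints "java-storm"
    }
}
```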
3. Equipment and software environment:
Windows 10 Professional
IntelliJ IDEA 2020.3.2 (Ultimate Edition)
Java 15
4. Procedure and result screenshots:
BiWordFreq.java
package bean;
import java.util.Date;
// Bean holding one co-occurrence record: a word pair, its frequency, and a timestamp.
public class BiWordFreq {
    private String word1;
    private String word2;
    private Date date;
    private int freq;

    public void setWord1(String word1) { this.word1 = word1; }
    public String getWord1() { return word1; }

    public void setWord2(String word2) { this.word2 = word2; }
    public String getWord2() { return word2; }

    public void setDate(Date date) { this.date = date; }
    public Date getDate() { return date; }

    public void setFreq(int freq) { this.freq = freq; }
    public int getFreq() { return freq; }
}
WordFreq.java
package bean;
import java.util.Date;
// Bean holding one word-count record: a word, its frequency, and a timestamp.
public class WordFreq {
    private String word;
    private int freq;
    private Date date;

    public void setWord(String word) { this.word = word; }
    public String getWord() { return word; }

    public void setFreq(int freq) { this.freq = freq; }
    public int getFreq() { return freq; }

    public void setDate(Date date) { this.date = date; }
    public Date getDate() { return date; }
}
FrequencyBolt.java
package bolt;
import java.util.Date;
import java.util.HashMap;
import java.util.Set;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import bean.WordFreq;
import dao.WordFreqDAO;
public class FrequencyBolt extends BaseRichBolt {
    private Map<String, Integer> wordFreq;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        wordFreq = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        // The empty string is the spout's end-of-batch sentinel:
        // print the accumulated counts and persist them to Redis.
        if ("".equals(word)) {
            System.out.println("词频:");
            System.out.println(wordFreq);
            System.out.println();
            Set<String> set = wordFreq.keySet();
            for (String s : set) {
                WordFreq wf = new WordFreq();
                wf.setWord(s);
                wf.setFreq(wordFreq.get(s));
                wf.setDate(new Date());
                new WordFreqDAO().saveWordFreq(wf);
            }
            return;
        }
        // Otherwise increment this word's in-memory count.
        if (!wordFreq.containsKey(word)) {
            wordFreq.put(word, 1);
        } else {
            wordFreq.replace(word, wordFreq.get(word) + 1);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: declares no output stream.
    }
}
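The bolt above flushes its counts only when it sees the empty-string sentinel. As an alternative design, Storm can deliver periodic tick tuples to a bolt; a minimal sketch assuming storm-core 1.x (the class name and the 10-second interval are illustrative, not part of the submitted code):

```java
// Sketch: periodic flush via Storm tick tuples instead of an empty-string sentinel.
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

public class TickFlushFrequencyBolt extends BaseBasicBolt {
    private final Map<String, Integer> wordFreq = new HashMap<>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Ask Storm to deliver a tick tuple to this bolt every 10 seconds.
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (TupleUtils.isTick(input)) {
            // Periodic flush point: print and/or persist the counts here.
            System.out.println("Word counts: " + wordFreq);
            return;
        }
        wordFreq.merge(input.getStringByField("word"), 1, Integer::sum);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: no output stream.
    }
}
```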
ParticipleBolt.java
package bolt;
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.wltea.analyzer.lucene.IKAnalyzer;
import org.apache.storm.tuple.Fields;
public class ParticipleBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void execute(Tuple input) {
        String content = input.getStringByField("content");
        // Pass the end-of-batch sentinel straight through to FrequencyBolt.
        if ("".equals(content)) {
            collector.emit(new Values(""));
            return;
        }
        // Tokenize the title with the IK analyzer (true = smart segmentation)
        // and emit one tuple per token.
        try (Analyzer analyzer = new IKAnalyzer(true)) {
            TokenStream stream = analyzer.tokenStream("content", new StringReader(content));
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                collector.emit(new Values(cta.toString()));
            }
            stream.end(); // complete the TokenStream contract
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
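The objective mentions Storm's reliability guarantees, but the bolts above emit unanchored tuples and never ack. A minimal sketch of what an anchored, acking variant of the split step could look like (tokenization simplified to whitespace splitting; this is an illustration, not the submitted code):

```java
// Sketch: anchored emit + ack so Storm can track and replay failed tuples.
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ReliableSplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            for (String word : input.getStringByField("content").split("\\s+")) {
                // Anchoring to `input` links each output tuple to its source,
                // so a downstream failure triggers a replay from the spout.
                collector.emit(input, new Values(word));
            }
            collector.ack(input);   // mark the input as fully processed
        } catch (Exception e) {
            collector.fail(input);  // ask the spout to replay this tuple
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```

End-to-end replay also requires the spout to emit with message IDs; a matching sketch follows Spout.java below.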
RelationBolt.java
package bolt;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.wltea.analyzer.lucene.IKAnalyzer;
import bean.BiWordFreq;
import dao.BiWordFreqDAO;
public class RelationBolt extends BaseRichBolt {
    private Map<String, Integer> wordFreq;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        wordFreq = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String content = input.getStringByField("content");
        List<String> wordList = new ArrayList<String>();
        // End-of-batch sentinel: print the pair counts and persist them to Redis.
        if ("".equals(content)) {
            System.out.println("关联词频:");
            System.out.println(wordFreq);
            System.out.println();
            Set<String> set = wordFreq.keySet();
            for (String s : set) {
                BiWordFreq wf = new BiWordFreq();
                // Keys have the form "word1-word2"; this split assumes the
                // words themselves contain no '-'.
                String[] words = s.split("-");
                wf.setWord1(words[0]);
                wf.setWord2(words[1]);
                wf.setFreq(wordFreq.get(s));
                wf.setDate(new Date());
                new BiWordFreqDAO().saveBiWordFreq(wf);
            }
            return;
        }
        // Tokenize the whole title, collecting the tokens into a list.
        try (Analyzer analyzer = new IKAnalyzer(true)) {
            TokenStream stream = analyzer.tokenStream("content", new StringReader(content));
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                wordList.add(cta.toString());
            }
            stream.end(); // complete the TokenStream contract
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Count every unordered pair of distinct words in this title, using
        // the lexicographically ordered "word1-word2" key.
        for (int i = 0; i < wordList.size(); i++) {
            for (int j = i + 1; j < wordList.size(); j++) {
                String str1 = wordList.get(i);
                String str2 = wordList.get(j);
                String key = null;
                if (str1.compareTo(str2) == 0) {
                    continue; // skip pairs of the same word
                } else if (str1.compareTo(str2) > 0) {
                    key = str2 + "-" + str1;
                } else {
                    key = str1 + "-" + str2;
                }
                if (!wordFreq.containsKey(key)) {
                    wordFreq.put(key, 1);
                } else {
                    wordFreq.replace(key, wordFreq.get(key) + 1);
                }
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: declares no output stream.
    }
}
BiWordFreqDAO.java
package dao;
import bean.BiWordFreq;
import util.JedisUtil;
import redis.clients.jedis.Jedis;
public class BiWordFreqDAO {
    public void saveBiWordFreq(BiWordFreq wf) {
        try (Jedis jedis = JedisUtil.getConnection()) {
            String key = wf.getWord1() + "-" + wf.getWord2();
            int value = wf.getFreq();
            // Read-modify-write: add the new count to any existing value.
            // Note this is not atomic across concurrent writers.
            if (jedis.exists(key)) {
                jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
            } else {
                jedis.set(key, String.valueOf(value));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
WordFreqDAO.java
package dao;
import bean.WordFreq;
import util.JedisUtil;
import redis.clients.jedis.Jedis;
public class WordFreqDAO {
    public void saveWordFreq(WordFreq wf) {
        try (Jedis jedis = JedisUtil.getConnection()) {
            String key = wf.getWord();
            int value = wf.getFreq();
            // Read-modify-write: add the new count to any existing value.
            // Note this is not atomic across concurrent writers.
            if (jedis.exists(key)) {
                jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
            } else {
                jedis.set(key, String.valueOf(value));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
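Both DAOs use an exists/get/set sequence, which is not atomic if several writers touch the same key. Redis offers an atomic increment, and INCRBY treats a missing key as 0, so no existence check is needed; a minimal sketch (the class name is illustrative; incrBy is available in Jedis 2.8.x):

```java
// Sketch: atomic increment instead of exists/get/set.
import bean.WordFreq;
import redis.clients.jedis.Jedis;
import util.JedisUtil;

public class AtomicWordFreqDAO {
    public void saveWordFreq(WordFreq wf) {
        try (Jedis jedis = JedisUtil.getConnection()) {
            // INCRBY creates the key at 0 if absent, then adds the delta atomically.
            jedis.incrBy(wf.getWord(), wf.getFreq());
        }
    }
}
```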
Spout.java
package spout;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
import org.apache.storm.tuple.Fields;
import redis.clients.jedis.Jedis;
import util.JedisUtil;
public class Spout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void nextTuple() {
        // Fetch the crawled titles (one per line) that zlh.main staged under key "!".
        String content;
        try (Jedis jedis = JedisUtil.getConnection()) {
            content = jedis.get("!");
        }
        String[] strs = content.split("\n");
        for (String s : strs) {
            collector.emit(new Values(s));
        }
        // Empty-string sentinel marks the end of the batch for downstream bolts.
        collector.emit(new Values(""));
        try {
            // Emit the data set only once: park this spout thread indefinitely.
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("content"));
    }
}
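This spout emits without message IDs, so Storm cannot replay lost tuples. A minimal sketch of a reliable variant (the pending map, the fetchNextLine helper, and the ID scheme are illustrative, not part of the submitted code):

```java
// Sketch: emitting with message IDs so Storm calls back ack()/fail() on this spout.
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReliableSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final Map<Object, String> pending = new HashMap<>(); // msgId -> line

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String line = fetchNextLine();
        if (line == null) { return; }
        String msgId = UUID.randomUUID().toString();
        pending.put(msgId, line);
        // The second argument is the message ID Storm tracks through the topology.
        collector.emit(new Values(line), msgId);
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId); // fully processed downstream; stop tracking
    }

    @Override
    public void fail(Object msgId) {
        String line = pending.get(msgId);
        if (line != null) {
            collector.emit(new Values(line), msgId); // replay the failed tuple
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("content"));
    }

    // Hypothetical helper: supply the next title here (e.g. read from Redis).
    private String fetchNextLine() {
        return null; // placeholder
    }
}
```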
zlh.java
package test;
import topology.TopologyFactory;
import util.JedisUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.net.URL;

public class zlh {
    public static void main(String[] args) throws IOException {
        // Crawl the blog home page and collect the post titles (rendered in <h4> tags).
        Document doc = Jsoup.parse(new URL("https://blog.csdn.net/weixin_45267419"), 50000);
        Elements titles = doc.select("h4");
        StringBuffer sb = new StringBuffer();
        for (Element title : titles) {
            sb.append(title.html() + "\n");
        }
        sb.delete(sb.length() - 1, sb.length()); // drop the trailing newline
        // Stage the titles in Redis under key "!" for the spout to read.
        try (Jedis jedis = JedisUtil.getConnection()) {
            jedis.set("!", sb.toString());
        }
        // Build the topology and run it in local mode.
        StormTopology topology = TopologyFactory.factory();
        LocalCluster cluster = new LocalCluster();
        Config config = new Config();
        cluster.submitTopology("1814010130", config, topology);
    }
}
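As written, main never stops the LocalCluster, so the run ends only when the process is killed. A hedged sketch of a cleaner ending, appended at the end of main (the 60-second runtime is an arbitrary choice):

```java
// Sketch: let the local topology run for a fixed time, then tear it down cleanly.
org.apache.storm.utils.Utils.sleep(60_000);  // run the topology for 60 s
cluster.killTopology("1814010130");          // stop the submitted topology
cluster.shutdown();                          // shut down the in-process cluster
```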
TopologyFactory.java
package topology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.generated.StormTopology;
import spout.*;
import bolt.*;
public class TopologyFactory {
    public static StormTopology factory() {
        TopologyBuilder builder = new TopologyBuilder();
        // s1 emits raw titles; b1 splits them into words for b2 (word counts);
        // b3 consumes the raw titles directly and counts word pairs.
        builder.setSpout("s1", new Spout());
        builder.setBolt("b1", new ParticipleBolt()).shuffleGrouping("s1");
        builder.setBolt("b2", new FrequencyBolt()).shuffleGrouping("b1");
        builder.setBolt("b3", new RelationBolt()).shuffleGrouping("s1");
        return builder.createTopology();
    }
}
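A design note: with shuffleGrouping, giving FrequencyBolt a parallelism above 1 would scatter occurrences of the same word across tasks and split its counts. A fields grouping on "word" keeps each word on one task; a minimal sketch (the class name and parallelism values are illustrative):

```java
// Sketch: fields grouping keeps all tuples for one word on one bolt task,
// so each task's in-memory map holds the complete count for its words.
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import bolt.FrequencyBolt;
import bolt.ParticipleBolt;
import spout.Spout;

public class GroupedTopologyFactory {
    public static StormTopology factory() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("s1", new Spout());
        // Two parallel splitter tasks; shuffle grouping is fine for splitting.
        builder.setBolt("b1", new ParticipleBolt(), 2).shuffleGrouping("s1");
        // Tuples with the same "word" value always reach the same counter task.
        builder.setBolt("b2", new FrequencyBolt(), 2).fieldsGrouping("b1", new Fields("word"));
        return builder.createTopology();
    }
}
```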
JedisUtil.java
package util;
import redis.clients.jedis.Jedis;
public class JedisUtil {
    public static Jedis getConnection() {
        // Open a new connection to the Redis server and authenticate.
        Jedis jedis = new Jedis("121.89.197.4", 6379);
        jedis.auth("root");
        return jedis;
    }
}
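getConnection opens a fresh TCP connection on every call, and some call sites never closed it (fixed above with try-with-resources). A pooled alternative, as a minimal sketch reusing the host, port, and password from the code above (the class name is illustrative; JedisPool ships with Jedis 2.8.x):

```java
// Sketch: a shared connection pool instead of one new connection per call.
// Callers use try-with-resources; close() returns the connection to the pool.
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class PooledJedisUtil {
    private static final JedisPool POOL =
            new JedisPool(new JedisPoolConfig(), "121.89.197.4", 6379, 2000, "root");

    public static Jedis getConnection() {
        return POOL.getResource();
    }
}
```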
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com</groupId>
<artifactId>sssjclshiyan111</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>sssjclshiyan111</name>
<url>http://maven.apache.org</url>
<repositories>
<repository>
<id>aliyunmaven</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<lucene.version>4.5.1</lucene.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.2</version>
</dependency>
<!-- Lucene core -->
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>${lucene.version}</version>
</dependency>
<!-- Lucene QueryParser -->
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
<version>${lucene.version}</version>
</dependency>
<!-- Lucene common analyzers -->
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<version>${lucene.version}</version>
</dependency>
<!-- Lucene highlighter -->
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-highlighter</artifactId>
<version>${lucene.version}</version>
</dependency>
<!-- IK Analyzer Chinese tokenizer -->
<dependency>
<groupId>com.janeluo</groupId>
<artifactId>ikanalyzer</artifactId>
<version>2012_u6</version>
</dependency>
<dependency>
<!-- jsoup HTML parser library @ https://jsoup.org/ -->
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.13.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<!-- Optional: uncomment to produce a directly runnable jar with a main class. -->
<!-- <archive>
<manifest>
<mainClass>com.thanks.hehe.App</mainClass>
</manifest>
</archive> -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
Run results:
5. Summary:
Through this experiment I gained an understanding of the basic ideas of streaming real-time data processing and learned the core techniques for implementing real-time processing with Storm, together with its reliability guarantee mechanisms.
Grade:        Instructor:        Date: