Harbin University of Science and Technology
School of Software and Microelectronics
Lab Report
(2020–2021 Academic Year, Second Semester)
Course: Real-Time Data Processing
Class: Software 18-1
Student ID: 1814010130
Name: 张立辉
Harbin University of Science and Technology, School of Software and Microelectronics
Experiment: Experiment 2, NoSQL Storage of Storm Real-Time Processing Results    Major: Software Engineering
Name: 张立辉    Student ID: 1814010130    Class: Software 18-1
I. Objective:
Understand how data processing and data persistence are combined in big-data scenarios, and master the joint use of Storm and the Redis database.
II. Content:
Building on Experiment 1, design a key-value model for the real-time processing results and define a Bolt that stores those results.
(1) Task 1: store words and their frequencies
The key-value model is:
Word: frequency
Define the VO class and interface:
public class WordFreq {
private String word;
private int freq;
private Date date; // month granularity, e.g., 2017-05-01
// getters and setters
}
public interface WordFreqDao {
public void saveWordFreq(WordFreq wf);
}
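For reference, a minimal Jedis sketch of how this model lands in Redis; the localhost address is an assumption, and the DAO actually used appears in Section IV:
// Sketch: one Redis string per word holding its accumulated frequency
try (Jedis jedis = new Jedis("localhost", 6379)) {
    jedis.incrBy("storm", 3);                // "storm" -> 3 (key created at 0 if absent)
    System.out.println(jedis.get("storm"));  // prints "3"
}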
(2) Task 2: store the joint frequency of word pairs
The key-value model is:
Word-word: frequency
Define the VO class and interface:
public class BiWordFreq {
private String word1;
private String word2;
private Date date; // month granularity, e.g., 2017-05-01
private int freq;
// getters and setters
}
public interface BiWordFreqDao {
public void saveBiWordFreq(BiWordFreq wf);
}
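Because the pair key must not depend on the order in which the two words arrive, they can be normalized lexicographically before concatenation. RelationBolt in Section IV inlines exactly this logic; the helper name pairKey below is only illustrative:
// Sketch: order-independent key, so ("b","a") and ("a","b") both map to "a-b"
static String pairKey(String w1, String w2) {
    return w1.compareTo(w2) < 0 ? w1 + "-" + w2 : w2 + "-" + w1;
}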
III. Equipment and Software Environment:
Windows 10 Pro
IntelliJ IDEA 2020.3.2 (Ultimate Edition)
Java 15
IV. Procedure and Result Screenshots:
BiWordFreq.java
package bean;
import java.util.Date;
public class BiWordFreq {
private String word1;
private String word2;
private Date date;
private int freq;
public void setWord1(String word1) {
this.word1 = word1;
}
public String getWord1() {
return word1;
}
public void setWord2(String word2) {
this.word2 = word2;
}
public String getWord2() {
return word2;
}
public void setDate(Date date) {
this.date = date;
}
public Date getDate() {
return date;
}
public void setFreq(int freq) {
this.freq = freq;
}
public int getFreq() {
return freq;
}
}
WordFreq.java
package bean;
import java.util.Date;
public class WordFreq {
private String word;
private int freq;
private Date date;
public void setWord(String word) {
this.word = word;
}
public String getWord() {
return word;
}
public void setFreq(int freq) {
this.freq = freq;
}
public int getFreq() {
return freq;
}
public void setDate(Date date) {
this.date = date;
}
public Date getDate() {
return date;
}
}
FrequencyBolt.java
package bolt;
import java.util.Date;
import java.util.HashMap;
import java.util.Set;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import bean.WordFreq;
import dao.WordFreqDAO;
public class FrequencyBolt extends BaseRichBolt {
private Map<String, Integer> wordFreq;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// initialize the per-task in-memory word counts
wordFreq = new HashMap<String, Integer>();
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
// the empty string is the end-of-stream marker from ParticipleBolt:
// print the accumulated counts and flush them to Redis
if("".equals(word)) {
System.out.println("Word frequencies:");
System.out.println(wordFreq);
System.out.println();
Set<String> set = wordFreq.keySet();
for(String s: set) {
WordFreq wf = new WordFreq();
wf.setWord(s);
wf.setFreq(wordFreq.get(s));
wf.setDate(new Date());
new WordFreqDAO().saveWordFreq(wf);
}
return;
}
if(!wordFreq.containsKey(word)) {
wordFreq.put(word, 1);
}
else {
wordFreq.replace(word, wordFreq.get(word) + 1);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// terminal bolt: no output fields to declare
}
}
ParticipleBolt.java
package bolt;
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.wltea.analyzer.lucene.IKAnalyzer;
import org.apache.storm.tuple.Fields;
public class ParticipleBolt extends BaseRichBolt {
OutputCollector collector;
public void execute(Tuple input) {
String content = input.getStringByField("content");
// pass the end-of-stream marker straight through to FrequencyBolt
if("".equals(content)) {
collector.emit(new Values(""));
return;
}
try(Analyzer analyzer = new IKAnalyzer(true)) { // smart (coarse-grained) IK segmentation
TokenStream stream = analyzer.tokenStream("content", new StringReader(content));
CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
stream.reset();
while (stream.incrementToken()) {
collector.emit(new Values(cta.toString()));
}
}
catch(IOException e) {
e.printStackTrace();
}
}
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
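The tokenization can be sanity-checked outside Storm. A minimal standalone sketch, assuming ik-analyzer and the Lucene analysis classes are on the classpath; the sample sentence is arbitrary:
// Sketch: print the tokens IK produces in smart mode for one sample line
public static void main(String[] args) throws IOException {
    try (Analyzer analyzer = new IKAnalyzer(true)) {
        TokenStream stream = analyzer.tokenStream("content", new StringReader("实时数据处理实验"));
        CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
        stream.reset();
        while (stream.incrementToken()) {
            System.out.println(cta.toString());
        }
        stream.end();
        stream.close();
    }
}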
RelationBolt.java
package bolt;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.wltea.analyzer.lucene.IKAnalyzer;
import bean.BiWordFreq;
import dao.BiWordFreqDAO;
public class RelationBolt extends BaseRichBolt {
private Map<String, Integer> wordFreq;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// per-task in-memory counts of word-pair co-occurrences
wordFreq = new HashMap<String, Integer>();
}
@Override
public void execute(Tuple input) {
String content = input.getStringByField("content");
List<String> wordList = new ArrayList<String>();
// the empty line is the end-of-stream marker: print the pair counts and flush them to Redis
if("".equals(content)) {
System.out.println("Co-occurrence frequencies:");
System.out.println(wordFreq);
System.out.println();
Set<String> set = wordFreq.keySet();
for(String s: set) {
BiWordFreq wf = new BiWordFreq();
String[] words = s.split("-");
wf.setWord1(words[0]);
wf.setWord2(words[1]);
wf.setFreq(wordFreq.get(s));
wf.setDate(new Date());
new BiWordFreqDAO().saveBiWordFreq(wf);
}
return;
}
try(Analyzer analyzer = new IKAnalyzer(true)) {
TokenStream stream = analyzer.tokenStream("content", new StringReader(content));
CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
stream.reset();
while (stream.incrementToken()) {
wordList.add(cta.toString());
}
}
catch(IOException e) {
e.printStackTrace();
}
// count each unordered word pair in the line, using a canonical "smaller-larger" key
for(int i = 0; i < wordList.size(); i++) {
for(int j = i + 1; j < wordList.size(); j++) {
String str1 = wordList.get(i);
String str2 = wordList.get(j);
String key = null;
if(str1.compareTo(str2) == 0) {
continue;
}
else if(str1.compareTo(str2) > 0) {
key = str2 + "-" + str1;
}
else {
key = str1 + "-" + str2;
}
if(!wordFreq.containsKey(key)) {
wordFreq.put(key, 1);
}
else {
wordFreq.replace(key, wordFreq.get(key) + 1);
}
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// terminal bolt: no output fields to declare
}
}
BiWordFreqDAO.java
package dao;
import bean.BiWordFreq;
import util.JedisUtil;
import redis.clients.jedis.Jedis;
public class BiWordFreqDAO {
public void saveBiWordFreq(BiWordFreq wf) {
try(Jedis jedis = JedisUtil.getConnection()) {
String key = wf.getWord1() + "-" + wf.getWord2();
int value = wf.getFreq();
// accumulate: add the new count onto any value already stored under this key
if(jedis.exists(key)) {
jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
}
else {
jedis.set(key, String.valueOf(value));
}
}
catch(Exception e) {
e.printStackTrace();
}
}
}
WordFreqDAO.java
package dao;
import bean.WordFreq;
import util.JedisUtil;
import redis.clients.jedis.Jedis;
public class WordFreqDAO {
public void saveWordFreq(WordFreq wf) {
try(Jedis jedis = JedisUtil.getConnection()) {
String key = wf.getWord();
int value = wf.getFreq();
// accumulate: add the new count onto any value already stored under this key
if(jedis.exists(key)) {
jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
}
else {
jedis.set(key, String.valueOf(value));
}
}
catch(Exception e) {
e.printStackTrace();
}
}
}
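Note that the exists/get/set sequence in both DAOs is a read-modify-write, which could race if several bolt tasks wrote the same key concurrently. Redis's INCRBY performs the same accumulation in a single atomic command; a sketch of that variant:
// Sketch: atomic accumulation; INCRBY creates the key at 0 when it is absent
try (Jedis jedis = JedisUtil.getConnection()) {
    jedis.incrBy(wf.getWord(), wf.getFreq());
}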
Spout.java
package spout;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
import org.apache.storm.tuple.Fields;
import redis.clients.jedis.Jedis;
import util.JedisUtil;
public class Spout extends BaseRichSpout {
private SpoutOutputCollector collector;
public void nextTuple() {
// read the crawled text from Redis (key "!") and emit one tuple per line
String content;
try(Jedis jedis = JedisUtil.getConnection()) {
content = jedis.get("!");
}
String[] strs = content.split("\n");
for(String s: strs) {
collector.emit(new Values(s));
}
// an empty string marks end of stream for the downstream bolts
collector.emit(new Values(""));
try {
// emit everything only once: park this spout thread indefinitely
Thread.sleep(Long.MAX_VALUE);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("content"));
}
}
zlh.java
package test;
import topology.TopologyFactory;
import util.JedisUtil;
import redis.clients.jedis.Jedis;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
public class zlh {
public static void main(String[] args) throws MalformedURLException, IOException {
// fetch the blog page (50 s timeout) and use the <h4> post titles as input text
Document doc = Jsoup.parse(new URL("https://blog.csdn.net/weixin_45267419"), 50000);
Elements titles = doc.select("h4");
StringBuffer sb = new StringBuffer();
for(Element title: titles) {
sb.append(title.html() + "\n");
}
sb.delete(sb.length() - 1, sb.length()); // drop the trailing newline
try(Jedis jedis = JedisUtil.getConnection()) {
jedis.set("!", sb.toString());
}
StormTopology topology = TopologyFactory.factory();
LocalCluster cluster = new LocalCluster();
Config config = new Config();
cluster.submitTopology("1814010130", config, topology);
}
}
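As written, main returns right after submitTopology while the spout thread sleeps indefinitely. For a local test run, one common pattern is to wait for processing to finish and then stop the cluster; a sketch (the 60-second wait is arbitrary, and InterruptedException must be added to main's throws clause):
// Sketch: let the topology run, then stop the local cluster cleanly
Thread.sleep(60_000);
cluster.killTopology("1814010130");
cluster.shutdown();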
TopologyFactory.java
package topology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.generated.StormTopology;
import spout.*;
import bolt.*;
public class TopologyFactory {
public static StormTopology factory() {
TopologyBuilder builder = new TopologyBuilder();
// s1 emits raw lines; b1 tokenizes into words; b2 counts words; b3 counts word pairs
builder.setSpout("s1", new Spout());
builder.setBolt("b1", new ParticipleBolt()).shuffleGrouping("s1");
builder.setBolt("b2", new FrequencyBolt()).shuffleGrouping("b1");
builder.setBolt("b3", new RelationBolt()).shuffleGrouping("s1");
return builder.createTopology();
}
}
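Every component here runs with the default parallelism of 1, so shuffleGrouping is safe. If FrequencyBolt were scaled out, all tuples carrying the same word would have to reach the same task, which fieldsGrouping guarantees; a sketch (the parallelism hint of 4 is arbitrary, and org.apache.storm.tuple.Fields must be imported):
// Sketch: route each word to a fixed FrequencyBolt task
builder.setBolt("b2", new FrequencyBolt(), 4)
        .fieldsGrouping("b1", new Fields("word"));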
JedisUtil.java
package util;
import redis.clients.jedis.Jedis;
public class JedisUtil {
public static Jedis getConnection() {
// host and password are hard-coded for this lab environment
Jedis jedis = new Jedis("121.89.197.4", 6379);
jedis.auth("root");
return jedis;
}
}
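Opening a new socket on every call is acceptable for this lab; for anything longer-running, a pooled variant would reuse connections. A sketch using JedisPool (redis.clients.jedis.JedisPool must be imported; same host and password as above):
// Sketch: pooled connections; close() on a pooled Jedis returns it to the pool
private static final JedisPool POOL = new JedisPool("121.89.197.4", 6379);

public static Jedis getPooledConnection() {
    Jedis jedis = POOL.getResource();
    jedis.auth("root");
    return jedis;
}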
Run results:
Redis contents:
V. Summary:
Through this experiment I came to understand how data processing and data persistence are combined in big-data scenarios, and learned to apply Storm together with the Redis database: the word and word-pair frequencies computed in the bolts are written to Redis through simple key-value DAOs.
Grade:         Instructor:         Date: