哈尔滨理工大学
软件与微电子学院
实 验 报 告
(2020-2021第二学期)
课程名称: | 实时数据处理 |
班 级: | 软件18- 1 班 |
学 号: | 1814010130 |
姓 名: | 张立辉 |
哈尔滨理工大学软件与微电子学院
实验名称: | 实验4Storm集群和Redis集群综合应用 | 专 业 | 软件工程 | |||
---|---|---|---|---|---|---|
姓 名 | 张立辉 | 学 号 | 1814010130 | 班 级 | 软件18-1 |
一、实验目的:
理解大数据场景下数据处理和数据保存的结合,掌握Storm集群和Redis数据库集群的联合应用
二、实验内容:
把redis数据库作为消息队列缓存爬虫爬取的数据;
在实验一的基础上,设计实时处理结果的键值模型,并定义一个Bolt实现实时处理结果保存至redis集群。
(1)任务一:redis数据库作为消息队列
要点:
(1)启动一个无需持久化的redis实例,用作消息队列缓存
(2)应用Redis的rpush和lpop功能,爬虫的数据rpush至redis
(3)在spout里lpop数据
(2)任务二:搭建客户端管理的redis集群,该集群架构如下图:
在Bolt里访问该集群,实现处理结果的持久化,包括词的频率和词的联合频率
三、实验设备及软件环境:
Windows10专业版
IntelliJ IDEA 2020.3.2 (Ultimate Edition)
Java15
四、实验过程及结果截图:
实验二代码基础上
JedisUtil.java
package util;
import java.util.List;
import java.util.ArrayList;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.JedisShardInfo;
public class JedisUtil {
public static Jedis getConnection() {
Jedis jedis=new Jedis("121.89.197.4",6379);
jedis.auth("root");
return jedis;
}
public static ShardedJedis section() {
List<JedisShardInfo> list = new ArrayList<>();
JedisShardInfo jedisShardInfo1 = new JedisShardInfo("121.89.197.4", 6379);
jedisShardInfo1.setPassword("root");
list.add(jedisShardInfo1);
JedisShardInfo jedisShardInfo2 = new JedisShardInfo("121.89.197.4", 6380);
jedisShardInfo2.setPassword("root");
list.add(jedisShardInfo2);
JedisShardInfo jedisShardInfo3 = new JedisShardInfo("121.89.197.4", 6381);
jedisShardInfo3.setPassword("root");
list.add(jedisShardInfo3);
return new ShardedJedis(list);
}
}
WordFreqDAO.java
package dao;
import bean.WordFreq;
import util.JedisUtil;
import redis.clients.jedis.ShardedJedis;
public class WordFreqDAO {
public void saveWordFreq(WordFreq wf) {
try(ShardedJedis jedis = JedisUtil.section()) {
String key = wf.getWord();
int value = wf.getFreq();
if(jedis.exists(key)) {
jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
}
else {
jedis.set(key, String.valueOf(value));
}
}
catch(Exception e) {
e.printStackTrace();
}
}
}
BiWordFreqDAO.java
package dao;
import bean.BiWordFreq;
import util.JedisUtil;
import redis.clients.jedis.ShardedJedis;
public class BiWordFreqDAO {
public void saveBiWordFreq(BiWordFreq wf) {
try(ShardedJedis jedis = JedisUtil.section()) {
String key = wf.getWord1() + "-" + wf.getWord2();
int value = wf.getFreq();
if(jedis.exists(key)) {
jedis.set(key, String.valueOf(Integer.parseInt(jedis.get(key)) + value));
}
else {
jedis.set(key, String.valueOf(value));
}
}
catch(Exception e) {
e.printStackTrace();
}
}
}
zlh.java
package test;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import topology.TopologyFactory;
import util.JedisUtil;
public class zlh {
public static void main(String[] args) throws MalformedURLException, IOException {
Document doc = Jsoup.parse(new URL("https://blog.csdn.net/weixin_45267419"), 50000);
Elements titles = doc.select("h4");
StringBuffer sb = new StringBuffer();
for(Element title: titles) {
sb.append(title.html() + "\n");
}
sb.delete(sb.length() - 1, sb.length());
JedisUtil.getConnection().set("!", sb.toString());
StormTopology topology = TopologyFactory.factory();
LocalCluster cluster = new LocalCluster();
Config config = new Config();
cluster.submitTopology("zlh", config, topology);
}
}
服务器上开启三个redis:
运行结果:
6379:
内容:
6380
内容:
6381
内容:
关闭仨redis:
五、总结:
通过本次实验:理解大数据场景下数据处理和数据保存的结合,掌握Storm集群和Redis数据库集群的联合应用
实验成绩: 指导教师: 年 月 日