# network-log-data-collection-sys **Repository Path**: jaywalker/network-log-data-collection-sys ## Basic Information - **Project Name**: network-log-data-collection-sys - **Description**: 网站流量日志数据分析系统 - **Primary Language**: Unknown - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 0 - **Created**: 2023-01-01 - **Last Updated**: 2025-05-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: hdfs, flume, hive, SpringBoot, Echarts ## README # network-log-data-collection-sys #### 介绍 网站流量日志数据分析系统 #### 软件架构 软件架构说明 - 数据采集:flume - 数据预处理:编写mapreduce实现mr程序 - 数据清洗(ETL) - 数据仓库 - hive - 数据统计及分析 - 通过sqoop将数据导入到mysql数据库 - 数据可视化 - 编写一个web程序,springboot写 - echarts api实现 - azkaban - 调用整个工作流程 项目流程: 一、 网站日志数据采集(代码及实现步骤) a) 网站日志数据生成及按时间拆解日志数据 需求:生成并拆解日志数据到指定文件夹 实现过程描述:首先通过一个shell脚本指定日志文件存储路径,在路径里面生成log文件,最后通过crontab配置指令实现每分钟采集。 代码如下: Shell: #!/bin/sh logs_path="/home/hadoop/nginx/logs/" pid_path="/home/hadoop/nginx/logs/nginx.pid" filepath=${logs_path}"access.log" echo $filepath mv ${logs_path}access.log ${logs_path}access_$(date -d '-1 day' '+%Y-%m-%d-%H-%M').log kill -USR1 `cat ${pid_path}` crontab配置: */1 * * * * sh /home/hadoop/bigdatasoftware/project1/nginx_log.sh b) 通过flume日志数据采集到hdfs 实现过程描述:通过配置文件确定日志在hdfs的存储路径以及相关配置,再运行flume脚本,到浏览器上点击指定网页,再去查看hdfs是否生成日志文件夹。如果有则采集成功。 代码如下: Hdfs配置文件: agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 #监控一个目录下的多个文件新增的内容 agent1.sources.source1.type = TAILDIR #通过 json 格式存下每个文件消费的偏移量,避免从头消费 agent1.sources.source1.positionFile = /home/hadoop/taildir_position.json agent1.sources.source1.filegroups = f1 agent1.sources.source1.filegroups.f1 = /home/hadoop/nginx/logs/access_*.log agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname #配置sink组件为hdfs agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path=hdfs://hadoop-001:8020/weblog/flume-collection/%Y-%m-%d/%H-%M_%{hostname} #指定文件名前缀 agent1.sinks.sink1.hdfs.filePrefix = access_log #指定每批下沉数据的记录条数 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text #指定下沉文件按1MB大小滚动 agent1.sinks.sink1.hdfs.rollSize = 1048576 #指定下沉文件按1000000条数滚动 agent1.sinks.sink1.hdfs.rollCount = 1000000 #指定下沉文件按30分钟滚动 agent1.sinks.sink1.hdfs.rollInterval = 30 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true #使用memory类型channel agent1.channels.channel1.type = memory agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 运行flume脚本: bin/flume-ng agent --conf conf/ --name agent1 --conf-file job/TaildirSource-hdfs.conf -Dflume.root.logger=INFO,console 二、 数据预处理(代码及实现步骤) a) 将数据从采集目录移动到预处理目录 需求:移动文件到预处理工作目录 实现过程描述:通过设置flume采集生成的日志文件存放的目录和预处理程序的工作目录,就可以实现两者的移动。 代码如下: #!/bin/bash # # =========================================================================== # 程序名称: # 功能描述: 移动文件到预处理工作目录 # 输入参数: 运行日期 # 目标路径: /data/weblog/preprocess/input # 数据源 : flume采集数据所存放的路径: /weblog/flume-collection # 代码审核: # 修改人名: # 修改日期: # 修改原因: # 修改列表: # =========================================================================== #flume采集生成的日志文件存放的目录 log_flume_dir=/weblog/flume-collection #预处理程序的工作目录 log_pre_input=/data/weblog/preprocess/input #获取时间信息 day_01="2013-09-18" day_01=`date -d'-1 day' +%Y-%m-%d` syear=`date --date=$day_01 +%Y` smonth=`date --date=$day_01 +%m` sday=`date --date=$day_01 +%d` #读取日志文件的目录,判断是否有需要上传的文件 files=`hadoop fs -ls $log_flume_dir | grep $day_01 | wc -l` if [ $files -gt 0 ]; then hadoop fs -mv ${log_flume_dir}/${day_01} ${log_pre_input} echo "success moved ${log_flume_dir}/${day_01} to ${log_pre_input} ....." fi b) 数据清洗(5分) 需求:从第一次采集的数据筛选出有效的数据。 实现过程描述:编写一个数据清洗的mr程序 代码如下: 定义一个实体类描述源日志数据: package com.gec.mr.pre.mrbean; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 对接外部数据的层,表结构定义最好跟外部数据源保持一致 * 术语: 贴源表 * @author * */ public class WebLogBean implements Writable { private boolean valid = true;// 判断数据是否合法 private String remote_addr;// 记录客户端的ip地址 private String remote_user;// 记录客户端用户名称,忽略属性"-" private String time_local;// 记录访问时间与时区 private String request;// 记录请求的url与http协议 private String status;// 记录请求状态;成功是200 private String body_bytes_sent;// 记录发送给客户端文件主体内容大小 private String http_referer;// 用来记录从那个页面链接访问过来的 private String http_user_agent;// 记录客户浏览器的相关信息 public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) { this.valid = valid; this.remote_addr = remote_addr; this.remote_user = remote_user; this.time_local = time_local; this.request = request; this.status = status; this.body_bytes_sent = body_bytes_sent; this.http_referer = http_referer; this.http_user_agent = http_user_agent; } public String getRemote_addr() { return remote_addr; } public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; } public String getRemote_user() { return remote_user; } public void setRemote_user(String remote_user) { this.remote_user = remote_user; } public String getTime_local() { return this.time_local; } public void setTime_local(String time_local) { this.time_local = time_local; } public String getRequest() { return request; } public void setRequest(String request) { this.request = request; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getBody_bytes_sent() { return body_bytes_sent; } public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; } public String getHttp_referer() { return http_referer; } public void setHttp_referer(String http_referer) { this.http_referer = http_referer; } public String getHttp_user_agent() { return http_user_agent; } public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; } /** * \001是hive当中默认的分隔符,不会出现用户手打出来的情况 * @return */ @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.valid); sb.append("\001").append(this.getRemote_addr()); sb.append("\001").append(this.getRemote_user()); sb.append("\001").append(this.getTime_local()); sb.append("\001").append(this.getRequest()); sb.append("\001").append(this.getStatus()); sb.append("\001").append(this.getBody_bytes_sent()); sb.append("\001").append(this.getHttp_referer()); sb.append("\001").append(this.getHttp_user_agent()); return sb.toString(); } @Override public void readFields(DataInput in) throws IOException { this.valid = in.readBoolean(); this.remote_addr = in.readUTF(); this.remote_user = in.readUTF(); this.time_local = in.readUTF(); this.request = in.readUTF(); this.status = in.readUTF(); this.body_bytes_sent = in.readUTF(); this.http_referer = in.readUTF(); this.http_user_agent = in.readUTF(); } @Override public void write(DataOutput out) throws IOException { out.writeBoolean(this.valid); out.writeUTF(null==remote_addr?"":remote_addr); out.writeUTF(null==remote_user?"":remote_user); out.writeUTF(null==time_local?"":time_local); out.writeUTF(null==request?"":request); out.writeUTF(null==status?"":status); out.writeUTF(null==body_bytes_sent?"":body_bytes_sent); out.writeUTF(null==http_referer?"":http_referer); out.writeUTF(null==http_user_agent?"":http_user_agent); } } 编写处理数据清洗的工具类: package com.gec.mr.pre; import com.gec.mr.pre.mrbean.WebLogBean; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Locale; import java.util.Set; public class WebLogParser { public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US); public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); public static WebLogBean parser(String line) { WebLogBean webLogBean = new WebLogBean(); //通过空格来对我们的数据进行切割,然后拼接字符串,将我们同一个字段里面的数据拼接到一起 //222.66.59.174 -- [18/Sep/2013:06:53:30 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0" String[] arr = line.split(" "); if (arr.length > 11) { webLogBean.setRemote_addr(arr[0]); webLogBean.setRemote_user(arr[1]); //将我们的字符串转换成中文习惯的字符串 // [18/Sep/2013:06:52:32 +0000] // 18/Sep/2013:06:52:32------》2013-09-18 06:52:32 String time_local = formatDate(arr[3].substring(1)); if(null==time_local || "".equals(time_local)) { time_local="-invalid_time-"; } webLogBean.setTime_local(time_local); webLogBean.setRequest(arr[6]); webLogBean.setStatus(arr[8]); webLogBean.setBody_bytes_sent(arr[9]); webLogBean.setHttp_referer(arr[10]); //如果useragent元素较多,拼接useragent。 //数组长度大于12,说明我们的最后一个字段切出来比较长,我们把所有多的数据都塞到最后一个字段里面去 // "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; MDDR; InfoPath.2; .NET4.0C)" if (arr.length > 12) { StringBuilder sb = new StringBuilder(); for(int i=11;i= 400) {// 大于400,HTTP错误 webLogBean.setValid(false); } //如果获取时间没拿到,那么也是认为是无效的数据 if("-invalid_time-".equals(webLogBean.getTime_local())){ webLogBean.setValid(false); } } else { //58.215.204.118 - - [18/Sep/2013:06:52:33 +0000] "-" 400 0 "-" "-" //如果切出来的数组长度小于11个,说明数据不全,,直接丢掉 webLogBean=null; } return webLogBean; } public static void filtStaticResource(WebLogBean bean, Set pages) { if (!pages.contains(bean.getRequest())) { bean.setValid(false); } } //格式化时间方法 public static String formatDate(String time_local) { try { return df2.format(df1.parse(time_local)); } catch (ParseException e) { return null; } } } 配置mr类: package com.gec.mr.pre.mapper; import com.gec.mr.pre.mrbean.WebLogBean; import com.gec.mr.pre.utils.WebLogParser; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.HashSet; import java.util.Set; public class WeblogPreProcessMapper extends Mapper { // 用来存储网站url分类数据 Set pages = new HashSet(); Text k = new Text(); NullWritable v = NullWritable.get(); /** * map阶段的初始化方法 * 从外部配置文件中加载网站的有用url分类数据 存储到maptask的内存中,用来对日志数据进行过滤 * 过滤掉我们日志文件当中的一些静态资源,包括js css img 等请求日志都需要过滤掉 */ @Override protected void setup(Context context) throws IOException, InterruptedException { //定义一个集合 pages.add("/about"); pages.add("/black-ip-list/"); pages.add("/cassandra-clustor/"); pages.add("/finance-rhive-repurchase/"); pages.add("/hadoop-family-roadmap/"); pages.add("/hadoop-hive-intro/"); pages.add("/hadoop-zookeeper-intro/"); pages.add("/hadoop-mahout-roadmap/"); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //得到我们一行数据 String line = value.toString(); WebLogBean webLogBean = WebLogParser.parser(line); if (webLogBean != null) { // 过滤js/图片/css等静态资源 WebLogParser.filtStaticResource(webLogBean, pages); if (!webLogBean.isValid()) return; k.set(webLogBean.toString()); context.write(k, v); } } } Driver: package com.gec.mr.pre.driver; import com.gec.mr.pre.mapper.WeblogPreProcessMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; public class WeblogEtlPreProcessDriver { static { try { // 设置 HADOOP_HOME 目录 System.setProperty("hadoop.home.dir", "D:/winutils-master/winutils-master/hadoop-3.0.0/"); // 加载库文件 System.load("D:/winutils-master/winutils-master/hadoop-3.0.0/bin/hadoop.dll"); } catch (UnsatisfiedLinkError e) { System.err.println("Native code library failed to load.\n" + e); System.exit(1); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); FileInputFormat.addInputPath(job,new Path("file:///d:\\src\\input")); job.setInputFormatClass(TextInputFormat.class); FileOutputFormat.setOutputPath(job,new Path("file:///d:\\src\\weblogPreOut2")); job.setOutputFormatClass(TextOutputFormat.class); job.setJarByClass(WeblogEtlPreProcessDriver.class); job.setMapperClass(WeblogPreProcessMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(0); boolean res = job.waitForCompletion(true); } } c) 数据点击流数据模型生成 需求:将点击流数据存到hdfs。 实现过程描述:通过编写clickstreampageview,实现点击流访问数据 代码如下: 核心类Reducer: package com.gec.mr.pre.reducer; import com.gec.mr.pre.mrbean.WebLogBean; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.shaded.org.apache.commons.beanutils.BeanUtils; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; public class ClickStreamReducer extends Reducer { Text v = new Text(); /** * reduce阶段接收到的key就是我们的IP * 接收到的value就是我们一行行的数据 * @param key * @param values * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { ArrayList beans = new ArrayList(); // 先将一个用户的所有访问记录中的时间拿出来排序 try { //循环遍历V2,这里面装的,都是我们的同一个用的数据 for (WebLogBean bean : values) { // beans.add(bean); //为什么list集合当中不能直接添加循环出来的这个bean? //这里通过属性拷贝,每次new 一个对象,避免了bean的属性值每次覆盖 //这是涉及到java的深浅拷贝问题 WebLogBean webLogBean = new WebLogBean(); try { BeanUtils.copyProperties(webLogBean, bean); } catch(Exception e) { e.printStackTrace(); } //beans.add(bean); beans.add(webLogBean); } //将bean按时间先后顺序排序,排好序之后,就计算这个集合当中下一个时间和上一个时间的差值 ,如 //如果差值小于三十分钟,那么就代表一次会话,如果差值大于30分钟,那么就代表多次会话 //将我们的weblogBean塞到一个集合当中,我们就可以自定义排序,对集合当中的数据进行排序 Collections.sort(beans, new Comparator() { @Override public int compare(WebLogBean o1, WebLogBean o2) { try { Date d1 = toDate(o1.getTime_local()); Date d2 = toDate(o2.getTime_local()); if (d1 == null || d2 == null) return 0; return d1.compareTo(d2); } catch (Exception e) { e.printStackTrace(); return 0; } } }); /** * 以下逻辑为:从有序bean中分辨出各次visit,并对一次visit中所访问的page按顺序标号step * 核心思想: * 就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个session * 否则,就属于不同的session * */ int step = 1; //定义一个uuid作为我们的session编号 String session = UUID.randomUUID().toString(); ///经过排序之后,集合里面的数据都是按照时间来排好序了 for (int i = 0; i < beans.size(); i++) { WebLogBean bean = beans.get(i); // 如果仅有1条数据,则直接输出 if (1 == beans.size()) { // 设置默认停留时长为60s v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus()); context.write(NullWritable.get(), v); session = UUID.randomUUID().toString(); break; } // 如果不止1条数据,则将第一条跳过不输出,遍历第二条时再输出 if (i == 0) { continue; } // 求近两次时间差 long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local())); // 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息 if (timeDiff < 30 * 60 * 1000) { v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001" + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus()); context.write(NullWritable.get(), v); step++; } else { // 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将step重置,以分隔为新的visit v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001" + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus()); context.write(NullWritable.get(), v); // 输出完上一条之后,重置step编号 step = 1; session = UUID.randomUUID().toString(); } // 如果此次遍历的是最后一条,则将本条直接输出 if (i == beans.size() - 1) { // 设置默认停留市场为60s v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus()); context.write(NullWritable.get(), v); } } } catch (ParseException e) { e.printStackTrace(); } } private String toStr(Date date) { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); return df.format(date); } private Date toDate(String timeStr) throws ParseException { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); return df.parse(timeStr); } private long timeDiff(String time1, String time2) throws ParseException { Date d1 = toDate(time1); Date d2 = toDate(time2); return d1.getTime() - d2.getTime(); } private long timeDiff(Date time1, Date time2) throws ParseException { // date 调用 getTime获取毫秒值 return time1.getTime() - time2.getTime(); } } d) 数据点击访问页面数据模型生成 需求:生成点击流访问数据 实现过程描述:通过编写ClickStreamVisit,实现模型生成。 代码如下: 核心类reducer: package com.gec.mr.pre.reducer; import com.gec.mr.pre.mrbean.PageViewsBean; import com.gec.mr.pre.mrbean.VisitBean; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.shaded.org.apache.commons.beanutils.BeanUtils; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; public class ClickStreamVisitReducer extends Reducer { @Override protected void reduce(Text session, Iterable pvBeans, Context context) throws IOException, InterruptedException { // 将pvBeans按照step排序 ArrayList pvBeansList = new ArrayList(); for (PageViewsBean pvBean : pvBeans) { PageViewsBean bean = new PageViewsBean(); try { BeanUtils.copyProperties(bean, pvBean); pvBeansList.add(bean); } catch (Exception e) { e.printStackTrace(); } } /** * 将数据按照我们的步骤进行排序,这样就可以得到哪个页面先访问,哪个页面后访问的 */ Collections.sort(pvBeansList, new Comparator() { @Override public int compare(PageViewsBean o1, PageViewsBean o2) { return o1.getStep() > o2.getStep() ? 1 : -1; } }); // 取这次visit的首尾pageview记录,将数据放入VisitBean中 VisitBean visitBean = new VisitBean(); // 取visit的首记录 visitBean.setInPage(pvBeansList.get(0).getRequest()); visitBean.setInTime(pvBeansList.get(0).getTimestr()); // 取visit集合当中末尾的记录即可 visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest()); visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr()); // visit访问的页面数 visitBean.setPageVisits(pvBeansList.size()); // 来访者的ip visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr()); // 本次visit的referal visitBean.setReferal(pvBeansList.get(0).getReferal()); visitBean.setSession(session.toString()); context.write(NullWritable.get(), visitBean); } } 三、 将数据导入hive(代码及实现步骤) a) 将清洗后的数据导入Hive 需求:将数据导入hive 实现过程描述:首先生成hive数据表,用load data语句将数据从hdfs导入hive 代码如下:drop table if exists ods_weblog_origin; create table ods_weblog_origin( valid string, remote_addr string, remote_user string, time_local string, request string, status string, body_bytes_sent string, http_referer string, http_user_agent string) partitioned by (datestr string) row format delimited fields terminated by '\001'; pageview: drop table if exists ods_click_pageviews; create table ods_click_pageviews( session string, remote_addr string, remote_user string, time_local string, request string, visit_step string, page_staylong string, http_referer string, http_user_agent string, body_bytes_sent string, status string) partitioned by (datestr string) row format delimited fields terminated by '\001'; visit: drop table if exists ods_click_stream_visit; create table ods_click_stream_visit( session string, remote_addr string, inTime string, outTime string, inPage string, outPage string, referal string, pageVisits int) partitioned by (datestr string) row format delimited fields terminated by '\001'; hadoop jar etlmr-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.WeblogEtlPreProcessDriver /data/weblog/preprocess/input/2021-12-09 /data/weblog/preprocess/weblogPreOut/2021-12-09 hadoop jar clickstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamDriver /data/weblog/preprocess/weblogPreOut/2021-12-10 /data/weblog/preprocess/pageViewOut/2021-12-15 hadoop jar visitstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamVisitDriver /data/weblog/preprocess/pageViewOut/2021-12-15 /data/weblog/preprocess/clickStreamVisit/2021-12-15 load data inpath '/data/weblog/preprocess/weblogPreOut/2021-12-10/' overwrite into table ods_weblog_origin partition(datestr='2021-12-10'); b) 将点击页面流数据导入Hive 需求:将点击页面流数据导入Hive 实现过程描述:用load data语句将数据从hdfs导入hive 代码如下: load data inpath '/data/weblog/preprocess/pageViewOut/2021-12-15' overwrite into table ods_click_pageviews partition(datestr='2021-12-15'); c) 将点击流访问表数据导入Hive 需求:将点击流访问表数据导入Hive 实现过程描述:用load data语句将数据从hdfs导入hive 代码如下: load data inpath '/data/weblog/preprocess/clickStreamVisit/2021-12-15' overwrite into table ods_click_stream_visit partition(datestr='2021-12-15'); 四、 数据导入(代码及实现步骤) a) 基于sqoop将数据导出mysql 需求:将hive里面的数据导入到关系型数据库mysql 实现过程描述:创建关系型数据库并建表,用sqoop工具连接mysql,创建三个ods表分别算每天的pvs值,指定日期的每个小时的pvs值,每天的page的pvs值。 代码如下: bin/sqoop export --connect jdbc:mysql://192.168.94.121:3306/weblogs --username root --password 123456 --m 1 --export-dir /user/hive/warehouse/dw_pvs_everyday --table dw_pvs_everyday --input-fields-terminated-by '\001' bin/sqoop export --connect jdbc:mysql://192.168.94.121:3306/weblogs --username root --password 123456 --m 1 --export-dir /user/hive/warehouse/dw_pvs_everyhour_oneday/datestr=2021-12-15 --table dw_pvs_everyhour_oneday --input-fields-terminated-by '\001' bin/sqoop export --connect jdbc:mysql://192.168.94.121:3306/weblogs --username root --password 123456 --m 1 --export-dir /user/hive/warehouse/dw_pvs_request_everyday --table dw_pvs_request_page --input-fields-terminated-by '\001' 五、 定制开发web图表(代码及实现步骤) a) 每天访问量的pvs值(折线图) 需求:用折线图表示出每天访问量的pvs值 实现过程描述:在idea里面通过导入echarts,编写html文件连通mysql即可。 代码如下: Html: 每天访问量的pvs值
b) 指定某天的每小时的pvs(折线图) 需求:用折线图指定某天的每小时的pvs 实现过程描述:在idea里面通过导入echarts,编写html文件连通mysql即可 代码如下: Html: 每天访问量的pvs值
c) 统计页面pvs值(柱状图) 需求:用柱状图统计页面pvs值 实现过程描述:在idea里面通过导入echarts,编写html文件连通mysql即可 代码如下: Html: 点击流数据图表
六、 Azkaban运行项目(代码及实现步骤) a) 编写运行job脚本 需求:为了更好的工作调度 实现过程描述:配置好Azkaban,再将job脚本编写到Azkaban。此脚本是在linux打包到windows再布置到azkaban。 代码如下: 1.job: type=command command=sh 1.sh 2.job: type=command command=hadoop jar etlmr-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.WeblogEtlPreProcessDriver /data/weblog/preprocess/input/2021-12-12 /data/weblog/preprocess/weblogPreOut/2021-12-12 dependencies=1 3.job: type=command command=hadoop jar clickstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamDriver /data/weblog/preprocess/weblogPreOut/2021-12-12 /data/weblog/preprocess/pageViewOut/2021-12-12 dependencies=2 4.job: type=command command=hadoop jar visitstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamVisitDriver /data/weblog/preprocess/pageViewOut/2021-12-12 /data/weblog/preprocess/clickStreamVisit/2021-12-12 dependencies=3 5.job: type=command command=sh 5.sh dependencies=4 6.job: type=command command=/home/hadoop/bigdatasoftware/hive/bin/hive -f "6.sql" dependencies=5 7.job: type=command command=sh 7.sh dependencies=6 b) 运行azkaban项目 需求:为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行(azkaban)。只需要一键即可开始运行布置的整个流程。 实现过程描述:配置好job脚本,运行前将data里面的数据移到日志采集目录,直接运行即可。不过时间接近两个小时。最后数据库有新增加的数据,成功运行项目。