使用hadoop实现IP个数统计~并将结果写入数据库

转载请注明出处:http://blog.csdn.net/xiaojimanman/article/details/40372189

    hadoop源代码中的WordCount事例中实现了单词统计,但是输出到HDFS文件,线上程序想使用其计算结果还要再次写个程序,所以自己就研究一下关于MapReduce的输出问题,下面就通过一个简单的例子说明下如何将MapReduce的计算结果输出到数据库中。


需求描述:

    分析网络服务器上的Apache日志,统计每个IP访问资源的次数,并将结果写入到mysql数据库中。


数据格式:

    Apache日志数据如下图所示:


    一行数据就是一条http请求记录,该事例只做简单的IP个数统计。


需求分析:

    通过MapReduce对日志文件采用分布式计算,map主要对日志做简单的拆分计数,reduce对map的结果求和。

    map程序对一行日志数据做简单的拆分,获取客户端IP,输出结果为 key为客户端IP,value为IP出现次数。结果样例如下图所示:


    reduce程序对Key值下的values做求和计算,输出结果为 key为客户端IP,value为IP出现次数。结果样例如下图所示:


    上面的MapReduce程序和WordCount程序类似,只是对IP做了简单的求和计算,下面就需要写reduce的输出格式,使计算结果写入到mysql数据库中。

    MapReduce支持用户自定义的输出格式,定义的类只需要继承FileOutputFormat即可。实现如下图所示:


     自定义输出需要实现getRecordWriter方法,这里通过内部类的方式,实现了自定义的RecordWriter,在MysqlRecordWriter类中实现相关的输出即可完成将reduce结果数据写入到数据库中,具体实现如下图所示:


    在MapReduce程序中,在关于job的设置,只需要将输出格式指定为该输出格式即可完成将reduce的结果写入到数据库中。

job.setOutputFormatClass(MysqlOutputFormat.class);


代码实现:

    日志一行记录分析类TextLine,该类实现了从日志记录中提取IP信息以及IP次数(一行数据就是1次),代码如下:

/**
* 日志行分析
* @author lulei
*/
package com;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class TextLine {
	private String ip;
	private IntWritable one = new IntWritable(1);
	//标识数据是否为可用
	private boolean right = true;
	
	public TextLine(String textLine){
		//检验一行日志数据是否符合要求,如不符合,将其标识为不可用
		if (textLine == null || "".equals(textLine)) {
			this.right = false;
			return;
		}
		String []strs = textLine.split(" ");
		if (strs.length < 2) {
			this.right = false;
			return;
		}
		this.ip = strs[0];
	}
	
	public boolean isRight() {
		return right;
	}

	/**
	 * 返回map的输出key值
	 * @return
	 */
	public Text getIPCountMapOutKey() {
		return new Text(this.ip);
	}
	
	/**
	 * 返回map的输出value值
	 * @return
	 */
	public IntWritable getIPCountMapOutValue() {
		return this.one;
	}
}

     IPCountMR类实现了MapReduce功能,实现了日志数据中的IP出现次数统计,代码如下:

/**
 * 各IP出现次数统计
 */
package com;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class IPCountMR extends Configured implements Tool{
	/**
	 * ip个数统计map
	 * @author lulei
	 */
	public static class IPCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			TextLine textLine = new TextLine(value.toString());
			if (textLine.isRight()) {
				context.write(textLine.getIPCountMapOutKey(), textLine.getIPCountMapOutValue());
			}
		}
		
	}
	
	/**
	 * ip个数统计reduce
	 * @author lulei
	 */
	public static class IPCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			context.write(key, new IntWritable(sum));
		}
		
	}
	
	@SuppressWarnings("deprecation")
	@Override
	public int run(String[] arg0) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJobName("ipcount");
		job.setInputFormatClass(TextInputFormat.class);
		
		//将输出设置为MysqlOutputFormat
		job.setOutputFormatClass(MysqlOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		job.setMapperClass(IPCountMap.class);
		job.setCombinerClass(IPCountReduce.class);
		job.setReducerClass(IPCountReduce.class);
		
		FileInputFormat.addInputPath(job, new Path(arg0[0]));
		//个人认为下面应该可以不设置的,但是不设置就会报错,不知道是什么地方出了问题
		MysqlOutputFormat.setOutputPath(job,  new Path(arg0[1]));
		
		job.waitForCompletion(true);
		
		return job.isSuccessful() ? 0 : 1;
	}
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		try {
			int res = ToolRunner.run(new Configuration(), new IPCountMR(), args);
			System.exit(res);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}


    MysqlOutputFormat类实现了reduce自定义输出,将reduce计算结果输出到数据库中,代码如下:

package com;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

@SuppressWarnings("hiding")
public class MysqlOutputFormat<Text, IntWritable> extends FileOutputFormat<Text, IntWritable> {
	//Mysql RecordWriter私有类
	private static class MysqlRecordWriter<Text, IntWritable> extends RecordWriter<Text, IntWritable> {
		private LogDB logdb;
		
		/**
		 * 使用外部传进来的LogDB对象
		 * @param logdb
		 */
		MysqlRecordWriter(LogDB logdb){
			this.logdb = logdb;
		}
		@Override
		public void close(TaskAttemptContext arg0) throws IOException,
				InterruptedException {
			// TODO Auto-generated method stub
			
		}

		
		/**
		 * 将key-value写入到数据库中
		 */
		@Override
		public void write(Text key, IntWritable value) throws IOException,
				InterruptedException {
			// TODO Auto-generated method stub
			logdb.insert(key.toString(), value.toString());
		}
		
	}

	@Override
	public RecordWriter<Text, IntWritable> getRecordWriter(
			TaskAttemptContext arg0) throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		//返回MysqlRecordWriter对象
		return new MysqlRecordWriter<Text, IntWritable>(new LogDB());
	}

}

    上面的LogDB类是自己封装的数据库操作类,实现了数据的插入,具体实现代码如下:

package com;

import java.sql.SQLException;

import com.lulei.db.manager.DBServer;

public class LogDB {
	//新建连接池
	private DBServer dBServer = new DBServer("proxool.log");
	
	/**
	 * 将数据插入至数据库
	 * @param ip
	 * @param num
	 */
	public void insert(String ip, String num) {
		try {
			dBServer.insert("insert into logmp(ip, num) values ('"
					+ ip +"','" +
					num +"')");
		} catch (SQLException e) {
		} finally {
			dBServer.close();
		}
	}
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		new LogDB().insert("127.0.0.2", "1");
	}

}
      上面程序中的DBServer类是基于连接池proxool-0.9.1.jar封装的数据库操作类,这里就不做详细的介绍,这里也可以不通过连接池,直接将数据写入到数据库中,这不是本事例的重点,就不做详细的介绍。

上传运行:
      具体的操作命令参照博客 http://blog.csdn.net/xiaojimanman/article/details/40184581 中的上传运行。


执行结果:

    程序执行结束后,通过命令查看相应的数据表记录,计算结果已经正确写入到数据库中,如下图所示:


      上面用的日志文件的客户端都是从内网访问的,所以记录中都是内网地址。

注:资源 http://download.csdn.net/detail/xiaojimanman/6920219 中有相关的数据库连接池代码


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。