MapReduce (5)
Published: 2019-06-14



1. Multi-table joins in MapReduce

        Given several text files whose records share a key field, join the related records across the files in a single query.

2. Running multiple MapReduce jobs in series

    To chain jobs, wrap each Job in a ControlledJob, then group the ControlledJobs under a JobControl and declare the dependencies between them. Because the JobControl runs in its own thread, watch out for the monitoring thread and the jobs finishing at different times: poll until all jobs are done. A minimal sketch of the pattern follows.
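The essential calls, assuming wordJob and topJob are two already-configured Job instances (the complete driver appears in section 2 below):

// Wrap each configured Job in a ControlledJob so dependencies can be declared.
ControlledJob controlledWC = new ControlledJob(wordJob.getConfiguration());
ControlledJob controlledTP = new ControlledJob(topJob.getConfiguration());
controlledTP.addDependingJob(controlledWC);   // topJob starts only after wordJob succeeds

// Group the jobs under one controller and run it in its own thread.
JobControl jobControl = new JobControl("WordCount and Top");
jobControl.addJob(controlledWC);
jobControl.addJob(controlledTP);
Thread thread = new Thread(jobControl);       // JobControl implements Runnable
thread.start();

// The controller thread returns before the jobs do, so poll until everything finishes.
while (!jobControl.allFinished()) {
    Thread.sleep(1000);
}
jobControl.stop();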

--------------------------------------------------------------------------------

1. Multi-table join in MapReduce

Data (two tab-separated files placed in the same input directory):

factory file (factoryname / addressed):

Beijing Red Star              1
Shenzhen Thunder              3
Guangzhou Honda               2
Beijing Rising                1
Guangzhou Development Bank    2
Tencent                       3
Bank of Beijing               5

address file (addressID / addressname):

1    Beijing
2    Guangzhou
3    Shenzhen
4    Xi'an
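For reference, the join pairs each factory with the address whose addressID it references. With the job below the output should look roughly like this (row order depends on the shuffle; Bank of Beijing references addressID 5, which is missing from the address table, so it produces no row, and Xi'an has no factory):

factoryname                     addressname
Beijing Red Star                Beijing
Beijing Rising                  Beijing
Guangzhou Honda                 Guangzhou
Guangzhou Development Bank      Guangzhou
Shenzhen Thunder                Shenzhen
Tencent                         Shenzhen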

Code:

package com.huhu.day05;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.huhu.day04.ProgenyCount;

/**
 * factory table: factoryname <TAB> addressed, e.g. "Beijing Red Star  1"
 * address table: addressID <TAB> addressname, e.g. "1  Beijing"
 *
 * Equivalent SQL:
 *   SELECT factory.factoryname, address.addressname
 *   FROM factory, address
 *   WHERE factory.addressed = address.addressID
 *
 * Flow:
 * 1. Both files are read by a single MapReduce job.
 * 2. One mapper handles records from either file.
 * 3. The map output key is the addressID; the value is tagged "t1" for
 *    address records and "t2" for factory records.
 * 4. The reduce joins the tagged values that share the same addressID.
 *
 * @author huhu_k
 */
public class MutipleJoin extends ToolRunner implements Tool {

    private Configuration conf;

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            if (line[0].matches("\\d")) {
                // address record: k = addressID, v = "t1" + addressID + ":" + addressname
                context.write(new Text(line[0]), new Text("t1" + line[0] + ":" + line[1]));
            } else {
                // factory record: k = addressID, v = "t2" + factoryname + ":" + addressID
                context.write(new Text(line[1]), new Text("t2" + line[0] + ":" + line[1]));
            }
        }
    }

    public static class MyReduce extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // header line, written once per reducer
            context.write(new Text("factoryname\t\t"), new Text("addressname"));
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // collect both sides of the join for this addressID
            List<String> factories = new ArrayList<>();
            String addr = "";
            for (Text s : values) {
                String line = s.toString();
                if (line.startsWith("t1")) {
                    addr = line.split(":")[1];
                } else if (line.startsWith("t2")) {
                    // strip the "t2" tag to recover the factory name
                    factories.add(line.split(":")[0].substring(2));
                }
            }
            // emit one row per factory that references this addressID
            if (!addr.equals("")) {
                for (String fsc : factories) {
                    context.write(new Text(fsc), new Text(addr));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        MutipleJoin t = new MutipleJoin();
        Configuration conf = t.getConf();
        String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (other.length != 2) {
            System.err.println("number is fail");
        }
        int run = ToolRunner.run(conf, t, args);
        System.exit(run);
    }

    @Override
    public Configuration getConf() {
        if (conf != null) {
            return conf;
        }
        return new Configuration();
    }

    @Override
    public void setConf(Configuration arg0) {
    }

    @Override
    public int run(String[] other) throws Exception {
        Configuration con = getConf();
        Job job = Job.getInstance(con);
        job.setJarByClass(ProgenyCount.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // default partitioner
        // job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://ry-hadoop1:8020/in/day05"));
        Path path = new Path("hdfs://ry-hadoop1:8020/out/day05.txt");
        FileSystem fs = FileSystem.get(getConf());
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
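To try it, upload both data files into the hard-coded input directory hdfs://ry-hadoop1:8020/in/day05 and submit the class. The jar name below is only an example, and the two path arguments merely satisfy the argument check in main(); the paths actually used are the ones hard-coded in run():

hadoop jar day05.jar com.huhu.day05.MutipleJoin /in/day05 /out/day05.txt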

Run result:

The regularly formatted records from the two files are joined on the address ID as expected.

2. Chaining multiple MapReduce jobs

WordCount_Mapper
package com.huhu.day05;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCount_Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] line = value.toString().split(" ");
        for (String s : line) {
            context.write(new Text(s), one);
        }
    }
}

WordCount_Reducer

package com.huhu.day05;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount_Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : values) {
            sum += i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Top10_Mapper

package com.huhu.day05;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Top10_Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] line = value.toString().split("\t");
        context.write(new Text(line[0]), new IntWritable(Integer.parseInt(line[1])));
    }
}

Top10_Reducer

package com.huhu.day05;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.huhu.day05.pojo.WordCount;

public class Top10_Reducer extends Reducer<Text, IntWritable, WordCount, NullWritable> {

    private TreeSet<WordCount> set = new TreeSet<>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable v : values) {
            System.err.println(v.toString() + "----------------");
            set.add(new WordCount(key.toString(), Integer.parseInt(v.toString())));
        }
        // The original listing is cut off after "if (10"; the rest is a reconstruction:
        // once the set holds more than 10 entries, drop the smallest one.
        if (10 < set.size()) {
            set.pollLast();
        }
    }

    // Reconstructed as well: emit the surviving top-10 entries after all keys have
    // been processed, matching the WordCount/NullWritable output types that the
    // driver below sets on the Top10 job.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (WordCount wc : set) {
            context.write(wc, NullWritable.get());
        }
    }
}
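The com.huhu.day05.pojo.WordCount class is referenced above but never listed in the post. For the TreeSet to keep the ten largest counts (pollLast() discarding the smallest element once the set grows past ten), it must be a WritableComparable ordered by count in descending order. A minimal sketch under those assumptions; the field names and exact ordering are guesses, not the author's code:

package com.huhu.day05.pojo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical reconstruction of the pojo used by Top10_Reducer.
public class WordCount implements WritableComparable<WordCount> {

    private String word;
    private int count;

    public WordCount() {
    }

    public WordCount(String word, int count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    @Override
    public int compareTo(WordCount o) {
        // descending by count, so TreeSet.pollLast() removes the smallest;
        // fall back to the word so distinct words never compare as equal
        int c = Integer.compare(o.count, this.count);
        return c != 0 ? c : this.word.compareTo(o.word);
    }

    @Override
    public String toString() {
        return word + "\t" + count;
    }
}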

WordCountTop_Cuan

package com.huhu.day05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.huhu.day05.pojo.WordCount;

public class WordCountTop_Cuan extends ToolRunner implements Tool {

    private Configuration con;

    @Override
    public Configuration getConf() {
        if (con != null)
            return con;
        return new Configuration();
    }

    @Override
    public void setConf(Configuration arg0) {
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration con = getConf();

        // Job 1: word count
        Job WordJob = Job.getInstance(con, "WordCount Job");
        WordJob.setJarByClass(WordCountTop_Cuan.class);
        WordJob.setMapperClass(WordCount_Mapper.class);
        WordJob.setMapOutputKeyClass(Text.class);
        WordJob.setMapOutputValueClass(IntWritable.class);
        WordJob.setReducerClass(WordCount_Reducer.class);
        // output types must match what WordCount_Reducer actually writes
        WordJob.setOutputKeyClass(Text.class);
        WordJob.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(WordJob, new Path("hdfs://ry-hadoop1:8020/in/ihaveadream.txt"));
        Path path = new Path("hdfs://ry-hadoop1:8020/out/Word_job.txt");
        FileSystem fs = FileSystem.get(getConf());
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(WordJob, path);

        // Job 2: top 10, reads the output of job 1
        Job TopJob = Job.getInstance(con, "Top10 Job");
        TopJob.setJarByClass(WordCountTop_Cuan.class);
        TopJob.setMapperClass(Top10_Mapper.class);
        TopJob.setMapOutputKeyClass(Text.class);
        TopJob.setMapOutputValueClass(IntWritable.class);
        TopJob.setReducerClass(Top10_Reducer.class);
        TopJob.setOutputKeyClass(WordCount.class);
        TopJob.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(TopJob, path);
        Path paths = new Path("hdfs://ry-hadoop1:8020/out/Top_Job.txt");
        if (fs.exists(paths)) {
            fs.delete(paths, true);
        }
        FileOutputFormat.setOutputPath(TopJob, paths);

        // Key part: wrap both jobs as ControlledJobs
        ControlledJob controlledWC = new ControlledJob(WordJob.getConfiguration());
        ControlledJob controlledTP = new ControlledJob(TopJob.getConfiguration());
        // TopJob depends on WordJob
        controlledTP.addDependingJob(controlledWC);
        // define the controller
        JobControl jobControl = new JobControl("WordCount and Top");
        jobControl.addJob(controlledWC);
        jobControl.addJob(controlledTP);
        Thread thread = new Thread(jobControl);
        thread.start();
        while (!jobControl.allFinished()) {
            Thread.sleep(1000);
        }
        jobControl.stop();
        return 0;
    }

    public static void main(String[] args) throws Exception {
        WordCountTop_Cuan wc = new WordCountTop_Cuan();
        Configuration conf = wc.getConf();
        String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
        int run = ToolRunner.run(conf, wc, other);
        System.exit(run);
    }
}

Run result:

I ran this locally. To run it on a Hadoop cluster, submit it with a command like the one below (note that the driver above hard-codes its HDFS paths, so the two path arguments are only honored if run() is changed to use them):

hadoop jar xxx.jar /in/xx.txt /out/Word_Job.txt /out/Top_Job.txt

Here Top_Job depends on Word_Job, because Top_Job's input path is Word_Job's output path. The JobControl thread therefore launches the jobs one at a time in dependency order: Top_Job waits until Word_Job has finished, and only then does it start running.

Reposted from: https://www.cnblogs.com/meiLinYa/p/9252101.html
