专访Jeffery Richter:Windows 8是微软的重中之重
Jeffery Richter以其多本Windows核心技术的经典著作而闻名,同时,他深入掌握微软的.NET等一系列核心技术,2012年1月,Jeffery Richter在北京接受了InfoQ中文站的专访,谈到Windows 8和WinRT编程,并就异步编程、Windows编程中的可扩展性、性能和安全性方面给出自己的建议。
该内容已经被标记书签!
标记书签错误,请重试!

作者 岑文初 发布于 2008年8月13日
── 分布式计算开源框架Hadoop入门实践(三)


一个图片太大了,只好分割成为两部分。根据流程图来说一下具体一个任务执行的情况。
业务场景描述:可设定输入和输出路径(操作系统的路径非HDFS路径),根据访问日志分析某一个应用访问某一个API的总次数和总流量,统计后分别输出到两个文件中。这里仅仅为了测试,没有去细分很多类,将所有的类都归并于一个类便于说明问题。

测试代码类图
LogAnalysiser就是主类,主要负责创建、提交任务,并且输出部分信息。内部的几个子类用途可以参看流程中提到的角色职责。具体地看看几个类和方法的代码片断:
LogAnalysiser::MapClass
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, LongWritable>
{
public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException
{
String line = value.toString();//没有配置RecordReader,所以默认采用line的实现,key就是行号,value就是行内容
if (line == null || line.equals(""))
return;
String[] words = line.split(",");
if (words == null || words.length < 8)
return;
String appid = words[1];
String apiName = words[2];
LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
Text record = new Text();
record.set(new StringBuffer("flow::").append(appid)
.append("::").append(apiName).toString());
reporter.progress();
output.collect(record, recbytes);//输出流量的统计结果,通过flow::作为前缀来标示。
record.clear();
record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
output.collect(record, new LongWritable(1));//输出次数的统计结果,通过count::作为前缀来标示
}
}
LogAnalysiser:: PartitionerClass
public static class PartitionerClass implements Partitioner<Text, LongWritable>
{
public int getPartition(Text key, LongWritable value, int numPartitions)
{
if (numPartitions >= 2)//Reduce 个数,判断流量还是次数的统计分配到不同的Reduce
if (key.toString().startsWith("flow::"))
return 0;
else
return 1;
else
return 0;
}
public void configure(JobConf job){}
}
LogAnalysiser:: CombinerClass
参看ReduceClass,通常两者可以使用一个,不过这里有些不同的处理就分成了两个。在ReduceClass中蓝色的行表示在CombinerClass中不存在。
LogAnalysiser:: ReduceClass
public static class ReduceClass extends MapReduceBase
implements Reducer<Text, LongWritable,Text, LongWritable>
{
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
{
Text newkey = new Text();
newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
LongWritable result = new LongWritable();
long tmp = 0;
int counter = 0;
while(values.hasNext())//累加同一个key的统计结果
{
tmp = tmp + values.next().get();
counter = counter +1;//担心处理太久,JobTracker长时间没有收到报告会认为TaskTracker已经失效,因此定时报告一下
if (counter == 1000)
{
counter = 0;
reporter.progress();
}
}
result.set(tmp);
output.collect(newkey, result);//输出最后的汇总结果
}
}
LogAnalysiser
public static void main(String[] args)
{
try
{
run(args);
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void run(String[] args) throws Exception
{
if (args == null || args.length <2)
{
System.out.println("need inputpath and outputpath");
return;
}
String inputpath = args[0];
String outputpath = args[1];
String shortin = args[0];
String shortout = args[1];
if (shortin.indexOf(File.separator) >= 0)
shortin = shortin.substring(shortin.lastIndexOf(File.separator));
if (shortout.indexOf(File.separator) >= 0)
shortout = shortout.substring(shortout.lastIndexOf(File.separator));
SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
shortout = new StringBuffer(shortout).append("-")
.append(formater.format(new Date())).toString();
if (!shortin.startsWith("/"))
shortin = "/" + shortin;
if (!shortout.startsWith("/"))
shortout = "/" + shortout;
shortin = "/user/root" + shortin;
shortout = "/user/root" + shortout;
File inputdir = new File(inputpath);
File outputdir = new File(outputpath);
if (!inputdir.exists() || !inputdir.isDirectory())
{
System.out.println("inputpath not exist or isn't dir!");
return;
}
if (!outputdir.exists())
{
new File(outputpath).mkdirs();
}
JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//构建Config
FileSystem fileSys = FileSystem.get(conf);
fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//将本地文件系统的文件拷贝到HDFS中
conf.setJobName("analysisjob");
conf.setOutputKeyClass(Text.class);//输出的key类型,在OutputFormat会检查
conf.setOutputValueClass(LongWritable.class); //输出的value类型,在OutputFormat会检查
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(CombinerClass.class);
conf.setReducerClass(ReduceClass.class);
conf.setPartitionerClass(PartitionerClass.class);
conf.set("mapred.reduce.tasks", "2");//强制需要有两个Reduce来分别处理流量和次数的统计
FileInputFormat.setInputPaths(conf, shortin);//hdfs中的输入路径
FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中输出路径
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(conf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
//删除输入和输出的临时文件
fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
fileSys.delete(new Path(shortin),true);
fileSys.delete(new Path(shortout),true);
}
以上的代码就完成了所有的逻辑性代码,然后还需要一个注册驱动类来注册业务Class为一个可标示的命令,让hadoop jar可以执行。
public class ExampleDriver {
public static void main(String argv[]){
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
pgd.driver(argv);
}
catch(Throwable e){
e.printStackTrace();
}
}
}
将代码打成jar,并且设置jar的mainClass为ExampleDriver这个类。在分布式环境启动以后执行如下语句:
hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out
在/home/wenchu/test-in中是需要分析的日志文件,执行后就会看见整个执行过程,包括了Map和Reduce的进度。执行完毕会在/home/wenchu/test-out下看到输出的内容。有两个文件:part-00000和part-00001分别记录了统计后的结果。 如果需要看执行的具体情况,可以看在输出目录下的_logs/history/xxxx_analysisjob,里面罗列了所有的Map,Reduce的创建情况以及执行情况。在运行期也可以通过浏览器来查看Map,Reduce的情况:http://MasterIP:50030/jobtracker.jsp
首先这里使用上面的范例作为测试,也没有做太多的优化配置,这个测试结果只是为了看看集群的效果,以及一些参数配置的影响。
文件复制数为1,blocksize 5M
| Slave数 | 处理记录数(万条) | 执行时间(秒) |
| 2 | 95 | 38 |
| 2 | 950 | 337 |
| 4 | 95 | 24 |
| 4 | 950 | 178 |
| 6 | 95 | 21 |
| 6 | 950 | 114 |
Blocksize 5M
| Slave数 | 处理记录数(万条) | 执行时间(秒) |
| 2(文件复制数为1) | 950 | 337 |
| 2(文件复制数为3) | 950 | 339 |
| 6(文件复制数为1) | 950 | 114 |
| 6(文件复制数为3) | 950 | 117 |
文件复制数为1
| Slave数 | 处理记录数(万条) | 执行时间(秒) |
| 6(blocksize 5M) | 95 | 21 |
| 6(blocksize 77M) | 95 | 26 |
| 4(blocksize 5M) | 950 | 178 |
| 4(blocksize 50M) | 950 | 54 |
| 6(blocksize 5M) | 950 | 114 |
| 6(blocksize 50M) | 950 | 44 |
| 6(blocksize 77M) | 950 | 74 |
测试的数据结果很稳定,基本测几次同样条件下都是一样。通过测试结果可以看出以下几点:
“云计算”热的烫手,就和SAAS、Web2及SNS等一样,往往都是在搞概念,只有真正踏踏实实的大型互联网公司,才会投入人力物力去研究符合自己的分布式计算。其实当你的数据量没有那么大的时候,这种分布式计算也就仅仅只是一个玩具而已,只有在真正解决问题的过程中,它深层次的问题才会被挖掘出来。
这三篇文章(分布式计算开源框架Hadoop介绍,Hadoop中的集群配置和使用技巧)仅仅是为了给对分布式计算有兴趣的朋友抛个砖,要想真的掘到金子,那么就踏踏实实的去用、去想、去分析。或者自己也会更进一步地去研究框架中的实现机制,在解决自己问题的同时,也能够贡献一些什么。
前几日看到有人跪求成为架构师的方式,看了有些可悲,有些可笑,其实有多少架构师知道什么叫做架构?架构师的职责是什么?与其追求这么一个名号,还不如踏踏实实地做块石头沉到水底。要知道,积累和沉淀的过程就是一种成长。
相关阅读:
作者介绍:岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。没有什么擅长或者精通,工作到现在唯一提升的就是学习能力和速度。个人Blog为:http://blog.csdn.net/cenwenchu79。
志愿参与InfoQ中文站内容建设,请邮件至editors@cn.infoq.com。也欢迎大家到InfoQ中文站用户讨论组参与我们的线上讨论。
大图片不好放,但是切割后看起来太模糊了,是否可以做个外部链接看完整清晰图?一个小建议
学习了!想起以前配HA系统,用Heartbeat的时候,捣鼓了很久,要能有作者这样的入门文章该有多少啊,呵呵!
很欣赏这句话:"与其追求这么一个名号,还不如踏踏实实地做块石头沉到水底。要知道,积累和沉淀的过程就是一种成长。"
很多谈架构的人 根本见不到大数据 纸上谈兵的居多
如题,谢谢!
1。把第三方的jar放到每一个Slave的lib里,重起集群,最苯的方法。
2。把第三方的jar和你的应用打到一个jar文件中。
3。在提交Job之前使用DistributedCache.setCacheArchives(JarURIs, jobConf);
把你的第三方的jar Cache到HDFS中。TaskRunner自动会把第三方的jar添加到应用环境中。
www.hadoop.org.cn
忘了说一声在使用DistributedCache.setCacheArchives(JarURIs, jobConf);
后要添加DistributedCache.addCacheArchive(JarURIs, job);
请看DistributedCache文档和TaskRunner源码就明白了。
www.hadoop.org.cn
目前的设计用不到啊用不到
Jeffery Richter以其多本Windows核心技术的经典著作而闻名,同时,他深入掌握微软的.NET等一系列核心技术,2012年1月,Jeffery Richter在北京接受了InfoQ中文站的专访,谈到Windows 8和WinRT编程,并就异步编程、Windows编程中的可扩展性、性能和安全性方面给出自己的建议。
云计算平台的可用性,相比传统互联网服务而言,更加复杂和困难,也更具有挑战性。本文借助新浪SAE云平台为读者讲述了云平台可用性的定义、如何打造高可用的平台,以及对云计算的用户提出了建议。
淘宝高度重视Java平台的健康发展,组建了一个团队专注于Java平台的底层部分的性能、功能与稳定性改进;工作主要基于OpenJDK中的HotSpot VM开展,其中一些通用的功能随后也会逐渐反馈给OpenJDK社区。希望能与使用Java平台开发应用的大家交流经验。
本次演讲视频录制于QCon杭州2011。
2011年4月21日至22日是值得云计算从业者纪念的日子。Amazon的IaaS服务出现故障,导致许多商业网站的服务中断,影响非常严重。作为云计算用户,我们需要思考的是,如何保证即便在云服务不可用的情况,我们的应用架构仍然能够屹立不倒?本文正是站在云计算用户的角度试图探讨这一问题。
12人的技术团队,4组刀片服务器,每月20亿的访问量,每日1次准时部署,99.9%的可用性。这可能吗?当然。想知道如何做的吗?百姓网将与您分享他们在DevOps实践过程中的经验和技巧。
本次演讲视频录制于QCon杭州2011。
篱笆作为一家起源于社区的电子商务公司,反映到技术层面就是同时要面对产品和业务,以及经营战略的变化调整。如何在产品和业务的夹缝之间完成技术架构的抽象与平衡,寻找更有效的价值定位,这当中有些经验教训和个人感悟愿与众人分享。
本次演讲视频录制于QCon杭州2011。
本文将对特性注入以及相关方法做一个扫盲性的介绍。我们会解释这个框架的关键要素,并附上实例来证实它们。为了让文章保持相对较短,我们不会深入到某个工具或方法中,而是会给出一些参考资料,以便大家做进一步的研究。
随着JDK 7的发布,字节码指令集终于迎来了第一位新成员——invokedynamic指令。这条新增加的指令是JDK 7实现“动态类型语言(Dynamically Typed Language)”支持而进行的改进之一,也是为JDK 8可以顺利实现Lambda表达式做技术准备。在这篇文章中,我们将去了解JDK 7这项新特性的出现前因后果和它的意义。
7 条回复
关注此讨论 回复