BT

如何利用碎片时间提升技术认知与能力? 点击获取答案

揭秘InputFormat:掌控Map Reduce任务执行的利器

| 作者 Boris Lublinsky 关注 0 他的粉丝 , Mike Segel 关注 0 他的粉丝 ,译者 曹如进 关注 0 他的粉丝 发布于 2012年1月10日. 估计阅读时间: 32 分钟 | 如何结合区块链技术,帮助企业降本增效?让我们深度了解几个成功的案例。

随着越来越多的公司采用Hadoop,它所处理的问题类型也变得愈发多元化。随着Hadoop适用场景数量的不断膨胀,控制好怎样执行以及何处执行map任务显得至关重要。实现这种控制的方法之一就是自定义InputFormat实现。

InputFormat 类是Hadoop Map Reduce框架中的基础类之一。该类主要用来定义两件事情:

  • 数据分割(Data splits)
  • 记录读取器(Record reader)

数据分割 是Hadoop Map Reduce框架中的基础概念之一,它定义了单个Map任务的大小及其可能的执行服务器信息。记录读取器 主要负责从输入文件实际读取数据并将它们(以键值对的形式)提交给mapper。尽管有不少文章介绍过怎样实现自定义的记录读取器(例如,参考文章[1]),但是关于如何进行分割(split)的介绍却相当粗略。这里我们将会解释什么是分割,并介绍怎样实现自定义分割来完成特定任务。

剖析分割

任何分割操作的实现都继承自Apache抽象基类——InputSplit,它定义了分割的长度及位置。分割长度 是指分割数据的大小(以字节为单位),而分割位置 是分割所在的机器结点名称组成的列表,其中待分割的数据都会于本地存在。分割位置可以方便调度器决定在哪个机器上执行此次分割。简化后的[1]作业跟踪器(job tracker)工作流程如下:

  • 接受来自某个任务跟踪器(task tracker)的心跳通信,得到该位置map的可用情况。
  • 为队列等候中的分割任务找到可用的“本地”结点。
  • 向任务跟踪器提交分割请求以待执行。

数据局部性(Locality)的相关程度因存储机制和整体的执行策略的不同而不同。例如,在Hadoop分布式文件系统(HDFS)中,分割通常对应一个物理数据块大小以及该数据块物理定位所在的一系列机器(其中机器总数由复制因子定义)的位置。这就是FileInputFormat 计算分割的过程。

而HBase的实现则采用了另外一套方法。在HBase中,分割 对应于一系列属于某个表区域(table region)的表键(table keys),而位置则为正在运行区域服务器的机器。

计算密集型应用

Map Reduce应用中有一类特殊的应用叫做计算密集型应用(Compute-Intensive application)。这类应用的特点在于Mapper.map()函数执行的时间要远远长于数据访问的时间,且至少要差一个数量级。从技术角度来说,虽然这类应用仍然可以使用“标准”输入格式的实现,但是它会带来数据存放结点过少而集群内剩余结点没能充分利用的问题(见图1)。

(点击图片进行放大)

图1:数据局部性情况下的结点使用图

图1中显示了针对计算密集型应用,使用“标准”数据局部性导致的结点使用率上的巨大差异——有些结点(红色标注)被过度使用,而其他结点(黄色和浅绿色标注)则使用不足。由此可见,在针对计算密集型应用时,需要重新思考对“局部性”概念的认识。在这种情况下,“局部性”意味着所有可用结点之间map任务的均匀分布——即最大化地使用集群机器的计算能力。

使用自定义InputFormat改变“局部性”

假定源数据以文件序列的形式存在,那么一个简单的ComputeIntensiveSequenceFileInputFormat 类(见清单1)便可以实现将分割生成的结果均匀地分布在集群中的所有服务器上。

package com.navteq.fr.mapReduce.InputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class ComputeIntensiveSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {

          private static final double SPLIT_SLOP = 1.1; // 10% slop
          static final String NUM_INPUT_FILES = "mapreduce.input.num.files";

          /**
          * Generate the list of files and make them into FileSplits.
          */
          @Override
          public List<InputSplit> getSplits(JobContext job) throws IOException {
           

          long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
          long maxSize = getMaxSplitSize(job);

          // get servers in the cluster
          String[] servers = getActiveServersList(job);
          if(servers == null)
                     return null;
          // generate splits
          List<InputSplit> splits = new ArrayList<InputSplit>();
          List<FileStatus>files = listStatus(job);
          int currentServer = 0;
          for (FileStatus file: files) {
                    Path path = file.getPath();
                    long length = file.getLen();
                    if ((length != 0) && isSplitable(job, path)) {
                              long blockSize = file.getBlockSize();
                              long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                              long bytesRemaining = length;
                              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                                        splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                                                                  new String[] {servers[currentServer]}));
                              currentServer = getNextServer(currentServer, servers.length);
                              bytesRemaining -= splitSize;
                    }

                    if (bytesRemaining != 0) {
                              splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                                                new String[] {servers[currentServer]}));
                              currentServer = getNextServer(currentServer, servers.length);
                    }
          } else if (length != 0) {
                    splits.add(new FileSplit(path, 0, length,
                                     new String[] {servers[currentServer]}));
                    currentServer = getNextServer(currentServer, servers.length);
          } else {
                    //Create empty hosts array for zero length files
                    splits.add(new FileSplit(path, 0, length, new String[0]));
          }
      }

      // Save the number of input files in the job-conf
      job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

      return splits;
   }

   private String[] getActiveServersList(JobContext context){

      String [] servers = null;
       try {
                JobClient jc = new JobClient((JobConf)context.getConfiguration());
                ClusterStatus status = jc.getClusterStatus(true);
                Collection<String> atc = status.getActiveTrackerNames();
                servers = new String[atc.size()];
                int s = 0;
                for(String serverInfo : atc){
                         StringTokenizer st = new StringTokenizer(serverInfo, ":");
                         String trackerName = st.nextToken();
                         StringTokenizer st1 = new StringTokenizer(trackerName, "_");
                         st1.nextToken();
                         servers[s++] = st1.nextToken();
                }
      }catch (IOException e) {
                e.printStackTrace();
      }

      return servers;

  }

  private static int getNextServer(int current, int max){

      current++;
      if(current >= max)
                current = 0;
      return current;
  }
}

清单1:ComputeIntensiveSequenceFileInputFormat类

该类继承自 SequenceFileInputFormat 并重写了getSplits()方法,虽然计算分割的过程与FileInputFormat完全一样,但是它为分割指定了“局部性”,以便从集群中找出可用的服务器。getSplits()利用到了以下两个方法:

  • getActiveServersList() 方法,返回集群中当前可用的服务器(名称)组成的数组。
  • getNextServer() 方法,返回服务器数组中下一个服务器索引,且当服务器数组中元素全部用尽时,返回数组头部重新开始。

虽然上述实现(见清单1)将map任务的执行均匀地分布在了集群中的所有服务器上,但是它却完全忽略了数据的实际位置。稍微好点的getSplits方法实现(见清单2)可以试图将两种策略结合在一起:既尽量多地放置针对数据的本地作业,且保持剩余作业在集群上的良好平衡。[2]

public List<InputSplit> getSplits(JobContext job) throws IOException { 

         // get splits
        List<InputSplit> originalSplits = super.getSplits(job);

         // Get active servers
        String[] servers = getActiveServersList(job);
        if(servers == null)
                return null;
        // reassign splits to active servers
        List<InputSplit> splits = new ArrayList<InputSplit>(originalSplits.size());
        int numSplits = originalSplits.size();
        int currentServer = 0;
        for(int i = 0; i < numSplits; i++, currentServer = i>getNextServer(currentServer,
                                                                           servers.length)){
                String server = servers[currentServer]; // Current server
                boolean replaced = false;
                // For every remaining split
                for(InputSplit split : originalSplits){
                        FileSplit fs = (FileSplit)split;
                        // For every split location
                        for(String l : fs.getLocations()){
                                 // If this split is local to the server
                                 if(l.equals(server)){
                                           // Fix split location
                                           splits.add(new FileSplit(fs.getPath(), fs.getStart(),
                                                            fs.getLength(), new String[] {server}));
                                           originalSplits.remove(split);
                                           replaced = true;
                                           break;
                                 }
                        }
                        if(replaced)
                                 break;
                }
                // If no local splits are found for this server
                if(!replaced){
                        // Assign first available split to it
                        FileSplit fs = (FileSplit)splits.get(0);
                        splits.add(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(),
                                                                             new String[] {server}));
                        originalSplits.remove(0);
                }
        }
        return splits;
}

清单2:优化过的getSplits方法

在此实现中,我们首先使用父类(FileInputSplit)来得到包含位置计算在内的分割以确保数据局部性。然后我们计算出可用的服务器列表,并为每一个存在的服务器尝试分配与其同处本地的分割。

延迟公平调度

虽然清单1和清单2中的代码都正确地计算出了分割位置,但当我们试图在Hadoop集群上运行代码时,就会发现结果与服务器之间产生均匀分布相去甚远。参考文章[2]中很好的描述了我们观察到的这个问题,并为该问题描述了一种解决方案——即延迟公平调度。

假设已经设置好了公平调度程序,那么下面的程序段应当加入到mapred-site.xml文件中以激活某个延迟调度程序[3]:

<property>

        <name>mapred.fairscheduler.locality.delay</name>
        <value>360000000</value>
<property>

 

适当借助延迟公平调度程序,作业执行将可以利用整个集群(见图2)。此外,根据我们的实验,这种情况下的执行时间相比“数据局部性”的做法要节省约30%。

(点击图片进行放大)

图2:执行局部性情况下的结点使用图

其他注意事项

用于测试的计算作业共使用了96个split和mapper任务。测试集群拥有19个数据结点,其中每个数据结点拥有8个mapper槽,因此该集群共有152个可用槽。当该作业运行时,它并没有充分利用集群中的所有槽。

Ganglia的两份截图都展示了我们所使用的测试集群,其中前三个结点为控制结点,而第四个结点为边缘结点,主要用来启动作业。图中展示了中央处理器/机器的负载情况。在图1中,有一些结点被过度使用(红色显示),而集群中的其他结点则未得到充分利用。在图2中,虽然我们得到了更加平衡的分布,然而集群仍然未被充分利用。用于测试的作业也可以运行多线程,这么做会增加中央处理器的负载,但同时也会降低在每次Mapper.map()迭代上的整体时间花费。正如图3所示,通过增加线程数量,我们可以更好地利用集群资源,并进一步减少完成作业所花费的时间。通过改变作业区域性,我们可以在不牺牲性能的情况下更好地利用群集处理远程作业数据。

(点击图片进行放大)

图3:使用多线程Map作业的执行区域性情况下的结点使用图

即使机器中央处理器处于高负荷状态,它仍然可以允许其他磁盘I/O密集型作业运行在开放槽中,要注意的是,这么做会带来些许的性能下降。

自定义分割

本文中提到的方法对大文件非常适用,但是对于小文件而言,并没有足够的分割来让其使用集群中的多台机器。一种可行的方法是使用更小的分割块,但是这么做会给集群命名结点带来更多的负担(内存需求方面)。一种更好的做法是修改清单1中的代码,以使用自定义的块大小(而不是文件块大小)。这种方法可以计算出所需的分割块数量,而不用理会实际的文件大小。

总结

在这篇文章中,我们已经展示了如何利用自定义的InputFormats来更紧密地控制Map Reduce中的map任务在可用服务器间的分布。这种控制对于一类特殊应用——计算密集型应用非常重要,控制过程将Hadoop Map Reduce做为通用的并行执行框架使用。

关于作者

Boris Lublinsky是NAVTEQ公司的首席架构师,在这家公司中他的工作是为大型数据管理、处理以及SOA定义架构愿景,并且实施各种NAVTEQ的项目。他还是InfoQ的SOA编辑,OASIS的SOA RA工作组的参与者。Boris是一位作者并经常发表演讲,他最新的一本书叫做《Applied SOA》。

Michael Segel在过去二十多年里一直不断与客户合作,帮助他们发现并解决业务上的问题。Michael做过许多不同类型的工作,也在不同的行业圈摸打滚爬过。他是一位独立顾问,并且总是期望能够解决所有具有挑战性的问题。Michael拥有俄亥俄州立大学的软件工程学位。

参考

1. Boris Lublinsky, Mike Segel. Custom XML Records Reader. 

2. Matei Zaharia, Dhruba Borthakur, Joydeep Sen Sarma, Khaled Elmeleegy, Scott Shenker, Ion Stoica. Delay Scheduling: A Simple Technique for Achieving Locality and Fairness in Cluster Scheduling. 


[1] 这是一个简化后的解释。真实的调度算法要复杂得多;考虑更多的参数而不仅仅是分割的位置。

[2] 虽然我们将其列作一个选项,但是如果花费在Mapper.map()方法上的时间高与远程访问数据时间的一个或多个数量级时,清单1中的代码将不会有任何性能上的提升。但尽管如此,它也许会带来网络利用率上的略微提高。

[3] 请注意这里的延迟以毫秒为单位,在改变该值之后需要重新启动作业跟踪器。

查看英文原文:Uncovering mysteries of InputFormat: Providing better control for your Map Reduce execution.

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT