a)输入数据采样。为了降低数据倾斜带来的性能影响,我们对输入数据做了采样,根据采样结果来确定RangePartition的边界,从而保证每个sort任务处理的数据量尽量接近。 举例说明,假设输入数据被分成了X个文件,首先,我们在每个文件里随机选取Y个位置,从每个位置开始连续读取Z个数据样本,最后共得到X * Y * Z个样本。然后,我们对这些样本数据进行排序,排序后样本数据被均分为S份,这里S为sort任务的个数,这样就得到每个sort任务待处理数据的范围边界。由于样本是均分的,可以使得每个sort任务都处理了几乎相等的数据量。 对于GraySort而言,我们有20000个输入文件(X),每个输入文件选取300个位置(Y),每个位置读取1个样本(Z),最终我们选取6000000条样本进行排序,并均分为20000份(sort任务个数),map任务将根据上述样本来进行RangePartition,保证 sort任务处理的数据尽量均匀。整个采样过程大约耗时35秒。对于MinuteSort而言,3350个输入文件,我们在每个文件里选取900个数据作为样本,总的样本数量为3015000,排序后分成10050份。整个采样过程耗时4秒。对于IndySort,则不需要这个采样过程。 b)IO 双buffer。map阶段,FuxiSort在一个I/O buffer中处理数据,同时Pangu在另一个buffer中执行数据读入操作。这两个buffer的角色会周期性地进行切换,这样就能保证处理数据操作和I/O操作能并行起来,从而能够大幅降低任务的Latency。 图2. FuxiSort各阶段启动顺序
c)流水线操作。如图2所示,为了进一步降低整体Latency,我们把排序过程的每个阶段分解成许多小的步骤,并且尽可能地将这些小的步骤重叠起来执行。这些分解出来的小步骤如下所示:数据采样; Job启动;MapTask读输入数据; MapTask发送数据至SortTask;SortTask接收数据; SortTask将内存中的数据进行排序,当内存装不下时,将排好序的数据dump到临时文件中;SortTask将内存中的有序数据和临时文件中的有序数据做merge sort; SortTask写最终输出文件。FuxiSort将数据采样过程和Job启动过程并行起来执行,在Job启动阶段做的主要工作包括任务的分发,以及一些其他的数据管理工作,比如收集所有SortTask的网络地址,并且通知所有的MapTask。当数据采样过程结束时,采样程序会将每个分区的界限存放在Pangu上,并且会建立另一个通知文件存放在Pangu上,用来标志采样结束。一旦任务分发完成,每个MapTask就开始周期性地检查通知文件是否存在。一旦检查到通知文件存在,也就意味着采样程序产生的各分区界限可用,MapTask就会立刻读取这些分区界限,并且根据这些界限进行数据分发。 步骤(3)(4)和(5)在map阶段并行执行,步骤(7)和(8)在sort阶段并行执行。在步骤(6)中,只有当分配给task的内存已经全部填满,才会进行排序和dump,由于在排序过程中,内存被全部占用,没有剩余内存可以接收新的数据,因此步骤(5)会被阻塞。为了缓解这个问题,我们将步骤(5)和(6)并行起来,一旦内存使用超过一定量值,就开始做排序,这样,步骤(6)会被提前执行,而步骤(5)也不会被阻塞。当内存全部占满时,我们将内存中已经排好序的数据进行归并,并dump到临时文件中。显然,开始做排序的内存阈值越低,步骤(6)开始得越早。在我们的实验中,当接收到的数据占用分配给Task内存的1/10时,开始执行步骤(6)。通过这种方法,我们将I/O和计算并行起来,并且没有明显的延迟,虽然这种方法可能会需要merge更多的临时文件,但在我们的场景中没有因此导致明显的overhead。 图2说明了每一步所花费的时间以及在执行过程中这些步骤之前的重合部分。 d)网络通信优化。在map task和sort task之前有明显的网络通信流量,每个网络包到达后都会产生CPU中断。如果对中断的处理被绑定到一个指定的CPU内核上,当这个CPU内核忙于排序时,对中断的处理会被延迟,这就可能导致请求超时,甚至丢包。通过设置”sm_affinity”,可以将网络中断产生的负载均衡到所有的CPU内核上,请求超时和丢包的比率明显下降。 图三. 实时计算框架
e) 对MinuteSort的进一步优化。由于MinuteSort的执行时间要求限制在60秒内,一般离线作业的调度开销就变得不容忽视。为了降低这些开销,我们在Fuxi的准实时Job模型上执行MinuteSort,Fuxi准实时Job模型是为了降低调度产生的overhead,使内存计算获得很高的性能而开发的。Figure 3说明了准实时Job模型的框架。在典型的生产环境中,准实时系统是一个长期运行的service,会在集群部署过程中被启动,并且在每台机器上启动一个不退出的worker进程。系统启动之后,用户可以向准实时系统的调度器提交各种job,并且可以获得job在运行期间的状态。sort benchmark竞赛要求与排序直接相关的启动和退出过程也需要包含在最终的时间里,为了遵守这一规则,我们在提交MinuteSort job之前,先通过程序去启动准实时系统worker,在job运行结束后,再将worker进程停掉,在最终提交的结果中,包含了worker启动和停止所用的时间。 准实时系统针对的场景是在中等规模大小的数据集(不超过10TB)上,对延迟敏感的数据处理过程,在这种规模的数据集下,包括输入和输出在内的所有records都可能被cache在内存中。在我们的实验中,我们只在准实时系统中运行MinuteSort。 对普通人意味着什么? 从2009年阿里云诞生那天起,我们的愿景就是打造一个自研的、通用的、大规模分布式计算底层系统,让计算像电一样成为公共服务,“飞天”平台是承载这一理念的技术核心。 FuxiSort打破Sort Benchmark排序比赛世界纪录是阿里云6年技术沉淀的直接体现,是所有技术人的骄傲。 但这仅仅是开始。技术本身不是目的,阿里云在任何技术上的进步,都会通过云产品对外输出,让中国乃至全世界的云计算客户收益。比如本次参赛的FuxiSort,通过开放数据处理服务(Open Data Processing Service, 简称ODPS)对外商用。ODPS是由阿里云自主研发,提供针对TB/PB级数据、实时性要求不高的分布式处理能力,应用于数据分析、挖掘、商业智能等领域。阿里巴巴的离线数据业务都运行在ODPS上(详情参考http://www.aliyun.com/product/odps/ )。 阿里云将借助技术创新,不断提升计算能力与规模效益,希望更多的合作伙伴、中小企业、开发者能够受益于云计算带来的便利和价值,共同将云计算变成真正意义上的公共服务和普惠科技。 |