Yahoo! S4:分布式流计算平台

一、概述

S4(Simple Scalable Streaming System)最初是Yahoo!为提高搜索广告有效点击率的问题而开发的一个平台,通过统计分析用户对广告的点击率,排除相关度低的广告,提升点击率。目前该项目刚启动不久,所以也可以理解为是他们提出的一个分布式流计算(Distributed Stream Computing)的模型。

S4的设计目标是:

·提供一种简单的编程接口来处理数据流

·设计一个可以在普通硬件之上可扩展的高可用集群。

·通过在每个处理节点使用本地内存,避免磁盘I/O瓶颈达到最小化延迟

·使用一个去中心的,对等架构;所有节点提供相同的功能和职责。没有担负特殊责任的中心节点。这大大简化了部署和维护。

·使用可插拔的架构,使设计尽可能的即通用又可定制化。

·友好的设计理念,易于编程,具有灵活的弹性

系统基于如下假设:

·一旦一个节点失败,会failover到另一个standby节点,但是会丢失原节点的内存状态,这也就是为什么说S4是一个部分容错的系统。

·节点不能动态增加和减少。

二、与MapReduce的区别

通常对于大规模分布式数据的处理会首先想到MapReduce,Yahoo!也维护了Hadoop项目,但是他们最终放弃了扩展Hadoop的想法,因为相比之下,流计算面对的场景和需求是完全不同的。

流计算强调的是数据流的形式和实时性。MapReduce系统主要解决的是对静态数据的批量处理,当MapReduce任务启动时,一般数据已经到位了(比如保存到了分布式文件系统上)。而流式计算系统在启动时,一般数据并没有完全到位,而是经由外部数据流源源不断地流入,并且不像批处理系统重视的是总数据处理的吞吐,而是对数据处理的低延迟,希望进入的数据越快处理越好,这里的思想是数据的价值随着时间的递增而递减,所以数据越快被处理,结果就越有价值,这也是实时处理的价值所在。

MapReduce采用的是一种比较静态的模型,如果用它做数据流的处理,首先需要将数据流缓存并分块,然后放入集群运算。如果数据块分得太小,可以获得一定的低延迟以保障实时性,但是包括集群启动之类的额外开销将会占很大比重;如果数据块分得太大,将无法满足低延迟的需求,达不到实时性的要求。

流计算的数据本身就是数据流,不需要数据准备的时间,有数据即可流入同时计算,同时解决了数据准备和延迟的两个问题,所以流计算是一种有别于MapReduce的分布式计算模型。同时高速数据流的速率也不是MapReduce的设计可以承担的,流计算系统没有批处理的概念,而是自启动开始,每时每刻都在处理实时流入的数据。这也是为什么S4的开发小组一直在强调S4 is not Real-Time MapReduce。

三、实现逻辑

1、Event

数据流是事件(Event)的序列流。每个Event是一个(K,A)元素,通过EventType来标示其类型。K、A分别表示这种类型的 Event的若干个key和若干个attribute。key和attribute都是tuple-valued,即key=value这种元组值。例如:

EventType(EV): TradeEvent

KEY: product=”T-shirt” type=”buy it now”

Attribute: customerId=”1234” time=”2011-4-19 01:21:31”

2、Processing Elements


Processing Elements(PE)是S4中的基本计算单元,一个PE通过下面四个组件来表示:

functionality:由实现PE的Java类和相关配置来定义。

types of events:处理的Event Type。

key:关心哪种key。

key的值:关心(匹配)的key值是多少。

每个PE只负责处理自己所关心的event type,并且只处理自己所对应的key值的event,也就是说,只有当event type, key, key的值都匹配时,才会交由该PE进行计算处理。

这里要注意的是,如果有匹配的PE,则交由该PE处理,如果没有,则会创建一个新的PE。所以一个PN中的PE可能有许多个,这就需要在前期对事件的key及其取值范围进行很好的划分,否则可能因为过多的PE导致系统效率降低,同时也应该定期对使用率较低的PE进行清除,但是由于数据存放在节点内存中,所以清除前应该对有必要的数据进行持久化处理,否则会永久性丢失。后期应该会在PE上添加优先级等属性,可以提升清除PE工作的准确率。

有一种PE没有属性key和attribute,它们可以处理指定event type的所有事件,通常这些事件是原始数据,这类PE一般放置在S4集群的输入层,在这里原始事件会被赋予一个key以便于分发给后面的PE处理。PE处理后可能输出一个或多个event,输出频率在配置文件中定义,可以配置指定时间间隔输出一次或是指定事件数发生后输出一次,特例是可以配置为每次事件触发都输出一次。

PE是直接面向业务方的组件,由业务方定义PE中对事件的处理和处理以后的输出,剩下的事情全部交由平台去做。

3、Processing Node

Processing Node(PN)对应着集群中的每一个逻辑结点,主要工作是监听事件,当事件到达时调用适合的PE处理事件,如果PE有输出,则还需和通讯层合作进行事件的分发和输出。需要注意的是,集群中所有的PN都是对等的,使得集群的部署和维护相对简单,没有中心节点。其结构如下图:

Event Listener负责监听事件并转交给PE容器(Processing Element Container, PEC),由PEC交由合适的PE处理业务逻辑。配置文件中还会配置PE原型(PE prototype),定义其功能、event type、key。PN对其监听到的每一个唯一属性值会触发一次操作,如果当前没有合适的PE处理该事件,则会根据该PE原型创建新的对应了该唯一属性值的PE来对事件进行处理。

配置文件中定义了S4集群所关心的key的集合,这里会通过对事件的event type, key, key的值计算哈希函数,以路由到指定的PN,此外单个事件可能会被路由到多个PN中,这里应该是为将来做负载均衡做的准备,也可以作为广播事件的途径。

PEC中有若干个PE分别对应不同的情况,这里的思想即是Actor模型。PE处理完逻辑后根据其定义的输出方法可以输出事件,事件交由Dispatcher与通讯层(Communication Layer)进行交互并由Emiter输出至逻辑节点。

结果就是,所有包含特性属性值的事件在理论上都能通过哈希函数到达相应的PN,并被路由到PN内的PE上处理。

4、Communication Layer

通讯层提供集群管理、故障恢复(failover)到备用节点、逻辑节点到物理节点的映射。当监测到硬件故障时,会自动更新映射。通讯层隐藏的映射使得PN发送消息时只需要关心逻辑节点而不用关心物理节点。

通讯层采用插件式的架构来选择网络协议。它会对事件本身采用不同的协议发送,例如对于控制消息,会采用可靠方式(例如TCP)发送,而为了优化集群的吞吐率,可能会采用不可靠的方式(例如UDP)发送数据消息,这在S4中是可以容忍的。

5、Configuration Management System

配置管理系统主要用于对集群的操作,包括为S4任务创建和销毁集群,分配新的物理节点到S4任务集群中,空闲的集群可以作为冷备。这里的一致性保证交由ZooKeeper来做。

S4系统的数据流整体上看起来类似下图:

四、编程模型

面向业务方的PE编程十分简单,只需要编写处理事件的processEvent函数和输出的output函数即可,PE本身可以定义一些变量进行本地的数据记录,同时S4本身提供了一些持久化的辅助,主要类都在io.ts4.persist包下面。值得注意的是在API文档中可以看到,持久化的类通常都包含一个Clock对象,可以在初始化时定义持久时间,到时间以后会被抛弃。

五、性能

开发小组运用S4系统分别进行了在线实验和离线实验(压力测试)。

在线实验集群有16台服务器,每台4个32位CPU,2GB内存。每天大约有25万用户发起共100万次搜索,实验两周内观察到的峰值是每秒1600个事件。实验结果表明S4增加了3%的广告点击,主要通过快速检测低质量广告并把它们过滤出去达成。

离线实验集群有8台服务器,每台服务器4个64位CPU,16GB内存。集群中跑了16个PN,每台2个,事件由300万服务和点击组成。这次实验主要用于评估系统在远高于期望事件流量下的性能。

压力测试结果如下:

可见当事件流越来越大时,S4系统的错误越来越多,当数据流速达到9.7Mbps时,开始出现错误,这主要是由于S4系统无法及时处理过多的事件而导致了丢失数据的情况。

六、系统分析

S4是面向流式数据和实时处理的,所以针对实时性较高的业务,可以很好地对数据做出高效的分析处理,而且系统一旦上线,很少需要人工干预,源源不断的数据流会被自动路由并分析。对于海量数据,它和MapReduce类似都可以应对,但它能比后者更快地处理数据。

S4目前的缺点在于它的数据传输可靠性还不够,可能丢失数据,同时由于数据存放在内存中,一旦节点出现故障,就会丢失该节点的所有数据,这一点可以通过定期持久化来弥补(但是真的有必要吗?)。同时我认为这和它面向的场景也有关系,实时数据分析通常都是针对一些非常离散、细小的数据,从统计的角度来看,损失掉一部分数据对最后的统计结果并没有很大影响,而这部分牺牲却可以换来吞吐率的大幅提升。所以就目前来看,S4还是更适合对那些不一定非得对每条数据都仔细分析的场景,只求最后一个统计的结果来对业务做出相应的预计和调整。此外S4系统要求输入的是事件流,这就涉及到事件的生成,所以在数据流入S4以前,必须有能将数据转化为事件的系统进行中间处理。

从集群的扩展性来看,理论上可以通过增加节点应对更大的数据流,但是目前还无法在S4工作时动态增加或减少节点。所以对节点进行调整时很可能必须停下当前的工作,做不到无缝调整。而且由于S4由ZooKeeper进行集群管理,所以当集群增加到一定规模时,ZooKeeper的管理能力也有待考验。此外,仍然是因为S4无法保证数据100%的可靠传输,所以集群规模增长时,数据错误也会增长得很快。目前没有相关资料显示S4集群的规模究竟可以做到多大,但是相信未来随着数据传输可靠性的提升,会发挥很可观的作用。

在业务耦合度方面,S4完全隔离了平台和业务逻辑,业务方只需要编写PE逻辑即可,这一点类似于MapReduce中只需编写map和reduce函数,业务和平台的耦合度是非常低的。

七、前景

从目前来看,流式计算的前景非常广阔,目前已经有一些基于流的服务出台。例如Twitter的Streaming API,可以实时推送(Push)消息,Google也利用该API实现了实时搜索。未来无论从数据的产生还是流动上都会有数量级的飞跃,如何对海量数据做出及时、实时的处理,相信不论是实际业务还是数据挖掘领域都会十分关注。也可以独立开发自己的或者基于S4的流式计算平台,以契合自身的业务需求。新的信息不仅对数据分析有用,也能给予用户更好的体验(Yahoo!做S4的初衷就是提升用广告的用户体验以增加点击率)。相信S4会成为继Hadoop之后的又一分布式计算领域的干将。未来的数据将不再是一个一个包的概念,而成为一道道数据流源源不断地流入系统,产出效益。

附:Yahoo! S4论文:http://labs.yahoo.com/files/KDCloud%202010%20S4.pdf

Yahoo! S4:分布式流计算平台》上有3条评论

发表评论

电子邮件地址不会被公开。