第十一章:流处理
作者:负雪明烛 时间:2021 年 12 月 27 日 上一章的批处理技术讨论了什么?
- 读取读取一组文件作为输入,并生成一组新的文件作为输出。
- 输出是衍生数据(derived data) 的一种形式:如果需要,可以通过再次运行批处理过程来重新创建数据集。
批处理技术的问题?
- 一个很大的假设:即输入是有界的。
- 例如,MapReduce核心的排序操作必须读取其全部输入,然后才能开始生成输出:
- 可能发生这种情况:最后一条输入记录具有最小的键,因此需要第一个被输出,所以提早开始输出是不可行的。
- 例如,MapReduce核心的排序操作必须读取其全部输入,然后才能开始生成输出:
- 实际上,很多数据是无界限的,因为它随着时间的推移而逐渐到达,如用户行为。
- 批处理必须人为地分成固定时间段的数据库,例如每天结束时处理一天的数据。
- 批处理延迟太大。如果增加很多批次,就可以减少延迟,就是流处理(stream processing) 背后的想法。
什么是“流”?
- “流”是指随着时间的推移逐渐可用的数据。
- Unix的stdin和stdout,编程语言(惰性列表),文件系统API(如Java的FileInputStream),TCP连接,通过互联网传送音频和视频等等。
- 我们将把 事件流(event stream) 视为一种数据管理机制:无界限,增量处理,与上一章中的批量数据相对应。
传递事件流
流处理的输入输出的等价物看上去是什么样子?
- 在批处理领域,作业的输入和输出都是文件,处理时的第一个步骤通常是把文件解析成一系列记录。
- 在流处理领域,记录通常被叫做 **事件(event),**但它本质上是一样的:一个小的、自包含的、不可变的对象,包含某个时间点发生的某件事情的细节。
- 一个事件通常包含一个来自日历时钟的时间戳,以指明事件发生的时间。
- 如用户的某个行为(查看页面,购买),或机器的传感器;
- 事件可能被编码成文本字符串或者 JSON,或者二进制编码。
流处理的数据消费方式?
- 批处理中,文件被写入一次,然后可能被多个作业读取。
- 类似的,在流处理术语中,
- 一个事件由 生产者(producer) (也称为 发布者(publisher) 或 发送者(sender) )生成一次
- 然后可能由多个 消费者(consumer) ( 订阅者(subscribers) 或 接收者(recipients) )进行处理。
- 在文件系统中,文件名标识一组相关记录;在流式系统中,相关的事件通常被聚合为一个 主题(topic) 或 流(stream) 。
流处理的数据怎么存储?
- 原则上说,文件或者数据库可以当做流处理的数据输入方式
- 生产者把每个事件写入数据存储。
- 消费者定期轮询数据存储,检查自上次运行以来的新事件。
- 可这和批处理没区别了。
- 想要低延迟的连续处理,那么就不能用数据存储+轮询的方式。
- 最好在新事件出现时,直接通知消费者。
- 数据库对这种通知方式支持的不好,关系型数据库通常有触发器(trigger) 。
- 它们可以对变化(如,插入表中的一行)作出反应,但是它们的功能非常有限,并且在数据库设计中有些后顾之忧。
- 已经开发出来了一些专门的工具提供事件通知。
消息传递系统
什么是消息传递系统?
- 向消费者通知新事件的常用方式是使用消息传递系统(messaging system):生产者发送包含事件的消息,然后将消息推送给消费者。
- 简单方法:像生产者和消费者之间的Unix管道或TCP连接这样的直接信道,
- 实际上,大多数消息传递系统都做了拓展:允许多个生产者发送消息到同一个主题,允许多个消费者节点接收主题中的消息。
怎么区分不同的消息传递系统?
- 如果生产者发送消息的速度比消费者能够处理的速度快会发生什么?
- 阻塞生产者的三种选择
- 丢掉消息
- 把消息放入缓冲队列
- 反压(backpressure)
- 阻塞生产者的三种选择
- 如果节点崩溃或暂时脱机,会发生什么情况?——是否有消息丢失?
- 与数据库一样,持久性可能需要写入磁盘或者进行复制
是否可以接受消息丢失?
- 这取决于应用
- 如,周期传输的传感器读数,偶尔确实数据不重要;
- 如,正在事件进行记数,如果不能可靠送达就意味着计数器的错误扩大。
直接从生产者传递给消费者
很多消息传递系统使用生产者消费者之间的直接网络通信,而不通过中间节点:
- UDP 组播广泛应用于金融行业,其中低时延非常重要;虽然 UDP 本身不可靠,但是应用层协议可以恢复丢失的数据包;
- 无代理的消息库,如 ZeroMQ 或者 nanomsg 用 TCP 或者 IP 多播实现发布/订阅消息传递;
- StatsD 和 Brubeck 使用不可靠的UDP消息传递来收集网络中所有机器的指标并对其进行监控。
- 如果消费者在网络上公开了服务,生产者可以直接发送HTTP或RPC请求,将消息推送给使用者。这是 webhooks 背后的想法。
怎么实现可靠性?
- 这些消息传递系统通常需要应用代码意识到消息丢失的可能性。
- 容错很有限:始终假设生产者、消费者始终在线。
- 如果消费者脱机,或者生产者崩溃,那么消息很可能就会丢失。
消息代理
什么是消息代理?
- 消息代理(message broker)(也称为消息队列(message queue)),实质上是一种针对处理消息流而优化的数据库。
- 它作为服务器运行,生产者和消费者作为客户端连接到服务器。
- 把数据集中在代理上,系统更容易容忍客户端的连接,断开,和崩溃;
- 消息持久化转移在代理上;
- 消息处理是异步的,生产者只负责发给代理,不用管什么时间消费;
消息代理与数据库的对比
消息队列与数据库的差异:
- 数据库通常保留数据直至显式删除,而大多数消息队列在消息成功传递给消费者后自动删除消息;因此不适合长期的数据存储;
- 大多数代理的工作集相当小——队列很短;
- 数据库支持二级索引和检索数据,消息队列通常按照某种模式匹配主题,订阅其子集。
- 查询数据库,通常基于某个时间点的数据快照,所以数据变化时,第一个客户端不会发现其数据已经过期;而消息代理不支持任意查询,数据发生变化时,它们会通知客户端。
多个消费者
多个消费者从同一主题消费消息时,两种不同的消息传递模式: 负载均衡(load balancing)
- 每条消息都被传递给消费者之一。
- 在并行处理消息时,此模式非常有用;
扇出(fan-out)
- 每条消息都被传递给所有消费者。
两种模式可以组合使用:两个独立的消费者组订阅同一个主题,可以一组选择负载均衡,另一组选择扇出。
确认与重新传递
为什么需要确认?
- 消费者可能随时崩溃,有种情况:代理向消费者送达了消息,但消费者还没处理/没处理完就崩溃了。
- 为了确保消息不会丢失,消息代理使用确认:客户端必须显式告知代理消息处理完毕,消息代理才会把消息从队列中删除。
如果代理没有收到确认怎么办?
- 代理认为消息没有被处理,因此它把消息再传递给另一个消费者;
- 可能会出现消息处理完毕,但是「确认消息」在网络中丢失。需要原子提交消息才能处理这种情况。
- 可能出现消息消费顺序与发送顺序不同的情况。
怎么保证消息顺序?
- 消息代理虽然可以试图保留消息的顺序,但是负载均衡和重传的组合也不可避免消息被重新排序;
- 为了避免消息乱序,可以让每个消费者使用单独的队列(即,不使用负载均衡)
- 如果消息完全独立的,则消息顺序重排不是一个问题;
- 如果消息有因果依赖关系,这是个重要的问题。
分区日志
数据会永久保存吗?
- 数据库和文件系统:在显式删除前永久保存;
- AMQP/JMS风格的消息传递并非如此:消息被读取后,消息代理则删除;
- 基于日志的消息代理(log-based message brokers):既有数据库的持久化,有有消息传递的低延迟;
使用日志进行消息存储
怎么用日志实现消息代理?
- 生产者通过将消息追加到日志末尾来发送消息,而消费者通过依次读取日志来接收消息。
- 如果消费者读到日志末尾,则会等待新消息追加的通知。
- Unix工具tail -f 能监视文件被追加写入的数据,基本上就是这样工作的。
当吞吐量超过单个磁盘,怎么办?
- 可以对日志分区。
- 不同的分区可以托管在不同的机器上,使得每个分区都有一份独立于其他分区的读写日志。
- 一个主题可以定义为一组携带相同类型消息的分区。
- 每个分区内,代理给每个消息分配一个单调递增的序列号或偏移量(offset)
- 序列号:分区内完全有序,跨分区没有顺序保证。
哪些消息队列产品基于日志?
- Apache Kafka ,Amazon Kinesis Streams 和 Twitter 的 DistributedLog 都是基于日志的消息代理。
- Google Cloud Pub/Sub在架构上类似,但对外暴露的是JMS风格的API,而不是日志抽象。
- 这些代理把消息写入磁盘,通过跨多台机器分区,实现每秒百万条消息的吞吐量
日志与传统的消息传递相比
基于日志的消息队列为什么适合扇出?
- 天然支持
- 因为多个消费者可以独立读取日志,并且互补影响——读入消息不会将其从日志中删除;
- 每个客户端将消费被指派分区中的所有消息,通常就是单线程顺序读取分区中的消息。
- 缺点:
- 由于同一个分区的消息被传递到同一个节点,所以消费一个主题的节点数 最多为 主体中的日志分区数。
- 单个消息处理缓慢时会影响后续消息的处理。
- 缺点:
怎么选择消息队列?
- 当消息处理代价高,希望逐条并行处理,以及消息顺序不重要时,JMS/AMQP风格的消息代理是可取的。
- 当消息吞吐量很高,处理迅速,消息顺序重要时,基于日志的方法表现非常好。
消费者偏移量
顺序消费分区内消息的优点?
- 判断消息是否已经被处理变得很简单:小于偏移的都已经被处理,否则都会被处理。
- 代理只需要定期记录消费者的偏移即可;
- 减少了记录的开销;
- 有助于提高基于日志的系统的吞吐量。
与单领导者数据库复制中常见的日志序列号很类似:
- 也是记录复制到哪里了。
- 消息代理表现得像主库,消费者像从库。
当消费者节点失效怎么办?
- 失效的消费者的分区被指派给其他节点,并从最后的偏移量开始消费消息;
- 当消费者已经处理了消息,但是没有记录偏移量,那么重启后这些消息被处理两次。
磁盘空间使用
追加日志怎么防止磁盘耗尽?
- 为了回收磁盘空间,日志实际上被分割成段,并不时地将旧段删除或移动到归档存储。 (我们将在后面讨论一种更为复杂的磁盘空间释放方式)
- 慢消费者如果跟不上消息产生的速率而落后得太多,那么可能错过被删除的消息。
- 日志实现了一个有限大小的缓冲区,被称为循环缓冲区(circular buffer) 或环形缓冲区(ring buffer)。
当消费者跟不上生产者时
消费者跟不上生产者发送消息的速度时怎么办?
- 丢弃信息
- 进行缓冲:基于日志的方法也是缓冲的一种形式。
- 施加背压。
消费者落后太多怎么办?
- 可以监控消费者落后日志头部的距离,产生报警,防止落后太多。
- 即使落后太多,也只是一个消费者的问题,修它的时候不会影响别的消费者。
重播旧消息
为什么能重播旧消息?
- AMQP和JMS风格的消息代理,处理和确认消息是破坏性的操作,会导致它在消息代理中被删除;
- 基于日志的消息代理,是只读操作,不会更改日志;
- 除了消费者的任何输出之外,处理的唯一副作用是消费者偏移量;
- 如果需要的话,可以操纵消费者的偏移量,使得消息被重复处理任意次。
- 使得基于日志的消息传递类似于批处理,容错性更强。
数据库与流
数据库与流的关系?
- 数据库和数据流之间的联系不仅仅是磁盘日志的物理存储 —— 而是更深层的联系。
- 复制日志是一个由数据库写入事件组成的流,由主库在处理事务时生成。复制日志中的事件描述发生的数据更改。
保持系统同步
为什么需要保持系统同步?
- 大多数应用都是多种应用的组合:OLTP,缓存,全文索引,数据仓库
- 每种应用都有自己的数据副本,根据自己的目的进行存储方式的优化;
- 当相同/相关的数据出现在不同的地方,相互之间就要保持同步。
怎么做到系统同步?
- 批处理进行数据转换
- 双写
双写存在的问题?
- 竞争条件(并发)
- 导致结果不符合预期(除非有额外的并发检测机制)
- 或者写入失败(容错问题,需要原子提交)。
变更数据捕获
大多数数据库的复制日志问题是什么?
- 被当做数据库的内部实现,而不是公开的 API。
- 简而言之,不通用。
什么是变更数据捕获(change data capture, CDC)?
- 观察写入数据库的所有变更,提取并转换成为可以复制到其他系统形式的过程。
- 变更能在被写入后,立刻用于流。
举个 CDC 的例子:
- 捕获数据库中的变更,不断将相同的变更应用于搜索索引。
- 只要变更日志按照相同顺序应用,就可以预期搜索索引中的数据与数据库一致。
- 搜索索引和其他衍生数据是变更流的消费者。
变更数据捕获的实现
我们将日志消费者叫做衍生数据系统。衍生数据就是原始数据的准确副本。
变更数据捕获的原理?
- 使得被捕获的数据库成为领导者,其他组件成为追随者。
- 基于日志的消息代理非常适合从源数据库传输变更事件,因为保留了消息的顺序。
- 数据库触发器可用来实现变更数据捕获
- 缺点:脆弱,性能差。
- 解析复制日志:更稳健;但是有挑战,如应对模式变更。
常见实现:
- LinkedIn的Databus,Facebook的 Wormhole 和 Yahoo! 的 Sherpa 大规模地应用这个思路。
- Bottled Water 使用解码 WAL 的 API 实现了 PostgreSQL 的 CDC
- Maxwell 和 Debezium 通过解析 binlog 对 MySQL 做了类似的事情
- Mongoriver读取MongoDB oplog,而GoldenGate为Oracle提供类似的功能。
同步还是异步?
- CDC 通常是异步的
- 性能好,但有延迟。
初始快照
怎么从一个已有的数据库中使用 CDC 同步数据?
- 如果拥有数据库的所有变更日志,那么可以重播该日志;
- 实际上,数据库不会保留所有变更日志,而且重播过于费时;
- 一些 CDC 工具集成了数据库快照功能,其他工具留给你手动执行;
日志压缩
如何避免每添加一个衍生数据系统都要对数据库做一个快照?
- 日志压缩。
日志压缩原理?
- 之前「哈希索引」章节中介绍了:只保留每个键的最新值,去除历史上的重复内容;
怎么使用日志压缩来重建衍生数据系统?
- Kafka 有这种功能。
- 先读取并压缩数据库到 Kafka 里,然后衍生数据系统再消费 Kafka。
变更流的API支持
越来越多的数据库把变更流作为第一等的接口。
- RethinkDB允许查询订阅通知,当查询结果变更时获得通知
- Firebase 和 CouchDB 基于变更流进行同步,该变更流同样可用于应用。
- 而Meteor使用MongoDB oplog订阅数据变更,并改变了用户接口。
- VoltDB允许事务以流的形式连续地从数据库中导出数据
- Kafka Connect 致力于将广泛的数据库系统的变更数据捕获工具与Kafka集成。
事件溯源
我们讨论的想法类似于事件溯源(Event Sourcing) 之间有一些相似之处,这是一个在 领域驱动设计(domain-driven design, DDD) 社区中折腾出来的技术。
什么是事件溯源?
- 类似于变更数据捕获,事件溯源涉及到将所有对应用状态的变更存储为变更事件日志。
- 区别是事件溯源将这一想法应用到了一个不同的抽象层次上:
- 在变更数据捕获中,应用以可变方式(mutable way) 使用数据库,可以任意更新和删除记录。变更日志是从数据库的底层提取的(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免竞态条件。写入数据库的应用不需要知道CDC的存在。
- 在事件溯源中,应用逻辑显式构建在写入事件日志的不可变事件之上。在这种情况下,事件存储是仅追加写入的,更新与删除是不鼓励的或禁止的。事件被设计为旨在反映应用层面发生的事情,而不是底层的状态变更。
为什么将用户的行为记录为不可变的事件?
- 更有意义
- 而不是在可变数据库中记录这些行为的影响。
- 事件溯源使得应用随时间演化更为容易,通过更容易理解事情发生的原因来帮助调试的进行,并有利于防止应用Bug
事件溯源属于什么模型?
- 事件溯源类似于编年史(chronicle) 数据模型
实际中的数据溯源?
- 诸如Event Store 这样的专业数据库已经被开发出来
- 传统的数据库或基于日志的消息代理也可以用来构建这种风格的应用。
从事件日志中派生出当前状态
用户关心事件日志吗?
- 不关心
- 用户关心系统的当前状态,而不是变更历史
事件日志和状态怎么转化?
- 使用事件溯源的应用需要拉取事件日志(表示写入系统的数据),并将其转换为适合向用户显示的应用状态(从系统读取数据的方式)
- 这种转换可以使用任意逻辑,但它应当是确定性的,以便能再次运行,并从事件日志中衍生出相同的应用状态。
日志压缩重构系统当前状态的方法?
- 用于记录更新的CDC事件通常包含记录的完整新版本,因此可以丢弃相同主键的先前事件。
- 事件溯源在更高层次进行建模:事件通常表示用户操作的意图,而不是因为操作而发生的状态更新机制。这种情况下,需要用完整的历史事件重构最新状态。
命令和事件
什么是事件和命令?
- 事件溯源的哲学是仔细区分事件(event) 和命令(command)
- 当来自用户的请求刚到达时,它一开始是一个命令:在这个时间点上它仍然可能可能失败,比如,因为违反了一些完整性条件。
- 当检查成功时,应用可以生成一个事件。
- 在事件生成的时刻,它就成为了事实(fact)
对事件怎么操作?
- 事件流的消费者不允许拒绝事件:
- 消费者看到事件时,它已经成为日志中不可变更的一部分。
- 因此,任何对命令的验证,都需要在它成为事件之前同步完成。
- 或者拆成两个事件:
- 第一个是暂时预约
- 第二个是验证预约后的肚子的确认事件
状态、流和不变性
状态和数据库?
- 数据库视为应用程序当前状态的存储
状态和事件的关系?
- 无论状态如何变化,总是有一系列事件导致了这些变化。
- 尽管事件已经结束或者正在进行中,但这些事件发生是不争的事实。
- 关键的想法是:可变的状态与不可变事件的仅追加日志相互之间并不矛盾:它们是一体两面,互为阴阳的。所有变化的日志—— 变化日志(changelog),表示了随时间演变的状态。
用数学表达应用当前状态与事件流之间的关系:
- 应用状态是事件流对时间求积分得到的结果
- 而变更流是状态对时间求微分的结果
日志和数据库的关系?
- 事务日志记录了数据库的所有变更。
- 高速追加是更改日志的唯一方法。
- 从这个角度来看,数据库的内容其实是日志中记录最新值的缓存。
- 日志才是真相,数据库是日志子集的缓存,这一缓存子集恰好来自日志中每条记录与索引值的最新值。
日志压缩是连接日志与数据库状态之间的桥梁:它只保留每条记录的最新版本,并丢弃被覆盖的版本。
不可变事件的优点
不可变事件有什么优点?
- 之前的一笔交易有问题的话,可以新增一笔交易作为更正。
- 使用不可变事件的仅追加日志,诊断问题与故障恢复就要容易的多。
- 不可变的事件也包含了比当前状态更多的信息。可以研究用户的行为心理学(添加购物车,然后删除)。
从同一事件日志中派生多个视图
不变的事件日志中分离出可变状态的优点?
- 可以针对不同的读取方式,从相同的事件日志中衍生出几种不同的表现形式。效果就像一个流的多个消费者一样。
- 比如Kafka Connect能将来自Kafka的数据导出到各种不同的数据库与索引。
- 添加从事件日志到数据库的显式转换,能够使应用更容易地随时间演进:引入新功能,可以使用事件日志来构建一个单独的,能共存的新应用,无需修改现有系统;做到了隔离。
存储数据很麻烦吗?
- 如果你不需要担心如何查询与访问数据,那么存储数据通常是非常简单的。
- 模式设计、索引和存储引擎的许多复杂性,都是希望支持某些特定查询和访问模式的结果。
- 出于这个原因,通过将数据写入的形式与读取形式相分离,并允许几个不同的读取视图,你能获得很大的灵活性。这个想法有时被称为命令查询责任分离(command query responsibility segregation, CQRS)
数据库和设计模式基于一种谬论:
- 即数据必须以与查询相同的形式写入。
- 如果可以将数据从针对写入优化的事件日志转换为针对读取优化的应用状态,那么有关规范化和非规范化的争论就变得无关紧要了。
针对读取优化的例子?
- 推特主页时间线,它是特定用户关注的人群所发推特的缓存(类似邮箱)。
- 主页时间线是高度非规范化的,因为你的推文与你所有粉丝的时间线都构成了重复。
- 然而,扇出服务保持了这种重复状态与新推特以及新关注关系的同步,从而保证了重复的可管理性。
并发控制
事件溯源和变更数据捕获的最大缺点是什么?
- 事件日志的消费者通常是异步的,所以可能会出现这样的情况:用户会写入日志,然后从日志衍生视图中读取,结果发现他的写入还没有反映在读取视图中。(读己之写中讨论过)
如何解决异步与并发问题?
- 一种解决方案是将事件追加到日志时同步执行读取视图的更新。而将这些写入操作合并为一个原子单元需要事务,所以要么将事件日志和读取视图保存在同一个存储系统中,要么就需要跨不同系统进行分布式事务。
- 另外,从事件日志导出当前状态也简化了并发控制的某些部分。因为用户操作只需要在一个地方进行单词写入——把事件附加到日志中——这很容易原子化。
- 如果事件日志与应用状态以相同的方式分区,那么直接使用单线程日志消费者就不需要写入并发控制了。日志通过在分区中定义事件的序列顺序,消除了并发性的不确定性。
不变性的局限性
不使用事件溯源模型的系统可以依赖不变性吗?
- 各种数据库在内部使用不可变的数据结构或多版本数据来支持时间点快照
- Git,Mercurial和Fossil等版本控制系统也依靠不可变的数据来保存文件的版本历史记录。
永远保持所有变更的不变历史,在多大程度上是可行的?
- 取决于数据集的流失率。
- 一些工作负载主要是添加数据,很少更新或删除;它们很容易保持不变。
- 其他工作负载在相对较小的数据集上有较高的更新/删除率;在这些情况下,不可变的历史可能增至难以接受的巨大,碎片化可能成为一个问题,压缩与垃圾收集的表现对于运维的稳健性变得至关重要
- 可能有出于管理方面的原因需要删除数据的情况,尽管这些数据都是不可变的。比如用户注销账户,删除敏感信息。
需要彻底删除的日志该怎么办?
- 不能在日志中添加另一个事件来指明先前数据应该被视为删除; —— 你实际上是想改写历史,并假装数据从一开始就没有写入。
- Datomic管这个特性叫切除(excision),而Fossil版本控制系统有一个类似的概念叫避免(shunning)
真正删除数据是非常非常困难的:
- 因为副本可能存在于很多地方:例如,存储引擎,文件系统和SSD通常会向一个新位置写入,而不是原地覆盖旧数据。
- 而备份通常是特意做成不可变的,防止意外删除或损坏。
- 删除操作更多的是指“使取回数据更困难”,而不是指“使取回数据不可能”。
流处理
我们已经讨论了
- 流的来源(用户活动事件,传感器和写入数据库)
- 流如何传输(直接通过消息传送,通过消息代理,通过事件日志)
流能做什么?
- 你可以将事件中的数据写入数据库、缓存、搜索索引或类似的存储系统,然后能被其他客户端查询。如图11-5所示,这是数据库与系统其他部分所发生的变更保持同步的好方法 —— 特别是当流消费者是写入数据库的唯一客户端时。如“批处理工作流的输出”中所讨论的,它是写入存储系统的流等价物。
- 你能以某种方式将事件推送给用户,例如发送报警邮件或推送通知,或将事件流式传输到可实时显示的仪表板上。在这种情况下,人是流的最终消费者。
- 你可以处理一个或多个输入流,并产生一个或多个输出流。流可能会经过由几个这样的处理阶段组成的流水线,最后再输出(选项1或2)。
流处理与批处理的关系?
- 本章的剩余部分讨论:处理流以产生其他衍生流。处理这样的流的代码片段,被称为算子(operator) 或作业(job)。
- 它与我们在第十章中讨论过的Unix进程和MapReduce作业密切相关,数据流的模式是相似的:一个流处理器以只读的方式使用输入流,并将其输出以仅追加的方式写入一个不同的位置。
- 流处理中的分区和并行化模式也非常类似于第十章中介绍的MapReduce和数据流引擎,因此我们不再重复这些主题。基本的Map操作(如转换和过滤记录)也是一样的。
- 与批量作业相比的一个关键区别是,流不会结束。因此无法使用排序合并连接。容错也也需要改变。
流处理的应用
长期以来,流处理一直用于监控目的,如果某个事件发生,组织希望能得到警报。例如:
- 信用卡欺诈检测系统
- 股票交易系统
- 机器状态监控
- 军事情报系统
新时代,流处理有了新用途。
复合事件处理
什么是复合事件处理?
- 复合事件处理(complex event processing, CEP) 是20世纪90年代为分析事件流而开发出的一种方法,尤其适用于需要搜索某些事件模式的应用。
- 类似于正则匹配搜索特定字符模式,CEP 允许指定规则搜索某些事件模式。
- 通常使用高层次的声明式查询语言,比如SQL,或者图形用户界面,来描述应该检测到的事件模式。
- 这些查询被提交给处理引擎,该引擎消费输入流,并在内部维护一个执行所需匹配的状态机。当发现匹配时,引擎发出一个复合事件(complex event)(CEP因此得名),并附有检测到的事件模式详情。
CEP 与普通数据库的区别?
- CEP查询和数据之间的关系与普通数据库相比是颠倒的。
- 通常情况下,数据库会持久存储数据,并将查询视为临时的:当查询进入时,数据库搜索与查询匹配的数据,然后在查询完成时丢掉查询。
- CEP引擎反转了角色:查询是长期存储的,来自输入流的事件不断流过它们,搜索匹配事件模式的查询。
CEP 的常见实现?
- CEP的实现包括Esper,IBM InfoSphere Streams,Apama,TIBCO StreamBase和SQLstream。像Samza这样的分布式流处理组件,支持使用SQL在流上进行声明式查询。
流分析
CEP与流分析之间的边界是模糊的,但一般来说,分析往往对找出特定事件序列并不关心,而更关注大量事件上的聚合与统计指标 —— 例如:
- 测量某种类型事件的速率(每个时间间隔内发生的频率)
- 滚动计算一段时间窗口内某个值的平均值
- 将当前的统计值与先前的时间区间的值对比(例如,检测趋势,当指标与上周同比异常偏高或偏低时报警)
这种统计值聚合的时间间隔称为窗口(window)
流分析系统为什么使用概率算法?
- 流分析系统有时会使用概率算法,例如Bloom filter(我们在“性能优化”中遇到过)来管理成员资格,HyperLogLog 用于基数估计以及各种百分比估计算法(请参阅“实践中的百分位点“)。
- 概率算法产出近似的结果,但比起精确算法的优点是内存使用要少得多。
许多开源分布式流处理框架的设计都是针对分析设计的:
- 例如Apache Storm,Spark Streaming,Flink,Concord,Samza和Kafka Streams。
- 托管服务包括Google Cloud Dataflow和Azure Stream Analytics。
维护物化视图
什么是物化视图?
- 数据库的变更流可以用于维护衍生数据系统(如缓存、搜索索引和数据仓库),并使其与源数据库保持最新。
- 我们可以将这些示例视作维护物化视图(materialized view) 的一种具体场景:在某个数据集上衍生出一个替代视图以便高效查询,并在底层数据变更时更新视图。
流系统怎么维护物化视图?
- 与流分析场景不同的是,仅考虑某个时间窗口内的事件通常是不够的:构建物化视图可能需要任意时间段内的所有事件,除了那些可能由日志压缩丢弃的过时事件(请参阅“日志压缩“)。
- 实际上,你需要一个可以一直延伸到时间开端的窗口。
哪些系统支持?
- Samza和Kafka Streams支持这种用法,建立在Kafka对日志压缩的支持上
在流上搜索
为什么需要在流上搜索?
- 除了允许搜索由多个事件构成模式的CEP外,有时也存在基于复杂标准(例如全文搜索查询)来搜索单个事件的需求。
- 比如订阅了新闻 Feed,然后一直查看感兴趣的新闻。
- 做法是先把查询存储起来,然后文档从查询中流过。
消息传递和RPC
消息传递系统可以作为RPC的替代方案,即作为一种服务间通信的机制,比如在Actor模型中所使用的那样。
尽管这些系统也是基于消息和事件,但我们通常不会将其视作流处理组件:
- Actor框架主要是管理模块通信的并发和分布式执行的一种机制,而流处理主要是一种数据管理技术。
- Actor之间的交流往往是短暂的、一对一的;而事件日志则是持久的、多订阅者的。
- Actor可以以任意方式进行通信(包括循环的请求/响应模式),但流处理通常配置在无环流水线中,其中每个流都是一个特定作业的输出,由良好定义的输入流中派生而来。
也可以使用Actor框架来处理流。但是,很多这样的框架在崩溃时不能保证消息的传递,除非你实现了额外的重试逻辑,否则这种处理不是容错的。
时间推理
关于时间戳:
- 时间窗口的时间戳很棘手。
- 批处理中直接使用每个事件中的时间戳。
- 流处理:
- 使用事件的时间戳是确定的,再次运行也会有相同的结果。
- 很多流处理框架使用处理机器上的时间戳(处理时间)来确定窗口。
- 优点:简单;
- 缺点:如果事件的创建时间与处理时间有明显的延迟,则不可。
事件时间与处理时间
处理延迟带来的问题?
- 处理发生延迟是常见的。
- 导致无法预测消息顺序;
- 导致错误的数据(因为重启后,按照处理时间的话会有消息积压)
知道什么时候准备好了
用事件时间定义窗口的问题?
- 你永远也无法确定是不是已经收到了特定窗口的所有事件,还是说还有一些事件正在来的路上。
- 可能存在延迟导致的滞留事件。
对滞留事件的两种选择:
- 忽略这些滞留事件,因为在正常情况下它们可能只是事件中的一小部分。你可以将丢弃事件的数量作为一个监控指标,并在出现大量丢消息的情况时报警。
- 发布一个更正(correction),一个包括滞留事件的更新窗口值。你可能还需要收回以前的输出。
可以用消息表示时间戳已经完成:
- 生产者可以生产特殊消息:从现在开始,没有比 t 更早的消息了。
- 但是在很多机器上的生产者都在生成事件,消费者的难度较大。
你用的是谁的时钟?
事件的时间戳怎么来的?
- 取决于事件发生的机器
- 就会存在不可靠的时钟。
如何校正不准确的时钟? 要校正不正确的设备时钟,一种方法是记录三个时间戳:
- 事件发生的时间,取决于设备时钟
- 事件发送往服务器的时间,取决于设备时钟
- 事件被服务器接收的时间,取决于服务器时钟
当忽略掉网络的传输时间时,可以用时间戳三和二估算设备与服务器的时钟偏移。
窗口的类型
确定了事件的时间戳后,下面就是定义时间段的窗口。 滚动窗口(Tumbling Window) 滚动窗口有着固定的长度,每个事件都仅能属于一个窗口。例如,假设你有一个1分钟的滚动窗口,则所有时间戳在10:03:00
和10:03:59
之间的事件会被分组到一个窗口中,10:04:00
和10:04:59
之间的事件被分组到下一个窗口,依此类推。通过将每个事件时间戳四舍五入至最近的分钟来确定它所属的窗口,可以实现1分钟的滚动窗口。
跳动窗口(Hopping Window)
跳动窗口也有着固定的长度,但允许窗口重叠以提供一些平滑。例如,一个带有1分钟跳跃步长的5分钟窗口将包含`10:03:00`至`10:07:59`之间的事件,而下一个窗口将覆盖`10:04:00`至`10:08:59`之间的事件,等等。通过首先计算1分钟的滚动窗口(tunmbling window),然后在几个相邻窗口上进行聚合,可以实现这种跳动窗口。
滑动窗口(Sliding Window)
滑动窗口包含了彼此间距在特定时长内的所有事件。例如,一个5分钟的滑动窗口应当覆盖`10:03:39`和`10:08:12`的事件,因为它们相距不超过5分钟(注意滚动窗口与步长5分钟的跳动窗口可能不会把这两个事件分组到同一个窗口中,因为它们使用固定的边界)。通过维护一个按时间排序的事件缓冲区,并不断从窗口中移除过期的旧事件,可以实现滑动窗口。
会话窗口(Session window)
与其他窗口类型不同,会话窗口没有固定的持续时间,而定义为:将同一用户出现时间相近的所有事件分组在一起,而当用户一段时间没有活动时(例如,如果30分钟内没有事件)窗口结束。会话切分是网站分析的常见需求(请参阅“[分组](ch10.md#%E5%88%86%E7%BB%84)”)。
流连接
流连接比批处理的连接更有挑战性。 有三种:流-流连接,流-表连接,与表-表连接
流流连接(窗口连接)
举例:在搜索-点击场景中,需要根据 id关联两者的数据。
为了实现这种类型的连接,流处理器需要维护状态:
- 按会话ID索引最近一小时内发生的所有事件。
- 无论何时发生搜索事件或点击事件,都会被添加到合适的索引中,而流处理器也会检查另一个索引是否有具有相同会话ID的事件到达。
- 如果有匹配事件就会发出一个表示搜索结果被点击的事件;如果搜索事件直到过期都没看见有匹配的点击事件,就会发出一个表示搜索结果未被点击的事件。
流表连接(流扩充)
举例:用户事件和用户档案数据库相关联。被称为 扩充(enriching) 活动事件。
做法:
- 做法一,每个活动事件,在数据库中查询一次用户 ID
- 缺点:远程查询很慢,可能导致系统过载
- 做法二,把数据库副本加载到流处理器中。类似于 Map 侧连接中把数据集拷贝到内存中。
- 缺点:数据库内容会变化。
- 改进:通过变更数据捕获使得内存中与远程数据库同步更新。
流表连接类似于流流连接。区别是表的变更日志流相当于一个无限的窗口。
表表连接(维护物化视图)
举例:维护推特时间线。
我们需要一个时间线缓存:一种每个用户的“收件箱”,在发送推文的时候写入这些信息,因而读取时间线时只需要简单地查询即可。 要在流处理器中实现这种缓存维护,你需要推文事件流(发送与删除)和关注关系事件流(关注与取消关注)。流处理需要维护一个数据库,包含每个用户的粉丝集合。以便知道当一条新推文到达时,需要更新哪些时间线。 流处理过程的另一种视角是:它维护了一个连接了两个表(推文与关注)的物化视图:
SELECT follows.follower_id AS timeline_id,
array_agg(tweets.* ORDER BY tweets.timestamp DESC)
FROM tweets
JOIN follows ON follows.followee_id = tweets.sender_id
GROUP BY follows.follower_id
流连接直接对应于这个查询中的表连接。时间线实际上是这个查询结果的缓存,每当底层的表发生变化时都会更新。
连接的时间依赖性
这里描述的三种连接(流流,流表,表表)有很多共通之处:它们都需要流处理器维护连接一侧的一些状态(搜索与点击事件,用户档案,关注列表),然后当连接另一侧的消息到达时查询该状态。
连接时的结果与事件顺序的关系?
- 如果跨越流的事件顺序是未定的,则连接会变为不确定性的,这意味着你在同样输入上重跑相同的作业未必会得到相同的结果:当你重跑任务时,输入流上的事件可能会以不同的方式交织。
怎么解决?
- 在数据仓库中,这个问题被称为缓慢变化的维度(slowly changing dimension, SCD),通常通过对特定版本的记录使用唯一的标识符来解决:
- 每次状态数据变化都携带上一个新的标识符。查询时标记一下使用的哪个版本的标识符。
- 优点:连接变成确定性的。
- 缺点:数据库无法进行日志压缩,因为所有记录版本都会保留。
容错
批处理怎么做容错?
- 直接重试就行,因为输入是不可变的。
- 任务失败重试,每条输入记录都恰好处理一次。
什么是恰好处理一次?
- 尽管重启任务意味着实际上可能会多次处理记录,但输出中的可见效果看上去就像只处理过一次。
- 这个原则被称为恰好一次语义(exactly-once semantics),尽管等效一次(effectively-once) 可能会是一个更写实的术语
流处理怎么容错呢?
微批量与存档点
什么是微批次?
- 一个解决方案是将流分解成小块,并像微型批处理一样处理每个块。这种方法被称为微批次(microbatching),它被用于Spark Streaming 。
- 微批次也隐式提供了一个与批次大小相等的滚动窗口(按处理时间而不是事件时间戳分窗)。任何需要更大窗口的作业都需要显式地将状态从一个微批次转移到下一个微批次。
什么是存档点?
- Apache Flink则使用不同的方法,它会定期生成状态的滚动存档点并将其写入持久存储。
- 如果流算子崩溃,它可以从最近的存档点重启,并丢弃从最近检查点到崩溃之间的所有输出。
- 存档点会由消息流中的壁障(barrier) 触发,类似于微批次之间的边界,但不会强制一个特定的窗口大小。
上述功能实现了恰好一次吗?
- 在流处理框架的范围内,微批次与存档点方法提供了与批处理一样的恰好一次语义。
- 但是,只要输出离开流处理器(例如,写入数据库,向外部消息代理发送消息,或发送电子邮件),框架就无法抛弃失败批次的输出了。在这种情况下,重启失败任务会导致外部副作用发生两次,只有微批次或存档点不足以阻止这一问题。
原子提交再现
如何保证出现故障时表现出恰好处理一次的样子?
- 我们需要确保事件处理的所有输出和副作用当且仅当处理成功时才会生效。
- 这些影响包括:
- 发送给下游算子或外部消息传递系统(包括电子邮件或推送通知)的任何消息
- 任何数据库写入
- 对算子状态的任何变更
- 以及对输入消息的任何确认(包括在基于日志的消息代理中将消费者偏移量前移)。
怎么实现?
- 类似于分布式事务和两阶段提交。
- 也是有可能高效实现这种原子提交机制的。
- Google Cloud Dataflow 和 VoltDB 中使用了这种方法,Apache Kafka有计划加入类似的功能。
- 与XA不同,这些实现不会尝试跨异构技术提供事务,而是通过在流处理框架中同时管理状态变更与消息传递来内化事务。
- 事务协议的开销可以通过在单个事务中处理多个输入消息来分摊。
幂等性
我们的目标是丢弃任何失败任务的部分输出,以便能安全地重试,而不会生效两次。
- 分布式事务是实现这个目标的一种方式
- 而另一种方式是依赖幂等性(idempotence)
什么是幂等?
- 幂等操作是多次重复执行与单次执行效果相同的操作。
- K-V 操作设置某个值是幂等的;
- 递增计数器不幂等。
天生不幂等的操作,可以实现幂等吗?
- 可以
- 在使用来自Kafka的消息时,每条消息都有一个持久的、单调递增的偏移量。将值写入外部数据库时可以将这个偏移量带上,这样你就可以判断一条更新是不是已经执行过了,因而避免重复执行。
流处理中幂等的实现?
- Storm的Trident基于类似的想法来处理状态;
使用幂等性的前提?
- 依赖幂等性意味着隐含了一些假设:重启一个失败的任务必须以相同的顺序重播相同的消息(基于日志的消息代理能做这些事),处理必须是确定性的,没有其他节点能同时更新相同的值。
- 当从一个处理节点故障切换到另一个节点时,可能需要进行防护(fencing)(请参阅“领导者和锁”),以防止被假死节点干扰。
- 尽管有这么多注意事项,幂等操作是一种实现恰好一次语义的有效方式,仅需很小的额外开销。
失败后重建状态
为什么需要失败后重建状态?
- 任何需要状态的流处理 —— 例如,任何窗口聚合(例如计数器,平均值和直方图)以及任何用于连接的表和索引,都必须确保在失败之后能恢复其状态。
怎么实现?
- 一种选择是将状态保存在远程数据存储中,并进行复制。很慢。
- 另一种方法是在流处理器本地保存状态,并定期复制。然后当流处理器从故障中恢复时,新任务可以读取状态副本,恢复处理而不丢失数据。
常见实现?
- Flink定期捕获算子状态的快照,并将它们写入HDFS等持久存储中。
- Samza和Kafka Streams通过将状态变更发送到具有日志压缩功能的专用Kafka主题来复制状态变更,这与变更数据捕获类似。
- VoltDB通过在多个节点上对每个输入消息进行冗余处理来复制状态。
一定要复制状态么?
- 非也
- 如果可以从输入流重建。比如重播输入事件/变更流。
本章小结
比较两种消息代理:
AMQP/JMS风格的消息代理
代理将单条消息分配给消费者,消费者在成功处理单条消息后确认消息。消息被确认后从代理中删除。这种方法适合作为一种异步形式的RPC(另请参阅“[消息传递中的数据流]”),例如在任务队列中,消息处理的确切顺序并不重要,而且消息在处理完之后,不需要回头重新读取旧消息。
基于日志的消息代理
代理将一个分区中的所有消息分配给同一个消费者节点,并始终以相同的顺序传递消息。并行是通过分区实现的,消费者通过存档最近处理消息的偏移量来跟踪工作进度。消息代理将消息保留在磁盘上,因此如有必要的话,可以回跳并重新读取旧消息。
变更数据捕获 就流的来源而言,我们讨论了几种可能性:用户活动事件,定期读数的传感器,和Feed数据(例如,金融中的市场数据)能够自然地表示为流。我们发现将数据库写入视作流也是很有用的:我们可以捕获变更日志 —— 即对数据库所做的所有变更的历史记录 —— 隐式地通过变更数据捕获,或显式地通过事件溯源。日志压缩允许流也能保有数据库内容的完整副本。
我们区分了流处理中可能出现的三种连接类型: 流流连接 两个输入流都由活动事件组成,而连接算子在某个时间窗口内搜索相关的事件。例如,它可能会将同一个用户30分钟内进行的两个活动联系在一起。如果你想要找出一个流内的相关事件,连接的两侧输入可能实际上都是同一个流(自连接(self-join))。 流表连接 一个输入流由活动事件组成,另一个输入流是数据库变更日志。变更日志保证了数据库的本地副本是最新的。对于每个活动事件,连接算子将查询数据库,并输出一个扩展的活动事件。 表表连接 两个输入流都是数据库变更日志。在这种情况下,一侧的每一个变化都与另一侧的最新状态相连接。结果是两表连接所得物化视图的变更流。 最后,我们讨论了在流处理中实现容错和恰好一次语义的技术。与批处理一样,我们需要放弃任何失败任务的部分输出。然而由于流处理长时间运行并持续产生输出,所以不能简单地丢弃所有的输出。相反,可以使用更细粒度的恢复机制,基于微批次、存档点、事务或幂等写入。