博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第七章:druid.io实践分享之realtime+kafka 一
阅读量:2396 次
发布时间:2019-05-10

本文共 4747 字,大约阅读时间需要 15 分钟。

目前使用druid已经有3年了,在整个国内互联网广告行业了解下来,我们算较早使用的团队。

其优势太明显了,就是快,绝大多数的场景都可以在毫秒或秒级响应(特别是数据量足够大的情况下,还能保持良好的速度)。

其二就是提供的功能特别能解决我们业务上的问题。
其三整个系统相对来说还是比较封闭的,减少了不必要的依赖,json的灵活性提供了更好的二次开发的潜力。
其四整体源码风格是函数式,可以使之前面向对象开发的工程师进行提升(在后期源码剖析再去好好体会)。

但如何发挥到最好、最稳定,需要更多细节的调整。

我会从整体部署、实时节点、历史节点、broker节点来依次介绍。

从整体部署来看:

第一:druid.io 属于IO和CPU双重密集型引擎,所以对内存、CPU、硬盘IO都有特定要求,特别提醒,如果资金充足,可以直接上SSD(内存和CPU同理)。

第二:部署的整个过程需要多次练习,并记录成流程规范,因为整体集群涉及的层次较多,如果不进行流程规范化,会导致因人不同,使线上操作出现问题

第三:相应的监控机制,要配合到位,后期在整体部署中,逐步进行实施自动化方式

第四:各节点类型服务启动时,没有明确的先后顺序

整体完整结构如下:

这里写图片描述

目前作者成立的 提供了更加完善的解决方案,有兴趣的可以参考下。

首先我们来分析下实时节点。

对实时节点(realtime)我考虑从两个方面来分享:
一个方面:数据本身要求
一个方面:realtime运行阶段出现的一些问题(如:segment堆积等)

首先我会对背景进行介绍下,我们是一个CPA广告联盟,有impression、click、conversion等类型的数据。对我们而言conversion数据最重要,因为我们是conversion来计费的。所以就有一个高标准conversion类型数据不能丢失且不能重复,对另外两类型数据在一定范围内可以接受少数丢失和少数重复。我们是基于6.0的版本来分析的。

初期druid的实时数据获取的方式是通过realtime节点结合kafka的方式,所以提供了不丢失数据的优势,如果对实时数据的各方面要求很高的前提下,realtime节点结合kafka的方式还是会带来诸多问题(这也是新版本中已经不推荐使用的原因,个人见解任何系统只要使用了kafka集群都会有这样的情况发生,只要努力去解决就好)。

场景如下:
场景一:realtime保证了不少数据,但没保证不多数据
场景二:realtime节点是单点特性,这样一旦一个节点出问题,对数据敏感的话,会立马发现问题
场景三:realtime节点没有提供安全关闭的逻辑(官方是提供直接kill方式)
场景四:realtime在某些场景中,还是会掉数据
另外也存在少量的bug,这个可以忽略下(在6.0版本之后都得到了修复)。我将一一对这些场景的产生及解决方式进行阐述。

场景一:

确实只要你的数据进入了kafka集群,数据是真的不丢失(当然如果kafka的硬盘已经满了这种情况,一般都会用监控的方式去规避吧)。但是为了支持多个realtime节点,kafka里的topic必须进行分区,不然无法整体提升并发的能力。如下图:

这里写图片描述
并且官方文档给出的kafka使用有一个参数
"auto.commit.enable": "false"
也就是交由realtime节点自行管理。这个参数设置成false后,realtime会在把一个时间段的数据持久化之后,才会给kafka集群发一个commit命令。

正是因为这个逻辑,所以在线上运行阶段,当consumer组里某个一个消费者出现某种问题,会导致没有及时对topic消费进行响应(但是已经正常消费数据了),这时候kafka会对分区进行重新调整,导致其它消费者会根据上次的offset进行消费数据,从而最终导致数据重复。如下图:

这里写图片描述

场景二:

总体来说也是由于场景一带来的副作用,由于分区导致,每一个partition被一个cousumer消费,所以多个realtime节点就是单点特性,kill任意一台realtime节点反过来也会激活kafka集群对partition再分配的处理。所以会出现这样的场景一旦启动realtime节点,就永久无法正常shutdown(只能用kill方式)。并且多realtime节点一起启动时,在启动的过程中,每个节点启动间隔不能太长(原因大家可以想想,所以一般会使用一个远程启动脚本方式,统一进行远程启动)。首先我先启动realtime1节点,如下图:

这里写图片描述
3个partition先分配给realtime1,分别从offset=300、200、100开始消费。过1分钟后再启动realtime2节点,这样会触发partition动态分配事件,启动时假设realtime1已经消费到offset=312、222、133,但还没有提交commit(所以kafka那边的offset值是不变的)
如下图:
这里写图片描述
假设重新分配后,partition-3分配给了realtime2,那么realtime2节点将从offset=100的位置开始进行消费,如下图:
这里写图片描述
这就是realtime节点间启动间隔不能太长的原因。

场景三:

跟场景二有关,就是没有提供安全关闭的命令(只能kill),这会让实际操作过程中,有种不安全感,虽然kill掉后,可以通过kafka来恢复,但是在这种场景下,还是会导致数据重复,例如:当realtime已经对数据持久化了,但还没来得及返回offset告知kafka集群(是将 auto.commit.enable = false),这时realtime节点被kill掉后。如下图:

这里写图片描述
kill命令发生在持久化之后,commit之前。如下图:
这里写图片描述
再重启realtime节点就会引发数据重复消费。
可能大家会发现,如果kill命令发生在内存Cache到持久化之间,会不会重复?当auto.commit.enable = false情况下,这块的数据是会从kafka里恢复回来的。

解决方案

其实场景一、二和三,总结起来其实是一个类别的情况,需要整体一起考虑去解决。

后面版本中,官方了给出了一个方案类似于双机热备,简单的说,就是两个realitme节点对应一个partition,但我没有去尝试过,这里主要介绍下我们在当时如何解决这样的场景的(特别是kafka、realtime集群在扩容或者删除节点的时候,也会带来这样的问题)。

首先给出一个最简单方式,在采集层做数据的backup,如果你的segment是设置的一个小时,那么就按小时进行check。发现异常,就用backup数据进行修复还原。

我们尝试下来后,发现客户在使用的过程中,会反馈为什么上一个小时的数据有变化,给用户的体验不好(当然后期我们可以做到提前发消息通知用户告知),站在用户的角度出发希望数据不要经常变动,这样用户会质疑你的系统,而对我们开发人员来说,可以让数据补进当前这个时间段里,总体来说没有让数据丢失。其次按每小时check后台逻辑复杂,人工干预工作量很大。再次就是当我们开发在进行快速迭代和上线的时候,势必带来相关数据修复工作,影响效率(最常见的就是增加纬度信息时)。

后来我们讨论了后,重新订了一个新方案

  1. 增加安全关闭逻辑
  2. realtime集群增加一层Cache,用于重复数据过滤
  3. 改auto.commit.enable = true,并将zk同步时间稍微缩
  4. 定期记录partition的offset值,用于回滚

注:这些方式不一定是最通用最完美的方式,但在当时一定是适合我们需要的方式。

以下是安全关闭部分

  1. 启动时 RealtimeManager 为每个datasouce, 启动一个线程, 建立一个FireChief
  2. FireChief 持有RealtimePlumber 和Firehose
  3. FireChief消费数据逻辑
    plumber.startJob();
    while(notSafeCloseFlag && firehose.hasNext())
    定时 persistent
    plumber.finishJob();
  4. plumber.finishJob逻辑
    basePersistent 下面的datasource下的文件夹表示sink, 现在系统中一个小时一个Sink
    一个plumber 持有多个sink.
    Sink持有多个FireHydrant, 但只有一个是激活状态, 用于接收数据
  5. 一个FireHydrant保存在0, 1, 2 文件夹下。
    对应一个IncrementalIndex和一个Segment
    index 表示内存中的对象
    segment 表示持久化对象
    hasSwapped表示十分已经持久化
    swap操作: 建立segment, 将index设置成null
  6. stop逻辑, 见流程图
    RealtimeManager层次图:
    这里写图片描述
    stop关闭流程图:
    这里写图片描述

关于Cache层解决方式,很多人都提过有单点故障,但这个时候需要具体分析了,首先我们这里主要解决是conversion问题,就数量级来说conversion比impression、click要小太多了;其次Cache的conversion不是永久的(属于一个区间内),不会出现数据量过于膨胀;再次逻辑简单因为conversion只有主键存储及判断,不会负载很重。要而且经过了时间证明,运行了两年多零故障率。结构如下图:

这里写图片描述

后面两点,属于小调整,这样的好处:

一是重复数据量较小,在面对每天10多亿数据的流转过程中,不影响体验
二是回滚offset的范围可控

通过此方式,我们进行不断的调整、测试、验证和细节优化,最终做到了整体数据丢失率99.99%(由测试部门给出的),conversion数据丢失率是0的效果。并且也让人工干预的操作减少很多。

场景四:

数据量进来的速率高于消费的速率时,就会引发数据丢失,主要原因就是windowPeriod这个参数设置,假设segment是设置成1个小时,其windowPeriod设置成PT10m,当前时间是13点,那么就表示在13点10分之前(包含10分)还可以继续消费12点-13点之间的数据,大家应该知道在海量数据传输时,不可能在13点整点就能消费完成12点-13点之间的数据,所以druid在配置时,提供了一个缓冲区间,我们在整个运行过程中出现过两次,第一次是就是速率突然暴涨导致,第二次是因为数据的时间导致。

这样的场景在实际的运营过程中,不会常碰到,但不排除恶意攻击或者刷流量的情况。

一般解决方式:
第一:流控报警
第二:时间重置

流控主要是定时监控Kafka集群里的offset的差值,发现差值变大就要报警。这时的策略就是数据采集服务层降低写入速度(这部分最后也可以变成自动化方式进行),让realtime节点集群的消费得到缓解。然后快速定位到底是达到了当前消费极限还是其它问题。

时间重置通过读取windowPeriod的值,来进行换算,将此记录放到下一个时间段里,
举例说明:windowPeriod设置成10分钟,当前的click数据的时间是17:59:44,当前系统时间是18:10:23,这时满足18:10:23-17:59:44>10分钟,将对此click数据的时间重置成18:10:23,进入下一个时间段。
这种方式,特别适合自动化补数,提高效率。

以上就是数据本身的要求所带来的全部内容。

下一篇我将介绍realtime结合kafka在运行阶段会出现场景及解决方式。

你可能感兴趣的文章
hiho一下 第四十四周 题目1 : 博弈游戏·Nim游戏
查看>>
poj2299 Ultra-QuickSort(线段树计数问题)
查看>>
hdu4565 So Easy!(矩阵快速幂)
查看>>
poj2528 Mayor's posters(线段树,离散化)
查看>>
线段树多lazy-tag(两个)
查看>>
hdu4578(三个更新操作,三个求值操作)
查看>>
并查集(初级)小结
查看>>
Treap
查看>>
相似图片搜索——感知哈希算法
查看>>
编译原理 词法分析
查看>>
计算机系统结构 计算机指令集结构
查看>>
计算机系统结构 输入/输出系统
查看>>
信息安全技术及应用 常规加密技术
查看>>
02-线性结构1 两个有序链表序列的合并
查看>>
HDU 1080 DP LCS
查看>>
HDU 3308 线段树+区间合并
查看>>
ASP.NET 入手页面控件及事件触发
查看>>
HDU 4123 树状DP+RMQ
查看>>
vim配置文件(持续更新)
查看>>
Fedora 16下添加终端快捷键
查看>>