博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink SQL 功能解密系列 —— 流计算“撤回(Retraction)”案例分析
阅读量:6451 次
发布时间:2019-06-23

本文共 5630 字,大约阅读时间需要 18 分钟。

什么是retraction(撤回)

通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理

方式。
首先来看下流场景下的一个词频统计列子。

没有retract会导致最终结果不正确↑:

retract发挥的作用

下面再分享两个双十一期间retract保证数据正确性的业务case:

case1: 菜鸟物流订单统计

同一个订单的商品在运输过程中,因为各种原因,物流公司是有可能从A变成B的。为了统计物流公司承担的订单数目,菜鸟团队使用blink计算的retraction机制进行变key汇总操作。

-- TT source_table 数据如下:order_id      tms_company0001           中通0002           中通0003           圆通-- SQL代码create view dwd_table as select    order_id,    StringLast(tms_company)from source_tablegroup by order_id;create view dws_table as select     tms_company,    count(distinct order_id) as order_cntfrom dwd_table group by tms_company此时结果为:tms_company  order_cnt中通          2圆通          1-----------------------之后又来了一条新数据 0001的订单 配送公司改成 圆通了。这时,第一层group by的会先向下游发送一条 (0001,中通)的撤回消息,第二层group by节点收到撤回消息后,会将这个节点 中通对应的 value减少1,并更新到结果表中;然后第一层的分桶统计逻辑向下游正常发送(0001,圆通)的正向消息,更新了圆通物流对应的订单数目,达到了最初的汇总目的。order_id      tms_company0001           中通0002           中通0003           圆通0001           圆通写入ADS结果会是(满足需求)tms_company  order_cnt中通          1圆通          2

case2: 天猫双十一购物车加购统计:

双11爆款清单与知名综艺IP“火星情报局”跨界合作,汪涵、撒贝宁、陶晶莹等大咖主持加盟,杭州、长沙两地联播,成功打造为“双11子IP”与“双11购物风向标”,树立电商内容综艺化、娱乐化创新典范,为长线模式探索打下基础。

首次深度联动线下场景,在银泰门店落地爆款清单超级大屏,商场人流截停率28%,用户互动时间占营业时间的40%。

选品模式创新,打造最全维度爆款清单:TOP2000性价比爆款+TOP100小黑盒推荐(新品清单)+TOP200买手天团推荐(人群/场景/地域 清单)

核心业务指标

  • 加购金额
  • 加购件数
  • 加购UV

业务计算逻辑

  • 来自TT的数据要进行去重;
  • 以投放场景和购物车维度进行分组,获取每个分组的最后一条(最新)数据;
  • 以投放场景和小时为维度进行分组,统计 加购金额,加购件数和加购UV 业务指标;

业务BlinkSQL代码

--Blink SQL--********************************************************************----Comment: 天猫双11官方爆款清单统计计算--********************************************************************--CREATE TABLE dwd_mkt_membercart_ri(    cart_id      BIGINT, -- '购物车id',    sku_id       BIGINT, -- '存放商品的skuId,无sku时,为0',    item_id      BIGINT, -- '外部id:商品id或者skuid',    quantity     BIGINT, -- '购买数量',    user_id      BIGINT, -- '用户id',    status       BIGINT, -- '状态1:正常-1:删除',    gmt_create   VARCHAR, -- '属性创建时间',    gmt_modified VARCHAR, -- '属性修改时间',    biz_id VARCHAR, -- 投放场景,    start_time VARCHAR, -- 投放开始时间    end_time VARCHAR, -- 投放结束时间    activity_price_time VARCHAR, -- 活动开始时间    price VARCHAR, -- 商品价格    dbsync_operation BIGINT -- 时间自动用于排序) WITH (    type='tt'    -- 其他信息省略);--groub by 方式重复,防止TT重发CREATE VIEW distinct_dwd_mkt_membercart_ri AS SELECT    cart_id,    sku_id,    item_id,    quantity,    user_id,    status,    gmt_create,    gmt_modified,    biz_id,     start_time,     end_time,     activity_price_time,    price,     dbsync_operationFROM    dwd_mkt_membercart_riGROUP BY        cart_id,    sku_id,    item_id,    quantity,    user_id,    status,    gmt_create,    gmt_modified,    biz_id,     start_time,     end_time,     activity_price_time,    price,     dbsync_operation;-- 每个投放和购物车数据的最后一条CREATE VIEW tmp_dwd_mkt_membercart_ri AS SELECT     biz_id as biz_id,    LAST_VALUE(user_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as user_id,    LAST_VALUE(item_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as item_id,     LAST_VALUE(sku_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as sku_id,    LAST_VALUE(start_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as start_time,    LAST_VALUE(end_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as end_time,    LAST_VALUE(activity_price_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as activity_price_time,    LAST_VALUE(price,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as price,    LAST_VALUE(quantity,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as quantity,    LAST_VALUE(status,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as status,    LAST_VALUE(gmt_modified,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_modified,    LAST_VALUE(gmt_create,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_create,    LAST_VALUE(dbsync_operation,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as dbsync_operationFROM distinct_dwd_mkt_membercart_riWHERE DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')=DATE_FORMAT(gmt_modified , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')GROUP BY cart_id,biz_id;--存储小时维度的计算结果CREATE TABLE result_mkt_membercart_ri_eh(    id VARCHAR,     data_time VARCHAR,      all_preheating_cart_cnt BIGINT, -- 预热期间的 加购件数    all_preheating_cart_alipay BIGINT,-- 预热期间的 加购金额    eh_all_preheating_cart_uv BIGINT,-- 预热期间的 加购UV    all_cart_cnt BIGINT, -- 投放期间的 加购件数    all_cart_alipay BIGINT, -- 投放期间的 加购金额    eh_all_cart_uv BIGINT, -- 投放期间的 加购UV    primary key(id,data_time)) WITH (    type = 'custom',     -- 其他信息省略    timeDiv='hour') ;--统计小时维度的 xx xx xx 业务指标INSERT INTO result_mkt_membercart_ri_eh SELECT     biz_id,    DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') data_time,     `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity else 0 end) as all_preheating_cart_cnt,    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity*CAST(price AS BIGINT) else 0 end) as all_preheating_cart_alipay,    `sum`((case when gmt_modified<=COALESCE(activity_price_time,end_time) then user_id end)) eh_all_preheating_cart_uv,    `sum`(quantity) as all_cart_cnt,    `sum`(quantity*CAST(price AS BIGINT)) as all_cart_alipay,    `count`(distinct user_id) eh_all_cart_uvFROM tmp_dwd_mkt_membercart_ri WHERE    status>0 GROUP BY  biz_id ,DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') ;

上面case2天猫业务场景里面的加购金额统计来说,当每个投放场景的购物车的数据发生变化时候,就意味着上面【CREATE VIEW tmp_dwd_mkt_membercart_ri 】中的LAST_VALUE发生变化,最外层的sum统计【INSERT INTO result_mkt_membercart_ri_eh 】就要将前一条的LAST_VALUE【VALUE-1】撤回,用update的新LAST_VALUE【VALUE-2】进行求和统计,这样blink就需要有一种机制将VALUE-1进行撤回,利用【VALUE-2】进行计算,这种机制我们称为retract。

retract 实现原理参考

转载地址:http://vbwzo.baihongyu.com/

你可能感兴趣的文章
【转】正则基础之——神奇的转义
查看>>
团队项目测试报告与用户反馈
查看>>
MyBatis(1)——快速入门
查看>>
对软件工程课程的期望
查看>>
CPU高问题排查
查看>>
Mysql中文字符串提取datetime
查看>>
CentOS访问Windows共享文件夹的方法
查看>>
IOS 与ANDROID框架及应用开发模式对比一
查看>>
由中序遍历和后序遍历求前序遍历
查看>>
JQUERY Uploadify 3.1 C#使用案例
查看>>
coursera 北京大学 程序设计与算法 专项课程 完美覆盖
查看>>
firewall 端口转发
查看>>
wndows make images
查看>>
FS系统开发设计(思维导图)
查看>>
我学习参考的网址
查看>>
DEDE自带的采集功能,标题太短的解决方法
查看>>
easyui的combotree以及tree,c#后台异步加载的详细介绍
查看>>
1、串(字符串)以及串的模式匹配算法
查看>>
[Processing]点到线段的最小距离
查看>>
考研随笔2
查看>>