最常用的五种流式ETL模式!
时间:2025-11-04 00:10:43 出处:域名阅读(143)
1970 年代的最常种流许多计算概念已经过时,但ETL (Extract-Transform-Load)及其最近的模式 anagram shuffle ELT并非如此,它在目的最常种流地与飞行中操纵数据。ETL 和 ELT 传统上是模式计划的批处理操作,但随着对始终在线、最常种流始终最新的模式数据服务的需求成为常态,在数据流上操作的最常种流实时 ELT 是许多组织的目标——如果不是现实的话。
在实际使用中,模式ETL 中的最常种流“T”代表由原始操作组装而成的各种模式。在本文中,模式我们将探索这些操作并查看如何将它们实现为 SQL 语句的最常种流示例。
使用 SQL 语句进行转换?模式是的!SQL 将声明性语言的最常种流强大和简洁性与任何使用代码或数据的人的普遍技能相结合。与您可能用作替代的企商汇模式几乎任何编程语言不同,SQL 的最常种流普及要归功于将近 50 年的寿命——计算行业中的几乎每个人都曾在某个时候使用过它。SQL 的强大功能和普遍性意味着它无处不在,甚至在构建最新开发人员技术和服务的公司中也是如此。当通过函数增强时,SQL 变得更加强大。
管道模式
大多数 ETL 管道都适合一种或多种模式。Decodable 的连接 - 流 - 管道抽象意味着您可以选择将所有内容构建到单个管道中,或者根据需要将复杂的转换分解为由流、跨团队、区域和用例连接的可重用管道网络。
1:过滤器
过滤器从流中删除不需要的记录,删除与 SQL where子句中的“规则”不匹配的记录。过滤器通常用于抑制敏感记录以确保合规性,或减少目标系统上的处理负载或存储需求。
复制1-- Filter only records pertaining to the application 2 3insert into application_events 4 5select * from http_eventswhere hostname = app.decodable.co 6 7 8 9-- Filter only records that modify the inventory1011insert into inventory_updates1213select * from http_eventswhere hostname = api.mycompany.com and1415path like /v1/inventory% and16 method in ( POST, PUT, DELETE, PATCH )1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16. 2:路线
Route 模式从一个或多个输入流创建多个输出流,根据一组规则将记录定向到正确的目的地。此模式实际上由多个过滤器组成,b2b供应网它们都可以查看每个输入记录,但每个过滤器仅传输与该特定目的地的规则匹配的那些记录。
复制1-- Route security-related HTTP events 2 3insert into security_events 4 5select * from http_eventswhere path like /login% or 6 7path like /billing/cc% 8-- Route app-related HTTP events 910insert into application_events1112select * from http_eventswhere hostname = app.decodable.co1314-- Route requests to Customer Success if it looks like the user needs help1516insert into cs_alerts1718select * from http_events1920where response_code between 500 and 599 or -- any server failure2122( path = /signup and response_code != 200 ) or -- failed to sign up for any reason1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22. 3:变换
转换管道通过修改输入记录来创建输出记录。通常这将导致 1:1 传输,但在某些情况下,输出来自多个输入记录,因此可能存在 1:many 关系。在这里,我们将调用三个专门的转换:
变换:提取

解析输入记录,从输入记录中提取数据并将其用作丰富派生输出记录的基础。
复制1-- Parse timestamp and action 2 3insert into user_events 4 5select 6 7to_date(fields[ts], YYYY-MM-DDTHH:MI:SS) as ts,
8 fields[user_id] as user_id,
9 fields[path] as path, case fields[method] when GET then read10 when POST, PUT then modify11 when DELETE then delete12 end as actionfrom ( select13 grok(
14 body, \[${ISO8661_DATETIME:ts} ${DATA:method} "${PATH:path}" uid:${DATA:user_id}15 ) as fields from http_event16)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.变换:归一化

传入的数据记录通常需要针对模式进行规范化,以便目标系统处理它们。缺少的字段可能需要填充默认值,可能需要删除可选字段,并强制执行数据类型。
复制1-- Cleanse incoming data for downstream processes
2
3insert into sensor_readings
4
5select
6
7cast(ifnull(sensor_id, 0) as bigint) as sensor_id, lower(trim(name)) as name, cast(`value` as bigint) as reading
8
9from raw_sensor_readings1.2.3.4.5.6.7.8.9.转换:匿名化

在目标系统不需要信息来完成处理的情况下,匿名管道只是出于合规、监管或隐私原因而消除了敏感字段。
复制1-- Anonymize SSNs and zip codes2insert into user_events_masked3select4user_id,
5 username, overlay(ssn placing * from 1 for 12) as ssn, substring(zip_code from 1 for 2) as zip_code_1,
6action7from user_events1.2.3.4.5.6.7. 4:聚合
聚合管道通常使用 SQL 窗口函数将传入记录分组到存储桶中(通常基于时间),在这些存储桶上执行聚合操作。Count、Min、Max、Avg、WordPress模板Sum 是典型的运算符,但还有很多。
.
2 3insert into site_activity 4 5select 6 7window_start,
8 window_end,
9 path,
10status, count(1) as `count`1112from table(
1314tumble( table http_events, descriptor(_time),
15 interval 10 seconds16)
17)group by window_start, window_end, path, status1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17. 5:触发
我们的最终模式是触发器。与几乎所有其他模式不同,触发器输出记录可能与输入记录的模式几乎没有重叠,因为它表明已在一个或多个输入记录上检测到一组条件,并作为结果输出警报。输出模式可以表示检测到的条件、要采取的行动或两者兼而有之。
复制1-- Build hourly usage data for a Stripe integration on the output stream 2 3insert into stripe_product_usage 4 5select 6 7window_start as _time,
8 customer_id, abcd1234 as price_id sum(bytes_sent) / 1024 / 1024 as mb_sentfrom table(
9 tumble( table document_downloads, descriptor(_time),
10 interval 1 hour11)
12)group by window_start, customer_idhaving mb_sent > 10241.2.3.4.5.6.7.8.9.10.11.12.