gogogo
管理员
管理员
  • UID25
  • 粉丝0
  • 关注0
  • 发帖数1377
阅读:444回复:0

Kafka 流数据 SQL 引擎 -- KSQL

楼主#
更多 发布于:2023-12-08 10:35


KSQL 是什么?

KSQL 是一个 Kafka 的 SQL 引擎,可以让我们在流数据上持续执行 SQL 查询,支持强大的流处理操作,包括聚合、连接、窗口、会话等等。。
例如,有一个用户点击流的 topic,和一个可持续更新的用户信息表,使用 KSQL 对点击流数据、用户表进行建模,并把二者连接起来,之后 KSQL 会持续查询这个 topic 的数据流,并放入表中。
KSQL 是开源的、分布式的,具有高可靠、可扩展、实时的特性。

KSQL是Apache Kafka的流式SQL引擎,它大大降低了流处理世界的门槛。
KSQL实现了非常有效的功能:使用数据领域中大多数社区已知的语义SQL实时读取,编写和转换数据!




KSQL 解决了什么问题?

KSQL 的主要目的是为了降低流处理的操作门槛,为 Kafka 提供了简单而完善的 SQL 交互接口
之前,为了使用流处理引擎,需要熟悉一些开发语言,例如 Java, C#, Python,Kafka 的流处理引擎作为 Kafka 项目的一部分,是一个 Java 库,需要使用者有熟练的 Java 技能
相对的,KSQL 只需要使用者熟悉 SQL 即可,这使得 Kafka Stream 能够进入更广阔的应用领域,例如商业分析,熟悉 SQL 的分析人员就可以操作,而不用一定是开发人员。

KSQL能做什么呢?
KSQL是开源的(Apache 2.0许可),并构建在Kafka的Streams API之上。这意味着它支持各种强大的流处理操作,包括过滤,转换,聚合,连接,窗口和会话。
通过这种方式,您可以实时检测异常和欺诈活动,监控基础架构和物联网设备,执行基于会话的用户活动分析,执行实时ETL等等。
从通用的角度来看,当数据流中需要动态地进行转换,集成和分析时,你应该使用KSQL。
实时监控和实时分析
KSQL的一个用途是定义实时计算的自定义业务级度量标准,您可以从中监视和提醒。例如,展示视频游戏特许经营权的并发在线玩家数量(“我们的玩家是否参与?最新游戏扩展是否增加了游戏时间?”)或报告电子商务网站的废弃购物车数量(“我们的在线商店的最新更新是否让客户更容易结账?”)
另一个用途是在KSQL中为您的业务应用程序定义正确性概念,然后检查它们是否在生产中运行时满足此要求
KSQL可以直接从原始事件流中定义适当的度量标准,无论这些是从数据库更新,应用程序,移动设备还是任何其他类型生成的:
在线数据集成和丰富
公司完成的大多数数据处理属于数据丰富领域:从几个数据库中获取数据,转换数据,将其连接在一起,并将其存储到键值存储,搜索索引,缓存或其他数据服务系统。
KSQL与Kafka连接器一起用于Oracle,MySQL,Elasticsearch,HDFS或S3等系统时,可以实现从批量数据集成到实时数据集成的转变。
如下面的KSQL查询所示,您可以使用流表连接来丰富包含存储在表中的元数据的数据流,或者在将流加载到另一个系统之前对个人身份信息(PII)进行简单过滤。
安全和异常检测
KSQL查询可以将事件流转换为数字时间序列聚合,这些聚合使用Kafka-Elastic连接器注入系统(如Elastic),然后在实时仪表板(如Grafana)中可视化。安全用例通常与监视和分析类似。在这里,您不是要监控应用程序行为或业务行为,而是在寻找欺诈,滥用,垃圾邮件,入侵或其他不良行为的模式。
KSQL提供了一种简单而复杂的实时方法来定义这些模式并查询实时流。
流和数据库
当然,KSQL的使用案例比我在这篇短篇文章中所展示的更多,例如监控车队(“未来几天卡车是否需要预测性维护?”)或分布式物联网设备和家庭自动化传感器(“为什么二楼的温度会上升?”),或者实时分析Oracle中的数据库更新。一些有创意的用户甚至使用KSQL 实时分析赛车遥测数据。
但是,让我们先从这些具体的例子后退一步。在我看来,更令人兴奋的是,通过将数据库从内向外转换,KSQL将流(kafka)和数据库(Oracle、MySQL和Friends)的世界结合在一起。在KSQL中,类似于Kafka的Streams API,有两个核心数据抽象:流和表。它们允许您以流或表格式处理数据。这一点很重要,因为在实践中,几乎每个想要实现的实时用例都需要流和表。




KSQL 的应用场景有哪些?



1. 实时监控 实时分析


CREATE TABLE error_counts AS SELECT error_code, count(*)FROM monitoring_stream WINDOW TUMBLING (SIZE 1 MINUTE) WHERE type = 'ERROR'
KSQL 可以让我们对应用产生的事件流自定义测量指标,如日志事件、数据库更新事件等等
例如在一个 web app 中,每当有新用户注册时都需要进行一些检查,如欢迎邮件是否发送了、一个新的用户记录是否创建了、信用卡是否绑定了……,这些点可能分布在多个服务中,这时可以使用 KSQL 对事件流进行统一的监控分析


2. 安全和异常检查

比如对于欺诈、入侵等非法行为,可以定义出检查模型,通过 KSQL 对实时数据流进行检测

CREATE STREAM possible_fraud AS SELECT card_number, count(*) FROM authorization_attempts WINDOW TUMBLING (SIZE 5 SECONDS) GROUP BY card_number HAVING count(*) > 3;
KSQL 可以把事件流转换成数值化的时间序列数据,然后通过 Kafka-Elastic connector 导入到 Elastic,并通过 Grafana UI 视图化的展示出来


KSQL 的核心概念



1. STREAM 流

stream 是一个无限的结构化数据序列,这个数据是不可修改的,新的数据可以进入流中,但流中的数据是不可以被修改和删除的
stream 可以从一个 kafka topic 中创建,或者从已存在的流或表中派生出来

CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) WITH (kafka_topic='pageviews', value_format=’JSON’);


2. TABLE 表

table 是一个流或者其他表的视图,是流中数据的一个集合,table 中的数据是可变的,可以插入,也可以修改、删除
table 同样可以从一个 kafka topic 中创建,或者从已存在的流或表中派生出来

CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid  VARCHAR) WITH (kafka_topic='users', value_format='DELIMITED');




KSQL server 进程用来执行请求,多个 KSQL server 构成一个集群,可以动态增加 KSQL server 的数量来提示处理能力
KSQL server 具有自动容错能力,如果一个失败,其他的会自动接替他的工作
KSQL 有一个命令行终端,输入的命令会通过 REST API 发送到集群,通过命令行,我们可以检查所有流和表、执行查询、查看请求的状态信息等等
大体上看,KSQL 的构成包括:

  1. Kafka 的 Streams API

  2. 分布式 SQL 引擎

  3. REST API


小结

KSQL 是 confluent 刚刚发布的,目前是开发预览版,很快会发布正式版。
KSQL 极大方便了 Kafka 中流数据的处理,虽然还不能在产品中使用,可以提前了解一下。



https://cloud.tencent.com/developer/news/383098
游客


返回顶部