Skip to main content

One post tagged with "Engineering"

View All Tags

image.png

❤️ 友情提示:代码演进较快,请注意文档的时效性哦!

引言

Databend 将存储引擎抽象成一个名为 Table 的接口,源码位于 query/catalog/src/table.rs

Table 接口定义了 readappendalteroptimizetruncate 以及 recluster 等方法,负责数据的读写和变更。解释器(interpreter)通过调用 Table trait 的方法生成物理执行的 pipeline

通过实现 Table 接口的方法,可以定义 Databend 的存储引擎,不同的实现对应不同的引擎。

Storage 主要关注 Table 接口的具体实现,涉及表的元信息,索引信息的管理,以及与底层 IO 的交互。

image.png

join order 的重要性

Join order 是指在执行SQL查询时,决定多个表进行 join 的顺序。它是数据库查询优化的一个重要方面,对查询性能和效率有着重要的影响, 不同的 join order 对性能可能有数量级的影响。

优化器优化 join order 的核心流程

  1. join plan 枚举
  2. 根据统计信息估算结果的大小 (cardinality estimation)
  3. 把 2 中的结果带入到代价模型计算枚举 plan 的 代价 (cost model)

本文中我们只关心第一步 join plan enumeration, 也就是 join reorder 算法。

join reorder 算法

贪心启发式算法

当需要 join 的表都数量过多时(通常超过10个),适合用贪心算法,其优势在于能够较快的找到还不错的 join order.

核心思想:从一张表拓展到 N 张表,每次选出使当前代价最小的一张表,加入到 join tree中,构建出 left-deep tree.

贪心算法也有很多拓展,主要拓展点是围绕如果 避免局部最优以及产生 bushy tree

枚举算法(top-down & bottom-up)

主流的两种

  • 基于规则变换的 Top-down 枚举,可以结合 Top-down cascasde 框架通过记忆化的方式来实现
  • 基于 DP 的 bottom-up 枚举, 典型代表 DPhyp 算法,其优势在于可以高效的产生 bushy tree

一般情况下,数据库系统会把贪心和枚举有效的结合,从而对任意数量的表 join 都能够在合理的时间内得到有效的 join order

Databend join reorder 现状

databend 优化器基于 Rule 进行优化,每条 Rule 通过 pattern 来匹配 plan 中的 sub-tree。主要分为两个阶段,启发式优化和基于 cascades 框架的代价优化,两个阶段共用一套 Rule。

在启发式阶段优化结束后,会对优化后的 plan 执行 DPhyp 算法来尝试得到最优的 join order, 如果 Dphyp 优化失败,会在 CBO 中找到最优的 left-deep tree. (CBO 中不尝试进行 bushy tree 优化,因为如果 Dphyp 已经优化失败,那么尝试在 CBO 中进行 bushy tree 优化,搜索空间很有可能爆炸,如 tpcds 64)

Databend 目前没有支持贪心算法 (下一阶段的 roadmap 会做相关支持来处理极端情况下的 case),首先会利用 dphyp 算法来得到最优解,如果 dphyp 失败(query 中存在不适合 dphyper 算法的 pattern),会在 cascades 框架中利用基于规则变换的 Top-down 枚举得到 left-deep tree. 如果表的数量过多,如超过十个,会在 dphyp 算法中放弃部分搜索空间来做 tradeoff。

Dphyp 的核心定义及算法

hypergraph

一个超图是一个由节点集合 V 和超边集合 E 组成的二元组 H = (V,E),其中:

  1. V是非空节点集合。
  2. E是超边集合,超边是V的非空子集(u ⊂ V)和(v ⊂ V)的无序对(u,v),并且满足额外条件 u∩v = ∅。

有了超图就可以描述多节点之间连接。

image.png

对于上图,它们的 join condition 是 R1.a + R2.b + R3.c = R4.d + R5.e + R6.f 所以 hyperedge 就是 {R1, R2, R3} — {R4, R5, R6}

csg-cmp-pair

csg: connected-subgraph (连通子图)

cmp: connected-complement (连通互补对)

如果两个 csg 之间没有交集,且存在超边连接,其中一个就是另一个的 cmp, 二者构成 csg-cmp-pair. 算法的核心就是通过递归无重复的枚举出所有的 csg-cmp-pair, 找出代价最小的包含所有点的 csg-cmp-pair.

algorithm

算法核心:hypergraph 中的节点是有序的,节点从后往前迭代(递减),每个节点只考虑其自身及其之后(序列号更大)的节点,找到可能的连通子图及其连通互补图,构成 csg-cmp-pair, 计算并更新出其代价,当迭代到最小的节点后,会得到包含所有点的 csg-cmp-pair, 算法结束。

算法流程

  1. EmitCsg: 寻找 {v} 的互补连通子图
    • a. 如果找到, EmitCsgCmp
    • b. EnumerateCmpRec:扩展互补连通子图
      • 如果扩展后的互补连通子图可以与 {v} 形成 csg-cmp-pair, 则 EmitCsgCmp
      • 回到 b,继续扩展
  2. EnumerateCsgRec: 扩展 {v}
    • a. 得到扩展后的 {v’}, 对 {v’} 执行 1
    • b. 回到2,继续扩展

核心代码和数据结构定义可参考:

(https://github.com/datafuselabs/databend/blob/main/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs)

image.png

Kafka Connect 介绍

Kafka Connect 是一个用于在 Apache Kafka® 和其他数据系统之间可扩展且可靠地流式传输数据的工具。通过将数据移入和移出 Kafka 进行标准化,使得快速定义连接器以在 Kafka 中传输大型数据集变得简单,可以更轻松地构建大规模的实时数据管道。

image.png

我们使用 Kafka Connector 读取或写入外部系统、管理数据流以及扩展系统,所有这些都无需开发新代码。Kafka Connect 管理与其他系统连接时的所有常见问题(Schema 管理、容错、并行性、延迟、投递语义等),每个 Connector 只关注如何在目标系统和 Kafka 之间复制数据。

Kafka 连接器通常用来构建 data pipeline,一般有两种使用场景:

  • 开始和结束的端点: 例如,将 Kafka 中的数据导出到 Databend 数据库,或者把 Mysql 数据库中的数据导入 Kafka 中。

  • 数据传输的中间媒介: 例如,为了把海量的日志数据存储到 Elasticsearch 中,可以先把这些日志数据传输到 Kafka 中,然后再从 Kafka 中将这些数据导入到 Elasticsearch 中进行存储。Kafka 连接器可以作为数据管道各个阶段的缓冲区,将消费者程序和生产者程序有效地进行解耦。

Kafka Connect 分为两种:

  • Source Connect: 负责将数据导入 Kafka。
  • Sink Connect: 负责将数据从 Kafka 系统中导出到目标表。

image.png

Databend Kafka Connect

Kafka 目前在 Confluent Hub 上提供了上百种 Connector,比如 Elasticsearch Service Sink Connector, Amazon Sink Connector, HDFS Sink 等,用户可以使用这些 Connector 以 Kafka 为中心构建任意系统之间的数据管道。现在我们也为 Databend 提供了 Kafka Connect Sink Plugin,这篇文章我们将会介绍如何使用 MySQL JDBC Source Connector 和 Databend Sink Connector 构建实时的数据同步管道。

image.png

启动 Kafka Connect

本文假定操作的机器上已经安装 Apache Kafka,如果用户还没有安装,可以参考 Kafka quickstart 进行安装。

Kafka Connect 目前支持两种执行模式:Standalone 模式和分布式模式。

启动模式

Standalone 模式

在 Standalone 模式下,所有的工作都在单个进程中完成。这种模式更容易配置以及入门,但不能充分利用 Kafka Connect 的某些重要功能,例如,容错。我们可以使用如下命令启动 Standalone 进程:

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

第一个参数 config/connect-standalone.properties 是 worker 的配置。这其中包括 Kafka 连接参数、序列化格式以及提交 Offset 的频率等配置:

bootstrap.servers=localhost:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

后面的配置是指定要启动的 Connector 的参数。上述提供的默认配置适用于使用 config/server.properties 提供的默认配置运行的本地集群。如果使用不同配置或者在生产部署,那就需要对默认配置做调整。但无论怎样,所有 Worker(独立的和分布式的)都需要一些配置:

  • bootstrap.servers: 该参数列出了将要与 Connect 协同工作的 broker 服务器,Connector 将会向这些 broker 写入数据或者从它们那里读取数据。你不需要指定集群的所有 broker,但是建议至少指定 3 个。

  • key.converter 和 value.converter: 分别指定了消息键和消息值所使用的的转换器,用于在 Kafka Connect 格式和写入 Kafka 的序列化格式之间进行转换。这控制了写入 Kafka 或从 Kafka 读取的消息中键和值的格式。由于这与 Connector 没有任何关系,因此任何 Connector 可以与任何序列化格式一起使用。默认使用 Kafka 提供的 JSONConverter。有些转换器还包含了特定的配置参数。例如,通过将 key.converter.schemas.enable 设置成 true 或者 false 来指定 JSON 消息是否包含 schema。

  • offset.storage.file.filename: 用于存储 Offset 数据的文件。

这些配置参数可以让 Kafka Connect 的生产者和消费者访问配置、Offset 和状态 Topic。配置 Kafka Source 任务使用的生产者和 Kafka Sink 任务使用的消费者,可以使用相同的参数,但需要分别加上‘producer.’和‘consumer.’前缀。bootstrap.servers 是唯一不需要添加前缀的 Kafka 客户端参数。

distributed 模式

分布式模式可以自动平衡工作负载,并可以动态扩展(或缩减)以及提供容错。分布式模式的执行与 Standalone 模式非常相似:

bin/connect-distributed.sh config/connect-distributed.properties

不同之处在于启动的脚本以及配置参数。在分布式模式下,使用 connect-distributed.sh 来代替 connect-standalone.sh。第一个 worker 配置参数使用的是 config/connect-distributed.properties 配置文件:

bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
offset.flush.interval.ms=10000

Kafka Connect 将 Offset、配置以及任务状态存储在 Kafka Topic 中。建议手动创建 Offset、配置和状态的 Topic,以达到所需的分区数和复制因子。如果在启动 Kafka Connect 时尚未创建 Topic,将使用默认分区数和复制因子来自动创建 Topic,这可能不适合我们的应用。在启动集群之前配置如下参数至关重要:

  • group.id: Connect 集群的唯一名称,默认为 connect-cluster。具有相同 group id 的 worker 属于同一个 Connect 集群。需要注意的是这不能与消费者组 ID 冲突。

  • config.storage.topic: 用于存储 Connector 和任务配置的 Topic,默认为 connect-configs。需要注意的是这是一个只有一个分区、高度复制、压缩的 Topic。我们可能需要手动创建 Topic 以确保配置的正确,因为自动创建的 Topic 可能有多个分区或自动配置为删除而不是压缩。

  • offset.storage.topic: 用于存储 Offset 的 Topic,默认为 connect-offsets。这个 Topic 可以有多个分区。

  • status.storage.topic: 用于存储状态的 Topic,默认为 connect-status。这个 Topic 可以有多个分区。

需要注意的是在分布式模式下需要通过 rest api 来管理 Connector。

比如:

GET /connectors – 返回所有正在运行的connector名。
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息。
GET /connectors/{name}/config – 获取指定connector的配置信息。
PUT /connectors/{name}/config – 更新指定connector的配置信息。

配置 Connector

MySQL Source Connector

  1. 安装 MySQL Source Connector Plugin

这里我们使用 Confluent 提供的 JDBC Source Connector。

从 Confluent hub 下载 Kafka Connect JDBC 插件并将 zip 文件解压到 /path/kafka/libs 目录下。

  1. 安装 MySQL JDBC Driver

因为 Connector 需要与数据库进行通信,所以还需要 JDBC 驱动程序。JDBC Connector 插件也没有内置 MySQL 驱动程序,需要我们单独下载驱动程序。MySQL 为许多平台提供了 JDBC 驱动程序。选择 Platform Independent 选项,然后下载压缩的 TAR 文件。该文件包含 JAR 文件和源代码。将此 tar.gz 文件的内容解压到一个临时目录。将 jar 文件(例如,mysql-connector-java-8.0.17.jar),并且仅将此 JAR 文件复制到与 kafka-connect-jdbc jar 文件相同的 libs 目录下:

cp mysql-connector-j-8.0.32.jar /opt/homebrew/Cellar/kafka/3.4.0/libexec/libs/
  1. 配置 MySQL Connector

/path/kafka/config 下创建 mysql.properties 配置文件,并使用下面的配置:

name=test-source-mysql-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
#mode=timestamp+incrementing
mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
#timestamp.column.name=tms
topics=test_kafka

针对配置我们这里重点介绍 modeincrementing.column.name ,和 timestamp.column.name 几个字段。Kafka Connect MySQL JDBC Source 提供了三种增量同步模式:

  • incrementing
  • timestamp
  • timestamp+incrementing
  1. 在 incrementing 模式下,每次都是根据 incrementing.column.name 参数指定的列,查询大于自上次拉取的最大 id:
SELECT * FROM mydb.test_kafka
WHERE id > ?
ORDER BY id ASC

这种模式的缺点是无法捕获行上更新操作(例如,UPDATE、DELETE)的变更,因为无法增大该行的 id。

  1. timestamp 模式基于表上时间戳列来检测是否是新行或者修改的行。该列最好是随着每次写入而更新,并且值是单调递增的。需要使用 timestamp.column.name 参数指定时间戳列。

需要注意的是时间戳列在数据表中不能设置为 Nullable.

在 timestamp 模式下,每次都是根据 timestamp.column.name 参数指定的列,查询大于自上次拉取成功的 gmt_modified:

SELECT * FROM mydb.test_kafka
WHERE tms > ? AND tms < ?
ORDER BY tms ASC

这种模式可以捕获行上 UPDATE 变更,缺点是可能造成数据的丢失。由于时间戳列不是唯一列字段,可能存在相同时间戳的两列或者多列,假设在导入第二条的过程中发生了崩溃,在恢复重新导入时,拥有相同时间戳的第二条以及后面几条数据都会丢失。这是因为第一条导入成功后,对应的时间戳会被记录已成功消费,恢复后会从大于该时间戳的记录开始同步。此外,也需要确保时间戳列是随着时间递增的,如果人为的修改时间戳列小于当前同步成功的最大时间戳,也会导致该变更不能同步。

  1. 仅使用 incrementing 或 timestamp 模式都存在缺陷。将 timestamp 和 incrementing 一起使用,可以充分利用 incrementing 模式不丢失数据的优点以及 timestamp 模式捕获更新操作变更的优点。需要使用 incrementing.column.name 参数指定严格递增列、使用 timestamp.column.name 参数指定时间戳列。
SELECT * FROM mydb.test_kafka
WHERE tms < ?
AND ((tms = ? AND id > ?) OR tms > ?)
ORDER BY tms, id ASC

由于 MySQL JDBC Source Connector 是基于 query-based 的数据获取方式,使用 SELECT 查询来检索数据,并没有复杂的机制来检测已删除的行,所以不支持 DELETE 操作。可以使用基于 log-based 的 [Kafka Connect Debezium]。

后面的演示中会分别演示上述模式的效果。更多的配置参数可以参考 MySQL Source Configs

Databend Kafka Connector

  1. 安装 OR 编译 Databend Kafka Connector

可以从源码编译得到 jar 或者从 release 直接下载。

git clone https://github.com/databendcloud/databend-kafka-connect.git & cd databend-kafka-connect
mvn -Passembly -Dmaven.test.skip package

databend-kafka-connect.jar 拷贝至 /path/kafka/libs 目录下。

  1. 安装 Databend JDBC Driver

Maven Central 下载最新的 Databend JDBC 并拷贝至 /path/kafka/libs 目录下。

  1. 配置 Databend Kafka Connector

/path/kafka/config 下创建 mysql.properties 配置文件,并使用下面的配置:

name=databend
connector.class=com.databend.kafka.connect.DatabendSinkConnector

connection.url=jdbc:databend://localhost:8000
connection.user=databend
connection.password=databend
connection.attempts=5
connection.backoff.ms=10000
connection.database=default

table.name.format=default.${topic}
max.retries=10
batch.size=1
auto.create=true
auto.evolve=true
insert.mode=upsert
pk.mode=record_value
pk.fields=id
topics=test_kafka
errors.tolerance=all

auto.createauto.evolve 设置成 true 后会自动建表并在源表结构发生变化时同步到目标表。关于更多配置参数的介绍可以参考 Databend Kafka Connect Properties

测试 Databend Kafka Connect

准备各个组件

  1. 启动 MySQL
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
  1. 启动 Databend
version: '3'
services:
databend:
image: datafuselabs/databend
volumes:
- /Users/hanshanjie/databend/local-test/databend/databend-query.toml:/etc/databend/query.toml
environment:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
MINIO_ENABLED: 'true'
ports:
- '8000:8000'
- '9000:9000'
- '3307:3307'
- '8124:8124'
  1. 以 standalone 模式启动 Kafka Connect,并加载 MySQL Source Connector 和 Databend Sink Connector:
./bin/connect-standalone.sh config/connect-standalone.properties config/databend.properties config/mysql.properties
[2023-09-06 17:39:23,128] WARN [databend|task-0] These configurations '[metrics.context.connect.kafka.cluster.id]' were supplied but are not used yet. (org.apache.kafka.clients.consumer.ConsumerConfig:385)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka version: 3.4.0 (org.apache.kafka.common.utils.AppInfoParser:119)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka commitId: 2e1947d240607d53 (org.apache.kafka.common.utils.AppInfoParser:120)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka startTimeMs: 1693993163128 (org.apache.kafka.common.utils.AppInfoParser:121)
[2023-09-06 17:39:23,148] INFO Created connector databend (org.apache.kafka.connect.cli.ConnectStandalone:113)
[2023-09-06 17:39:23,148] INFO [databend|task-0] [Consumer clientId=connector-consumer-databend-0, groupId=connect-databend] Subscribed to topic(s): test_kafka (org.apache.kafka.clients.consumer.KafkaConsumer:969)
[2023-09-06 17:39:23,150] INFO [databend|task-0] Starting Databend Sink task (com.databend.kafka.connect.sink.DatabendSinkConfig:33)
[2023-09-06 17:39:23,150] INFO [databend|task-0] DatabendSinkConfig values:...

Insert

Insert 模式下我们需要使用如下的 MySQL Connector 配置:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
#mode=timestamp+incrementing
mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
#timestamp.column.name=tms
topics=test_kafka

在 MySQL 中创建数据库 mydb 和表 test_kafka:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE test_kafka (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE test_kafka AUTO_INCREMENT = 10;

在插入数据之前,databend-kafka-connect 并不会收到 event 进行建表和数据写入。

插入数据:

INSERT INTO test_kafka VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

源表端插入数据后,

Databend 目标端的表就新建出来了:

image.png

同时数据也会成功插入:

image.png

Support DDL

我们在配置文件中 auto.evolve=true,所以在源表结构发生变化的时候,会将 DDL 同步至目标表。这里我们正好需要将 MySQL Source Connector 的模式从 incrementing 改成 timestamp+incrementing,需要新增一个 timestamp 字段并打开 timestamp.column.name=tms 配置。我们在原表中执行:

alter table test_kafka add column tms timestamp;

并插入一条数据:

insert into test_kafka values(20,"new data","from kafka",now());

到目标表中查看:

image.png

发现 tms 字段已经同步至 Databend table,并且该条数据也已经插入成功:

image.png

Upsert

修改 MySQL Connector 的配置为:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
mode=timestamp+incrementing
#mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
timestamp.column.name=tms
topics=test_kafka

主要是将 mode 改为 timestamp+incrementing并添加 timestamp.column.name 字段。

重启 Kafka Connect。

在源表中更新一条数据:

update test_kafka set name="update from kafka test" where id=20;

到目标表中可以看到更新的数据:

image.png

总结

通过上面的内容可以看到 Databend Kafka Connect 具有以下特性:

  • Table 和 Column 支持自动创建: auto.createauto-evolve 的配置支持下,可以自动创建 Table 和 Column,Table name 是基于 Kafka topic name 创建的;

  • Kafka Shemas 支持: Connector 支持 Avro、JSON Schema 和 Protobuf 输入数据格式。必须启用 Schema Registry 才能使用基于 Schema Registry 的格式;

  • 多个写入模式: Connector 支持 insertupsert 写入模式;

  • 多任务支持: 在 Kafka Connect 的能力下,Connector 支持运行一个或多个任务。增加任务的数量可以提高系统性能;

  • 高可用: 分布式模式下可以自动平衡工作负载,并可以动态扩展(或缩减)以及提供容错能力。

同时,Databend Kafka Connect 也能够使用原生 Connect 支持的配置,更多配置参考 Kafka Connect Sink Configuration Properties for Confluent Platform

image.png

对于 Databend 这样复杂的数据库服务端程序,往往需要支持大量的可配置选项,以帮助运维人员根据实际使用需要管理和调优系统。

Databend 目前支持三种配置方式:命令行、环境变量和配置文件,优先级依次递减。

  • 一般情况下,推荐使用配置文件来记录和管理各种配置。
  • 对于 K8S 集群,为了灵活变更部分配置(比如,特性开关),使用环境变量可能是更优雅的形式。
  • 命令行则用于调整本地环境下的少数冲突配置。

Databend Query 中的映射

对于 databend-query ,不管是什么形式的配置,其配置选项几乎可以看作是代码的扁平化树形映射,即基本符合代码中「配置域」+「配置项」的逻辑。

  • 环境变量和配置文件中,利用 serfig 将代码嵌套展开,使用 _ 做为分隔符。
  • 命令行中稍有不同:一方面,分隔符使用 -;另一方面,部分命令行选项的名称中没有绑定配置域。

为了更好理解这里的映射关系,我们可以深入到具体一项配置,下面将围绕 admin_api_address 这个配置项展开。

  • 在环境变量上,需要使用 QUERY_ADMIN_API_ADDRESSQUERY 表征这个配置所处的域,而 ADMIN_API_ADDRESS 是具体的配置项。
  • 在配置文件中,通常是使用 toml 来进行配置。 [query] 表征配置所处的域,admin_api_address 为具体的配置项。
[query]
...
# Databend Query http address.
# For admin RESET API.
admin_api_address = "0.0.0.0:8081"
...
  • 命令行中需要使用 --admin-api-address 进行配置,这一项没有绑定「配置域」。如果是配置 --storage-s3-access-key-id ,那么「storage」+ 「s3」构成配置域,「access-key-id」是具体的配置项。

在了解如何对 admin_api_address 进行配置后,让我们进入到配置相关的代码,进一步查看映射关系的代码形式(位于 src/query/config/src/config.rs)。

pub struct Config {
...

// Query engine config.
#[clap(flatten)]
pub query: QueryConfig,

...
}

/// Query config group.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)]
#[serde(default, deny_unknown_fields)]
pub struct QueryConfig {
...

#[clap(long, default_value = "127.0.0.1:8080")]
pub admin_api_address: String,

...
}

因为代码中使用了嵌套的层级结构,最上层是 Config,而 admin_api_addresspub query: QueryConfig 中的一个配置项,经过 serfig 处理后,需要使用 QUERY 或者 [query] 表征其所处的域,配置项就还是 admin_api_address

而命令行中具体的配置项名称和默认值会受到 #[clap(long = "<long-name>", default_value = "<value>")] 控制),clap 会接管配置:

  • admin_api_address 就变成了 --admin-api-address
  • --storage-s3-access-key-id 而言,其实际的代码层级是 Config -> StorageConfig -> S3StorageConfig -> access_key_id,字段之上有标注 #[clap(long = "storage-s3-access-key-id", default_value_t)] ,所以需要使用 --storage-s3-access-key-id 进行配置。

Databend Meta 中的映射

databend-meta 的配置文件和命令行逻辑与 databend-query 是基本一致的。但是环境变量是通过 serfig 内置的 serde-env 自行定义的映射关系(但同样可以尝试按「配置域」+「配置项」进行理解)。

同样具体到单独的某项配置来看一下,这里以 log_dir 为例。

  • 在环境变量上,需要使用 METASRV_LOG_DIRMETASRV 表征这个配置所处的域,而 LOG_DIR 是具体的配置项。
  • 而在配置文件中,这一配置项作用于全局,只需要:
log_dir                 = "./.databend/logs1"
  • 在命令行中当然也直接 --log-dir 进行配置。

让我们通过代码来解构其映射,代码位于 src/meta/service/src/configs/outer_v0.rs

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Parser)]
#[clap(about, version = &**METASRV_COMMIT_VERSION, author)]
#[serde(default)]
pub struct Config {
...
/// Log file dir
#[clap(long = "log-dir", default_value = "./.databend/logs")]
pub log_dir: String,
...
}

配置文件和命令行参数相关的配置项是由 Config 结构体管理的,逻辑与 databend-query 一致,就不再赘述。

而环境变量的配置项是由 ConfigViaEnv 结构体进行处理的,如下:

/// #[serde(flatten)] doesn't work correctly for env.
/// We should work around it by flatten them manually.
/// We are seeking for better solutions.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ConfigViaEnv {
...
pub metasrv_log_dir: String,
...
}

Config 之间的映射关系位于 impl From<Config> for ConfigViaEnvimpl Into<Config> for ConfigViaEnv 这两个部分。对于 metasrv_log_dir 而言,就是映射到前面的 log_dir 字段。

由「3306π」社区主办,「Databend」参与协办的「数据库朋友圈」活动于 9 月 16 日在北京360大厦成功举办!该活动汇集了数据库领域的资深专家和企业家,共同探讨数据库技术变革。

下午,Databend Labs 联合创始人张雁飞作为「Serverless 数仓技术与挑战」专题的演讲嘉宾进行了分享。

image.png

主题: 「Serverless 数仓技术与挑战」

演讲嘉宾: 张雁飞

嘉宾介绍: Databend Labs 联合创始人。前青云数据库团队负责人、开源 Databend 项目主要负责人。

演讲大纲: 传统数仓在扩展性、成本和管理等方面具有局限性。在本次分享中,我们将介绍一种新型的 Serverless 数仓技术,这种技术不仅能够解决传统数仓的痛点,还能显著提升性能并降低成本。此外,我们还将讨论 Serverless 数仓所面临的技术挑战。

  • 传统数仓的局限性
  • 理想的 Serverless 数仓架构
  • 如何实现 Serverless 数仓以及有哪些挑战

以下为本次演讲的精彩内容:

当今(2023)大数据分析新问题

大数据分析面临的新问题

  • 近 5 年生产了 ~90% 数据

    • image.png
    •   根据 IDC 的统计和预测,近 5 年来产生了大约 90% 的数据。这里用的单位是 zttabytes(ZB),1024PB = 1EB,1024EB = 1ZB 是一个非常庞大的数字。过去的大数据架构难以适应当下的数据规模,亟需变更,怎么样才能做到弹性和 Serverless 拓展,从而匹配业务增长?
  • 计算和存储成本高昂

    • image.png
    •   在企业的IT基础设施中,云厂商提供的计算和存储服务导致了高昂的成本。经测算,如果为 EC2 实例创建总容量为 500TB 的 SSD(GP2)存储,每个月在 EBS 服务上将会花费超过 7 万 5 千美元。如何才能在保证低廉成本的同时满足业务性能需求,提供经济、高效能的大数据架构?
  • 大数据平台越来越复杂

    • image.png
    •   上图是知名投资机构 a16z 绘制的统一数据基础设施架构全景图,不难看出,庞大的数据量和复杂的数据需求,导致大数据平台也变得日益复杂,需要数十种工具紧密协作,对于产品生态愈发严苛的要求。如何无缝与其他工具进行集成,并且复用现有基础设施?

大数据架构,能否完美实现

上述这些问题,为大数据架构提出了新的要求,特别是在以下几个维度上,能否做到“完美”实现

  • 存储成本:极致低廉
  • 计算控制:极致精细,支持算子在 Lambda 函数中运行
  • 集群控制:极致弹性,按需伸缩、启停
  • 架构特点:all-in-one platform,完全 Serverless 化
  • 未来规划:为未来的云端大数据做好准备

传统数仓架构 vs. 弹性数仓架构

在进入到架构对比之前,我们先来看一个成本估测公式:Cost = Resource * Time ,也就是成本大致可以用资源与时间的乘积进行测算。

传统数仓架构

image.png

传统数仓往往采用 Shared-Nothing 架构,存储、计算一体化设计,弹性相对较弱。而且由于调度上采用资源固定(Fixed-Set)式调度策略,资源控制粒度粗,也会带来更多的成本。

对应到成本估测公式上,在时间一定的情况下,由于耗费资源数量较大,成本将会居高不下。

弹性数仓架构

image.png

弹性数仓则采用 Shared-Storage 架构,底层可以使用对象存储,真正做到存储、计算分离,从而支持实时弹性扩容和缩容以及资源按需(Workload-Based)式调度,资源控制粒度更细。

对应到成本估测公式上,相较于传统数仓,弹性数仓的成本将会显著降低:存储成本可以按实际使用量折算,不需要为冗余的存储进行服务;而计算成本则根据业务需要实时调度,按需启停,按量计费,无需保有大量空闲计算资源。

Databend: 新一代云数仓架构设计

新一代云数仓的架构新在哪里?影响现代云数仓架构设计的因素和挑战都有哪些?这一部分将会给你答案。

新一代云数仓

现有数仓的局限

ClickHouse 是一款流行的开源数仓,以性能卓越著称。采用向量化计算技术,细节优化非常到位。具有 Pipeline 处理器和调度器以及 MergeTree + Wide-Column 存储引擎,单机性能非常强悍。

缺点: 分布式能力弱,无法应对复杂分析,运维复杂度高,不是为云设计。

Snowflake 则是一款云数仓,支持多租户,存储、计算分离。基于对象存储,存储介质便宜。弹性能力非常强悍,面向云架构设计。

缺点: 单机性能一般,比较依赖分布式集群能力。

Databend = ClickHouse + Snowflake + Rust

前面列出的是目前在开源和商业化领域领先的两款数仓产品,看上去性能和弹性无法兼得,想要低成本和弹性计算是不是就必须放弃单节点的极致优化呢?我们来看一下 Databend 交出的答卷。

  • 借鉴 ClickHouse 向量化计算,提升单机计算性能。
  • 借鉴 Snowflake 存储、计算分离思想,提升分布式计算能力。
  • 借鉴 Git,MVCC 列式存储引擎,支持 Insert / Read / Delete / Update / Merge 等操作,以及 Time Travel 等高级特性。
  • 全面支持 HDFS 、基于云的对象存储、IPFS 等 20 多种存储协议。
  • 基于便宜的对象存储也能方便的做实时性分析。
  • 完全使用 Rust 研发(超过 33 万行代码),研发第一天就在 Github 开源。
  • 高弹性 + 强分布式,致力于解决大数据分析成本和复杂度问题。

云数仓架构设计

Databend Cloud 架构全景图

Databend Cloud 是基于开源云原生数仓项目 Databend 打造的一款易用、低成本、高性能的新一代大数据分析平台,提供一站式 SaaS 服务,免运维、开箱即用。下面是 Databend Cloud 的架构全景图,也是 Databend Labs 团队对新一代云数仓的架构的设计与实现。

image.png

影响云数仓架构设计的因素与挑战

Databend / Databend Cloud 之所以演化出现在的架构,是因为新一代云数仓除了要在性能上比肩传统数仓、弹性上对标弹性数仓之外,还必须解决下面几个重要的问题:

  • Ingest 海量数据网络费用问题:传统 INSERT 模式费用昂贵,需要一套基于 S3 的免费方案。
  • 对象存储不是为数仓而设计,延迟和性能如何平衡:Network-Bound -> IO-Bound -> CPU-Bound 。
  • 如何让系统更加智能,根据查询模式自动创建索引:如何让某些场景的 Query 越跑越快...
  • 如何面向 Warehouse + Datalake 双重需求设计?

前两个问题是云带来的挑战,而后两个问题将直面用户需求,一旦考虑清楚这些问题,云数仓的架构也就呼之欲出了。

Databend 生态全景图

数仓的产品的成败,除了本身的设计和实现之外,也非常依赖数据生态,其关键在于解决数据的输入与输出问题。

Databend 自身支持一定 ETL 能力,能够使用 Stage 和 Multiple Catalog 挂载外部数据源,提供全量、增量、条件等多种导入方式,支持使用 PRESIGN 上传和下载数据。

Databend 积极融入大数据生态,拓展「Databend 朋友圈」,提供全链路解决方案,帮助用户将数据转化为商业洞见。

image.png

Databend 为用户提供价值

Databend 是一款开源、开放,运维简单、分钟级部署,为云端海量数据分析而设计的新一代云数仓。

我们在前面介绍了 Databend 的设计与实现,以及在生态方面做的一些努力,但产品是否能够占据市场、满足用户需求,还需要靠数据说话。

Databend v1.0 于 2023 年 3 月 5 日正式发布,目前处于 v1.2 版本,我们统计了以下几条关键数据:

  • 替换 Trino/Presto 场景成本降低了 75%
  • 替换 Elasticsearch 场景成本降低了 90%
  • 归档场景成本降低了 95%
  • 日志和历史订单分析场景成本降低了 75%
  • ~1PB+/天(2023.9 统计)在使用 Databend 写入公有云对象存储
  • 用户来自欧洲、北美、东南亚、印度、非洲、中国等地,每月节省数百万美元

以下是一些在生产环境中使用 Databend 的用户,感谢他们一直以来的支持与陪伴。我们将继续提供更有价值的服务。

image.png

Databend 在开源社区

Databend 从第一天起就在 GitHub 上开源,目前已经成为 Rust 社区中的明星数据库项目。我们与上下游社区紧密协作,共同建设 Rust 大数据生态。Databend 目前的贡献者中不乏大公司背景,比如 SAP、Yahoo、Fortinet、Shopee、Alibaba、Tencent、ByteDance、EMQ、快手,Databend 社区正在被顶级需求、顶级场景驱动。

image.png

体验 Databend

最后,欢迎大家体验 Databend 产品与生态,与我们共同建设坚实可靠的大数据基础设施。

image.png

随着架构的不断迭代和更新,大数据系统的查询目标也从大吞吐量查询逐步转移转向快速的交互式查询,对于查询的及时响应提出了更高要求。许多企业的数仓/数据湖中都有 PB 级的数据,其中绝大多数都属于旧有系统中的历史数据,很少更新,移动起来很麻烦,重新组织元数据也需要花费大量的时间。需要解决的问题是:如何在保证现有的数据和元数据不变的情况下加速查询。

image.png

上图是一个典型的使用 Databend 加速 Hive 查询的架构。用户使用 trino 、Spark 等引擎将数据纳入 Hive 进行管理,数据的存放位置则位于 S3 、GCS 、HDFS 等存储服务之中。引入 Databend 可以带来更好的查询性能。

和 trino 以及大多数支持 Hive Catalog / Connector 的查询引擎一样,Databend 可以复用 Hive 除了运行时(查询引擎)之外的其他组件,包括用于管理文件和底层存储的存储服务和管理 SQL 表到文件和目录映射的 Hive MetaStore 。

Databend 中的数据按三层进行组织:catalog -> database -> tablecatalog 作为数据最大一层,会包含所有的数据库和表。通过 CREATE CATALOG 语句,用户可以轻松创建 Hive Catalog 。在执行查询时,需要按 <catalog>.<database>.<table> 的格式指定到表。

SELECT * FROM <catalog>.<database>.<table>;

通过这种形式,用户无需向 Databend 中导入数据,就可以直接查询位于 Hive/Iceberg Catalog 中的数据,并获得 Databend 的性能保证。

Workshop :使用 Databend 加速 Hive 查询

接下来,让我们通过两个例子,了解 Databend 是如何加速不同存储服务下的 Hive 查询的。

使用 HDFS 存储

Hive + HDFS 的实验环境可以使用 https://github.com/PsiACE/databend-workshop/tree/main/hive-hdfs 中的环境搭建

docker-compose up -d

接下来,让我们一起准备数据:

  • 进入 hive-server ,使用 beeline 连接:
docker-compose exec hive-server bash
beeline -u jdbc:hive2://localhost:10000
  • 创建数据库、表和数据,注意,需要以 Parquet 格式存储:
CREATE DATABASE IF NOT EXISTS abhighdb;

USE abhighdb;

CREATE TABLE IF NOT EXISTS alumni(
alumni_id int,
first_name string,
middle_name string,
last_name string,
passing_year int,
email_address string,
phone_number string,
city string,
state_code string,
country_code string
)
STORED AS PARQUET;

INSERT INTO abhighdb.alumni VALUES
(1,"Rakesh","Rahul","Pingle",1994,"rpingle@nps.gov",9845357643,"Dhule","MH","IN"),
(2,"Abhiram","Vijay","Singh",1994,"asingh@howstuffworks.com",9987654354,"Chalisgaon","MH","IN"),
(3,"Dhriti","Anay","Rokade",1996,"drokade@theguardian.com",9087654325,"Nagardeola","MH","IN"),
(4,"Vimal","","Prasad",1995,"vprasad@cmu.edu",9876574646,"Kalwadi","MH","IN"),
(5,"Kabir","Amitesh","Shirode",1996,"kshirode@google.co.jp",9708564367,"Malegaon","MH","IN"),
(6,"Rajesh","Sohan","Reddy",1994,"rreddy@nytimes.com",8908765784,"Koppal","KA","IN"),
(7,"Swapnil","","Kumar",1994,"skumar@apache.org",8790654378,"Gurugram","HR","IN"),
(8,"Rajesh","","Shimpi",1994,"rshimpi@ucoz.ru",7908654765,"Pachora","MH","IN"),
(9,"Rakesh","Lokesh","Prasad",1993,"rprasad@facebook.com",9807564775,"Hubali","KA","IN"),
(10,"Sangam","","Mishra",1994,"smishra@facebook.com",9806564775,"Hubali","KA","IN"),
(11,"Sambhram","Akash","Attota",1994,"sattota@uol.com.br",7890678965,"Nagpur","MH","IN");

SELECT * FROM abhighdb.alumni;

image.png

由于 HDFS 支持需要使用 libjvm.so 和 Hadoop 的若干 Jar 包,请确保你安装了正确的 JDK 环境并配置相关的环境变量:

export JAVA_HOME=/path/to/java
export LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH}
export HADOOP_HOME=/path/to/hadoop
export CLASSPATH=/all/hadoop/jar/files

参考 Deploying a Standalone Databend ,使用带有 HDFS 特性的 Databend 分发(databend-hdfs-*),部署一个单节点的 Databend 实例。

image.png

通过 BendSQL 连接这个 Databend 实例,然后创建对应的 Hive Catalog ,记得要通过 CONNECTION 字段为其配置对应的存储后端:

CREATE CATALOG hive_hdfs_ctl TYPE = HIVE CONNECTION =(
METASTORE_ADDRESS = '127.0.0.1:9083'
URL = 'hdfs:///'
NAME_NODE = 'hdfs://localhost:8020'
);

在上面的语句中,我们创建了一个底层存储使用 HDFS 的 Hive Catalog:

  • 通过 TYPE 指定创建 Hive 类型的 Catalog 。

  • CONNECTION 用于指定 HIVE 相关的存储/元数据访问信息,可以阅读 Docs | Connection Parameters 了解更多相关信息。

    • METASTORE_ADDRESS 对应 Hive MetaStore 的地址
    • URL 对应 HDFS 的 Path
    • NAME_NODE 则对应 HDFS 的 Name Node 地址

让我们尝试运行一个简单的 SELECT 查询,验证其是否能够正常工作:

SELECT * FROM hive_hdfs_ctl.abhighdb.alumni;

image.png

使用 S3-like 对象存储

Trino + Hive + MinIO 的实验环境可以使用 https://github.com/sensei23/trino-hive-docker/ 进行搭建。

cd docker-compose
docker build -t my-hive-metastore .
docker-compose up -d

在执行完 docker-compose up -d 等前置步骤后,先进入 MinIO 控制面板,创建一个名为 tpch 的 Bucket 。

image.png

运行下述命令可以打开 trino 命令行工具:

docker container exec -it docker-compose-trino-coordinator-1 trino

接着创建一个小型的 TPCH 客户表。注意,为了满足 Databend 使用要求,这里需要使用 Parquet 格式:

CREATE SCHEMA minio.tpch
WITH (location = 's3a://tpch/');

CREATE TABLE minio.tpch.customer
WITH (
format = 'PARQUET',
external_location = 's3a://tpch/customer/'
)
AS SELECT * FROM tpch.tiny.customer;

image.png

查询对应的 Hive 元数据,可以看到像下面这样的信息:

 DB_ID |      DB_LOCATION_URI      |   NAME   | OWNER_NAME | OWNER_TYPE | CTLG_NAME 
-------+---------------------------+----------+------------+------------+-----------
1 | file:/user/hive/warehouse | default | public | ROLE | hive
3 | s3a://tpch/ | tpch | trino | USER | hive

参考 Deploying a Standalone Databend 部署一个单节点的 Databend 实例。

image.png

通过 BendSQL 连接这个 Databend 实例,然后创建对应的 Hive Catalog ,记得要通过 CONNECTION 字段为其配置对应的存储后端:

CREATE CATALOG hive_minio_ctl 
TYPE = HIVE
CONNECTION =(
METASTORE_ADDRESS = '127.0.0.1:9083'
URL = 's3://tpch/'
AWS_KEY_ID = 'minio'
AWS_SECRET_KEY = 'minio123'
ENDPOINT_URL = 'http://localhost:9000'
);

在上面的语句中,我们创建了一个底层存储使用 MinIO 的 Hive Catalog:

  • 通过 TYPE 指定创建 Hive 类型的 Catalog 。

  • CONNECTION 用于指定 HIVE 相关的存储/元数据访问信息,可以阅读 Docs | Connection Parameters 了解更多相关信息。

    • METASTORE_ADDRESS 对应 Hive MetaStore 的地址
    • URL 则对应 MinIO 的 Bucket 或者 Path
    • AWS_KEY_IDAWS_SECRET_KEY 则对应访问时的校验,这里使用了 MinIO 服务的用户名和密码
    • ENDPOINT_URL 是 MinIO 对象存储服务的 API 端点

让我们尝试运行一个简单的 SELECT 查询,验证其是否能够正常工作:

SELECT * FROM hive_minio_ctl.tpch.customer LIMIT 5;

image.png


提示

  • 要使用 SQL 语句创建带有多种存储支持的 Hive Catalog,推荐使用 v1.2.100-nightly 及以后版本。
  • 不再需要从 toml 文件进行配置就可以获得多源数据目录能力。
  • 如果需要获取 HDFS 存储服务支持,则需要部署或者编译带有 HDFS 特性的 Databend ,比如 databend-hdfs-v1.2.100-nightly-x86_64-unknown-linux-gnu.tar.gz
  • 对于 Hive Catalog ,Databend 目前只支持查询 Parquet 格式的数据,且只支持 SELECT,不支持其他 DDL 、DML 和 UDFs 。
  • Databend 的语法与 Hive 并不完全兼容,关于 SQL 兼容性相关的内容,可以查看 Docs | SQL Conformance

image.png

以「启航 • AIGC 软件工程变革」为主题的 QCon 全球软件开发大会北京站于 9 月 5 日在北京富力万丽酒店圆满落幕!此次大会包含向量数据库、云原生、异构计算、面向 AI 的存储、微服务架构治理、FinOps 等近 30 个精彩专题。Databend Labs 作为深耕云原生数据库领域的科技公司受邀参与。

9月3日下午,Databend 研发工程师 - 邰翀作为「构建未来软件的编程语言」专题的演讲嘉宾参与本次分享。

image.png

主题: 「Rust:构建新时代基础设施的首选语言」

演讲嘉宾: 邰翀

嘉宾介绍: Databend 研发工程师

本次分享聚焦于数据库和 AI 领域,从跨云数据访问和向量数据库的现实需求谈起,阐述为什么 Rust 是适合于新时代基础设施的编程语言,并分析 Rust 在新时代基础设施下的新机遇。本次分享主要分为四个部分:

  • Rust 新时代基础设施的最佳选择
  • All in Rust 为 Databend 带来了什么
  • Rust 如何成为构建 Vector Embeddings 的关键语言
  • Rust 的机遇与挑战

Rust 新时代基础设施的最佳选择

在此前,当我们谈论基础设施时,首先可能会想到服务器、Oracle 等。而近些年来,我们谈论基础设施已经离不开云和构建在云上的各种服务。

新时代基础设施是指:可以在云上自由部署、与云完美融合的基础设施。在这个新时代中,Databend 和一些数据库同行都选择了 Rust 作为首选语言。我们可以观察到 amazom、 微软、Firefox、飞书、TiKV、云数仓 Databend、OpenDAL 等公司或者项目在使用 Rust 。

image.png

Rust 的发展时间线

  • 2006 年, Graydon Hoare 着手设计和实现 Rust 语言,此时,还只是他的个人项目。
  • 2009 年,Mozilla 开始赞助这个项目,并成立团队支持 Rust 的开发。
  • 2010 年,Rust 首次公开,并在一年后实现了编译器自举,到 2015 年发布了第一个稳定版本。
  • 2021 年,Rust 基金会成立,为 Rust 语言带来更广阔的发展前景。

Rust 与内存安全

Rust 的创始人 Graydon Hoare 曾经说过:“Rust是一种采用过去的知识解决将来的问题的技术。”

在我的理解中,Rust 的主要目标之一就是解决内存安全问题。

下面列出一些常见的内存安全问题:

  • Out-Of-Bound 假设有两个线程,其中一个线程在访问链表,另一个线程在进行删除,这个时候可能会产生越界问题。
  • Use After Free 指针指向了堆上一块内存,但是这块内存因为扩容或者其他原因导致重分配,给了恶意代码修改并且执行被释放内存的机会。

通用手法

针对内存安全问题,不同的编程语言都会提出自己的一套解决方案,目前通用的方式包括:

  • 垃圾回收(Garbage Collection,GC) :设立专门的垃圾回收机制来检测内存。由于回收机制的时间不固定,这种方式可能会导致内存不可控。对于数据库软件而言,如果在做大量数据的聚合相加,可能会导致内存飙升,并且很长时间无法释放,容易诱发 OOM 。

  • 自动引用计数(Automatic Reference Counting,ARC) :可以自动地跟踪和管理对象的引用计数,从而避免了手动管理内存的繁琐和容易出错的问题,算是一种比较好的方式。

  • 手动管理内存(Manualy Handle Memory) :开发者自行管理分配和释放,对程序开发功底要求比较高。

Rust 怎么做

上面的三种通用手法是通过管理引用来处理内存安全问题的,而 Rust 则选择通过限制引用行为来解决内存安全问题。 更具体地,Rust 引入了所有权、借用检查和生命周期这三个重要的概念。

  • 所有权(Ownership) :每个值只能有一个所有者。

  • 借用检查(Borrow Check) :帮助管理所有权的一套规则,能够处理内存分配和释放,防止数据竞争。

  • 生命周期(Lifetimes) :程序中每个变量都有一个固定的作用域,当超出变量的作用域以后,变量就会被销毁。变量在作用域中从初始化到销毁的整个过程称之为生命周期。

Rust :最受程序员推崇的语音

正因为 Rust 语言表现力、卓越的性能以及自己独有一套的内存管理方式,蝉联八届 Stack Overflow Developer Survey 最受程序员推崇的语言。

image.png

All in Rust  为 Databend 带来了什么

Databend Labs 成立于 2021 年 3 月,是一家开源 Data Cloud 服务商,致力于为大数据生态提供坚实可靠的基础设施。我们的核心团队成员来自 ClickHouse 社区、谷歌 Anthos、阿里云等国内外知名互联网和云计算公司,团队在云原生数据库领域有着丰富的工程经验,同时也是数据库开源社区活跃贡献者。

我们在用 Rust 做什么?

Databend是一款使用 Rust 研发、开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓,具有即时扩缩容能力,能在数分钟内增加数百倍的算力,为企业提供了一个用于存储、管理和分析大量数据的集中式平台,从而助力企业更准确地洞察业务、制定战略。

image.png

除了 Databend 之外,我们也使用 Rust 开发和维护了大量项目:

  • OpenRaft 是在 tokio 之上实现的异步 Raft 框架,在 leadership 上做了大量优化,性能非常强劲。

  • BendSQL 是基于 Databend HTTP API 和 Arrow Flight API 设计和实现的原生客户端。

  • AskBend 是一款知识库智能问答系统,基于 OpenAI 的 API 并且利用 Databend 内置的一系列 SQL 函数(AI Functions)打造。访问链接:https://ask.databend.rs 即可体验。

  • OpenDAL 提供一个统一、简单、高效、可靠、可观察的数据访问层,让开发者可以无缝地使用不同的存储服务,并享受到最佳的用户体验。今年 3 月份的时候已经移交到 Apache 软件基金会孵化器中进行孵化。

Rust 给我们带来了什么?

数据库本身就是计算机科学皇冠上的一颗明珠,是一个庞大而又复杂的系统。而在 Databend 的设计和实现中,我们还从目前市场上最优秀的数仓中借鉴了一些经验。例如,我们参考了 Clickhouse 的向量化设计,以提高单机的性能。同时,我们也借鉴了 Snowflake 的集群优点,以增强分布式计算能力。

选择 Rust 作为这样一个复杂系统开发的首选语言,我们收获了这些:

  • 高效 的研发 虽然 Databend 从第一版发布至今只有 2 年多,但从开源到文章撰写时为止,已经累计有 8000 多个 PR 。

  • 优越 性能: Databend 在 Clickbench 基准测试中,数据导入性能排名第一,并且在 c6a.4xlarge 机型性能登顶,除了优秀的设计和实现之外,Rust 功不可没。

  • 活跃的 社区: 得益于 Rust 语言的流行和社区的持续关注,到文章撰写时为止已有超过 200 位贡献者,收获 6.5 k star ,是 Rust 社区中的明星项目之一。

  • image.png

Rust 如何成为构建 Vector Embeddings 的关键语言

同样是数据库领域,现在让我们将目光转向今年在 AI 上大放异彩的向量数据库。

向量嵌入(Vector Embeddings)可以将数据转化为一个包含其实际含义的向量空间。而向量数据库则致力于挖掘存储和处理向量数据的能力,并提供高效的向量检索功能。

为什么需要向量数据库

对于 GPT 这样的大模型而言,Tokens 大小限制了应用的进一步开发,而引入向量数据库之后,就可以利用向量检索和向量索引能力,相似度更近的数据紧凑存放。这样可以带来两个方面的好处:

  • 降低 GPT 的使用成本。
  • 维护长期记忆,帮助 AI 理解和执行复杂任务。

Rust 如何进入向量数据库

让我们一起来看一下 Rust 是怎么进入到向量数据库领域的:

  • 研发新的向量数据库: 获得知名投资机构 YC 的青睐的 LanceDB  使用 Rust 设计了针对向量数据库的存储格式。

  • 重写现有向量数据库: Pinecone 将 C++ 和 Python 代码库使用 Rust 完全重写。在今年 5 月份获得 a16z 一亿美元 B 轮投资。

  • 拓展旧有向量数据库: Milvus 则计划引入 OpenDAL(Rust 开发的存储访问层)的 C++ binding,以支持跨多云数据存储能力。

  • 拓展现有其他数据库: Databend 支持向量类型的存取与基本相似度查询功能,并且提供 AI Functions ,能够与 OpenAI API 进行交互。

  • 拓展旧有其他数据库: AI 初创公司 tensorchord 使用 Rust 开发 PostgreSQL 的向量处理拓展 pgvecto.rs 。

Rust 的机遇与挑战

回顾 Databend Labs 使用 Rust 的研发历程,并且结合业界其他公司的经验,我们认为,Rust 能够成为构建新时代基础设施的首选语言的主要因素有以下几点:

  • 内存安全
  • 性能保障
  • 敏捷开发

机遇

Rust 在新时代中的机遇,其实可以和前面 Rust 如何进入向量数据库结合起来看,这里同样举几个典型的项目作为例子。

  • 新项目服务旧场景:
    • Polars 是 Pandas 的有力竞争者,团队宣布种子轮收获 400 万美元融资,用于打造 OLAP 计算平台。
    • HuggingFace 开源新的深度学习框架 Candle ,使用 Rust 编写。
  • 新项目服务新场景
    • llmchain-rs 针对大模型工具链提供一站式解决方案。
    • mosec 针对大模型部署和服务开发提供高性能解决方案。

挑战

  • 程序的可靠性仍然需要开发者自己去管理,并不能因为使用 Rust 就掉以轻心。一个有意思的段子是 “声称内存安全的项目中,往往充斥着大量的 unsafe 代码”。

  • 尽管生态已经日趋完善,但在实际开发过程中,仍然少不了造轮子。而对于非系统编程,或者原型快速开发阶段,使用 Rust 在开发工具和生态对接上相比 Java 、Python 等语言还存在一些问题。

  • 由于 Rust 本身的复杂性,新手仍然需要迈过门槛,而且 Rust 语言进入到项目以后在编译时间、CI 流水线等方面都需要进行不同程度的调试与改造。

image.png

目前开发者们需要尝鲜 Databend, 可以选择使用 Databend Cloud 或者按官方文档部署 Databend 服务。 由于 Databend 架构有三层,因此部署Databend 服务一般需要启动 databend-query, databend-meta , minio 三个进程,同时需要修改端口等配置项,流程上略显复杂。 有没有更快的方式可以快速尝鲜 Databend 呢?

Python Binding

一种快速的方式是将 Databend 跑在python中,借助 rust 优良的生态,我们基于 pyo3 库发布了 python binding,可以在本地 juypter 或者 colab 等在线服务中使用 Databend:

# pip install databend
from databend import SessionContext

ctx = SessionContext()

df = ctx.sql("select number, number + 1, number::String as number_p_1 from numbers(8)")

# convert to pyarrow
df.to_py_arrow()

# convert to pandas
df.to_pandas()

Databend Local 模式

借鉴于 clickhouse-local , duckdb 等嵌入型数据库的优点,我们在 Databend 中也可以开启 local 模式。

local 模式 是一个 Databend 的简易版本,用户无需部署 Databend 服务即可在命令中 用 SQL 和 Databend 交互。它的好处在于简化了开发安装,同时方便开发者们用 SQL 使用 Databend支持的功能进行简单的数据处理。 如果你需要在生产环境使用 Databend,我们建议按官网推荐部署 Databend 服务 或 Databend Cloud,但如果你是开发人员或测试工程师,你可以使用 local 模式 来玩转 Databend。

local 模式将启动一个临时的 databend-query 进程,这个进程融合了客户端和服务端,并且他的存储是在临时目录中,生命周期跟随进程,进程离开后资源也将销毁,你可以在一个服务器中启动多个 local 进程,他们的资源是相互隔离的。

下面通过例子介绍一下,每个例子都是简短的几行命令,介绍 local 模式可以实现什么功能。

在这之前,你需要下载 databend-query 二进制,然后将二进制放到 PATH 环境变量中,植入 bend-local 工具别名

alias bend-local="databend-query local"
  • 命令行交互 ( REPL ) 模式

    • 直接在终端输入 bend-local 这一行命令后,我们将进入 REPL 模式,这里融合了客户端和服务端,类似 duckdb cli 工具使用。

    •   ❯ bend-local
      Welcome to Databend, version v1.2.4-nightly-326cabe38056168dd261f744609ea85319f02686(rust-1.72.0-nightly-2023-09-02T15:18:48.006847567Z).

      databend-local:) select max(a) from range(1,1000) t(a);
      ┌────────────┐
      │ max(a) │
      │ Int64 NULL │
      ├────────────┤
      │ 999 │
      └────────────┘
      1 row result in 0.036 sec. Processed 999 rows, 999 B (27.89 thousand rows/s, 217.90 KiB/s)

      databend-local:)
    • 值得注意的是,bend-local 支持配置文件 ~/.config/databend/config.toml 来做一些个性化客户端配置,配置文件的格式和 bendsql 是兼容的。

  • 一行命令生成一个 parquet 文件

    • 支持 --query, --output-format 参数 传入查询 SQL 和输出格式
    •    bend-local --query "select number, number + 1 as b from numbers(10)" --output-format parquet > /tmp/a.parquet
  • Shell pipe 模式分析数据, \$STDIN 宏将解析 stdin 流作为一个临时 stage 表

    •   ❯ echo '3,4' | bend-local -q "select $1 a, $2 b  from $STDIN  (file_format => 'csv') " --output-format table

      SELECT $1 AS a, $2 AS b FROM 'fs:///dev/fd/0' (FILE_FORMAT => 'csv')

      ┌─────────────────┐
      │ a │ b │
      │ String │ String │
      ├────────┼────────┤
      │ '3' │ '4' │
      └─────────────────┘

注意上面的 SQL 在 shell 中,使用了 \$ 来对 shell 进行转义

  • 读取 stage table (本地文件,外部 s3 等)

    •   ❯ bend-local --query "select count() from 'fs:///tmp/a.parquet'  (file_format => 'parquet') "
      10

      ❯ bend-local --query "select count() from 'https://datafuse-1253727613.cos.ap-hongkong.myqcloud.com/data/books.parquet' (file_format => 'parquet') "
      2

      ❯ bend-local --query "select $1, $2 from 'http://www.geoplugin.net/csv.gp?ip=3.3.3.3' (file_format => 'csv') "
  • 分析系统进程 ,找出每个用户占用的内存

    •   ❯ ps aux | tail -n +2 | awk '{ printf("%s\t%s\n", $1, $4) }' | bend-local -q "select  $1 as user,  sum($2::double) as memory  from $STDIN  (file_format => 'tsv')  group by user  "
      sundy 9.100000000000001
      root 1.2
      dbus 0.0
  • 数据清洗,将一个格式转换为其他格式 (支持csv, tsv, parquet, ndjson 等)

    •   ❯ bend-local -q 'select rand() as a, rand() as b from numbers(100)' > /tmp/a.tsv
        ❯ cat /tmp/a.tsv | bend-local -q "select $1 a, $2 b  from $STDIN  (file_format => 'tsv') " --output-format parquet > /tmp/a.parquet
  • 其他好玩的分析例子,等待你的挖掘

image.png

Databend Cluster Key 是指 Databend 可以按声明的 key 排序存储,主要用于用户对时间响应比较高,同时愿意为这个 cluster key 进行额排序操作的用户。 Databend 只支持一个 Cluster key,Cluster key中可以包含多列及表达式。

基本语法

-- 语法:
alter table T cluster by(c1, fun(c2));

-- 例如:
alter table T cluster by(user_id); -- 指定数据按 user_id 排序存储

-- 日志场景 按 msg_id, 小时 排序存储
alter table T cluster by(msg_id, to_yyyymmddhh(c_timestamp));

-- 强制数据排序
optimize table T compact;
alter table T recluster final; -- 全局排序, 建议第一次创建 Cluster key 后使用,后期如果遇到性能退化,也可以再次使用

更多关于 Databend Cluster key 语法参考:

https://docs.databend.cn/sql/sql-commands/ddl/clusterkey/

使用注意事项

目前 Databend 在表有 cluster key 的情况下,使用

  • copy into
  • replace into

这两种方式写入数据时,会自动执行 compact 和 recluster 操作。

关于 Databend Cluster Key 你需要了解的:

  1. Databend 中数据分区按: block_size_threshold (default: 100M ) or row_per_block(default 100万) 组织,两者任意达到之一就会生成新的 Block
  2. 新生成的 Block 中会按定义的 cluster key 排序存储,当该key的 min = max 时,该 block 为 constant_block, 同时 cluster key 不保证全局有序
  3. 多个 block 之间可能有重叠区间,如,cluster by (age)

image.png

不同区间的重叠形成了不同的深度,例如上图:

select * from T where age >30 and age <35; 

这样一个查询,需要查找到的深度为 3 ,即为 3 个 Block。

所以表中指定列的重叠block-partitions的平均深度,越小越好。如下所示:

-- 可以通过 clustering_information('db','tbname') 查看该表的 Cluster 信息
select * from clustering_information('wubx','sbtest10w')\G;
*************************** 1. row ***************************
cluster_by_keys: (id) -- 定义的 Cluster key
total_block_count: 451 -- 当前有多少的 block
constant_block_count: 0 -- min/max 相等 block, 也就说 block 中只包括一个(组) cluster_key 的值
unclustered_block_count: 0 -- 还没 Cluster 的 Block
average_overlaps: 2.1774 -- 在一个 Range 范围内,有多少个 block有重叠比率
average_depth: 2.4612 -- cluster key 在分区的重叠分区数的平均深度
block_depth_histogram: {"00001":32,"00002":217,"00003":164,"00004":38}
1 row in set (0.02 sec)
Read 1 rows, 448.00 B in 0.015 sec., 67.92 rows/sec., 29.71 KiB/sec.

结果中最重要信息是“average_depth”,数字越小, 表的clustering效果越好,上图为: 2.46,属于比较好的状态(小于 total_block_count * 0.1 ) 。block_depth_histogram 告诉更多关于每个深度有多少个分区的详细信息。 如果在较低深度中的分区数更多,则表的聚类效果更好。 例如"00004" : 38 表示 (3,4] 有 38 个 block 有 4 个深度。

其它优化建议

  1. 一般来讲声明 Cluster key 后对于区间查询和点查都有较大的优化
  2. 如果声明 cluster key 后,还想进一步的提升点查或是区间查询的能力,可以通过调整 block 大小
-- 把 Block 的大小修改为压缩前 50M ,行数不超过 10 万行
alter table T set options(row_per_block=100000,block_size_threshold=52428800);

关于 options 查看: https://docs.databend.cn/sql/sql-reference/table-engines/fuse#options

默认数据分布:

image.png

优化数据在 Block 中的分布

create table sbtest10w like sbtest1;
alter table sbtest10w set options(row_per_block=100000,block_size_threshold=52428800);
insert into sbtest10w select * from sbtest1;

image.png

  1. 对于特别宽的表,建议查询中只访问需要的列来减少时间开销

image.png

image.png

  1. 对于复杂的 SQL 里面有大量聚合的操作还是推荐大一点的 Block 及行数

参考

背景

时至今日,Databend 已经成长为一个大型、复杂、完备的数据库系统。团队维护着数十万行代码,每次发布十几个编译产物,并且还提供基于 Docker 的一些构建工具以改善开发者 / CI 的体验。

之前的文章介绍过 PGO,用户可以根据自己的工作负载去调优 Databend 的编译。再早些时候,还有一篇介绍 Databend 开发环境和编译的文章。 对于 Databend 这样的中大型 Rust 程序而言,编译实在算不上是一件轻松的事情:

  • 一方面,在复杂的项目依赖和样板代码堆积之下,Rust 的编译时间显得不那么理想,前两年 Brian Anderson 的文章中也提到“Rust 糟糕的编译时间”这样的描述。
  • 另一方面,为了维护构建结果,不得不引入一些技巧来维护编译流水线的稳定,这并不是一件“一劳永逸”的事情,随着 Workflow 复杂性的提高,就不得不陷入循环之中。

为了优化编译体验,Databend 陆陆续续做过很多优化工作,今天的文章将会和大家一同回顾 Databend 中改善编译时间的一些优化。

可观测性

可观测性并不是直接作用于编译优化的手段,但可以帮助我们识别当前编译的瓶颈在什么地方,从而对症下药。

cargo build --timings

这一命令有助于可视化程序的编译过程。

在 1.59 或更早版本时可以使用 cargo +nightly build -Ztimings

在浏览器中打开结果 HTML 可以看到一个甘特图,其中展示了程序中各个 crate 之间的依赖关系,以及程序的编译并行程度和代码生成量级。 通过观察图表,我们可以决定是否要提高某一模块的代码生成单元数目,或者要不要进一步拆解以优化整个编译流程。

cargo-depgraph

这个工具其实不太常用,但可以拿来分析依赖关系。有助于找到一些潜在的优化点,特别是需要替换某些同类依赖或者优化 crates 组织层次的时候。

无痛优化,从调整配置开始

改善编译体验的第一步其实并不是直接对代码动手术,很多时候,只需要变更少数几项配置,就能够获得很大程度上的改善。

Bump, Bump, Booooooooom

前面提到过 Rust 团队的成员们也很早就意识到,编译时间目前还并不理想。所以编译器团队同样会有计划去不断进行针对性的优化。经常可以看到在版本更新说明中有列出对编译的一些改进工作。

[toolchain]
channel = "nightly-2023-03-10"
components = ["rustfmt", "clippy", "rust-src", "miri"]

另外,上游项目同样可能会随着时间的推移去改善过去不合理的设计,很多时候这些改进也最终会反映在对编译的影响上。

一个改善编译时间的最简单的优化方式就是始终跟进上游的变更,并且秉着“上游优先”的理念去参与到生态建设之中。Databend 团队从一开始就是 Rust nightly 的忠实簇拥,并且为更新工具链和依赖关系提供了简明的指导。

缓存,转角遇到 sccache

缓存是一种常见的编译优化手段,思路也很简单,只需要把预先构建好的产物存储起来,在下次构建的时候继续拿过来用。

早期 Databend 使用 rust-cache 这个 action 在 CI 中加速缓存,获得了不错的效果。但是很遗憾,我们不得不经常手动更新 key 来清理缓存,以避免构建时的误判。而且,Rust 早期对增量构建的支持也很差劲,有那么一段时间可能会考虑如何配置流水线来进行一些权衡。

随着时间的推移,一切变得不同了起来。

首先是 Sccache 恢复了活力,而 OpenDAL 也成功打入其内部,成为支撑 Rust 编译缓存生态的重要组件,尽管在本地构建时使用它常常无法展现出真正的威力,但是放在 CI 中,还是能够带来很大惊喜的。

另一个重要的改变是,Rust 社区意识到增量编译对于 CI 来讲并不能很好 Work。

CI builds often are closer to from-scratch builds, as changes are typically much bigger than from a local edit-compile cycle. For from-scratch builds, incremental adds an extra dependency-tracking overhead. It also significantly increases the amount of IO and the size of ./target, which make caching less effective.

轻装上阵,将冷气传递给每一个依赖

Rust 生态里面有一个很有意思的项目是 https://github.com/mTvare6/hello-world.rs,它尽可能给你展现了如何让一个 Rust 项目变得尽可能糟糕。

特别是:

in a few lines of code with few(1092) dependencies

Rust 自身是不太能很好自动处理这一点的,它需要把所有依赖一股脑下载下来编译一通。所以避免无用依赖的引入就成为一件必要的事情了。

最开始的时候,Databend 引入 cargo-udeps 来检查无用的依赖,大多数时候都工作很良好,但最大的缺点在于,每次使用它检查依赖就相当于要编译一遍,在 CI 中无疑是不划算的。

sundy-li 发现了另外一个快速好用的工具,叫做 cargo-machete。

一个显著的优点是它很快,因为一切只需要简单的正则表达式来处理。而且也支持了自动修复,这意味着我们不再需要挨个检索文件再去编辑。

不过 machete 并不是完美的工具,由于只是进行简单的正则处理,有一些情况无法准确识别,不过 ignore 就好了,总体上性价比还是很高的。

稀疏索引

为了确定 crates.io 上存在哪些 crates,Cargo 需要下载并读取 crates.io-index,该索引位于托管在 GitHub 上的 git 存储库中,其中列出了所有 crates 的所有版本。

然而,随着时间推移,由于索引已经大幅增长,初始获取和更新变得很慢。RFC 2789 引入了稀疏索引来改进 Cargo 访问索引的方式,并使用 https://index.crates.io/ 进行托管。

[registries.crates-io]
protocol = "sparse"

linker

如果项目比较大,而且依赖繁多,那么可能在链接时间上会比较浪费。特别是在你只改了几行代码,但编译却花了很久的时候。

最简单的办法就是选择比默认链接器更快的链接器。

lld 或者 mold 都可以改善链接时间,Databend 最后选择使用 mold。其实在 Databend 这个量级的程序上,两个链接器的差距并不明显,但是,使用 mold 的一个潜在好处是能够节约一部分编译时候消耗的内存。

[target.x86_64-unknown-linux-gnu]
linker = "clang"
rustflags = ["-C", "link-arg=-fuse-ld=/path/to/mold"]

编译相关配置

先看一个常见的 split-debuginfo,在 MacOS 上,rustc 会运行一个名为 dsymutil 的工具,该工具会分析二进制文件,然后构建调试信息目录。配置 split-debuginfo,可以跳过 dsymutil,从而加快构建速度。

split-debuginfo = "unpacked"

另外的一个例子是 codegen-units,Databend 在编译时使用 codegen-units = 1 来增强优化,并且克制二进制体积大小。但是考虑到部分依赖在编译时会有特别长的代码生成时间(因为重度依赖宏),所以需要针对性放开一些限制。

[profile.release.package]
arrow2 = { codegen-units = 4 }
common-functions = { codegen-units = 16 }
databend-query = { codegen-units = 4 }
databend-binaries = { codegen-units = 4 }

重新思考,更合理的代码组织

前面是一些配置上的调整,接下来将会探讨重构对代码编译时间的一些影响。

拆分到更合理的 crates 规模

对于一个大型的 All in One 式的 Crate 而言,拆分 crates 算是一个比较有收益的重构。一方面可以显著改善并行度。另一方面,通过解耦交叉依赖/循环依赖,可以帮助 Rust 更快地处理代码编译。

同时,还有一个潜在的好处,就是拆分以后,由于代码的边界更为清晰,维护起来也会省力一些。

单元式测试与集成式测试的界限

单元测试的常见组织形式包括在 src 中维护 tests mod,和在 tests 目录下维护对应的测试代码。

根据 Delete Cargo Integration Tests 的建议,Databend 很早就从代码中剥离了所有的单元测试,并组织成类似这样的形式

tests/
it/
main.rs
foo.rs
bar.rs

这种形式避免将 tests/ 下面的每个文件都编译成一个单独的二进制文件,从而减轻对编译时间的影响。

另外,Rust 编译时处理 tests mod 和 docs tests 也需要花费大量时间,特别是 docs tests 还需要另外构建目标,在采用上面的组织形式之后,就可以在配置中关掉。

但是,这种形式并不十分优雅,不得不为所有需要测试的内容设置成 public,容易破坏代码之间的模块化组织,在使用前建议进行深入评估。

更优雅的测试方法

对应到编译时间上,可以简单认为,单元测试里需要编译的代码越多,编译时间自然就会越慢。

另外,对于 Databend 而言,有相当一部分测试都是对输入输出的端到端测试,如果硬编码在单元测试中需要增加更多额外的格式相关的工作,维护也会比较费力。

Databend 巧妙运用 golden files 测试和 SQL logic 测试,替换了大量内嵌在单元测试中的 SQL 查询测试和输出结果检查,从而进一步改善了编译时间。

遗珠之憾

cargo nextest

cargo nextest 让测试也可以快如闪电,并且提供更精细的统计和优雅的视图。Rust 社区中有不少项目通过引入 cargo nextest 大幅改善测试流水线的时间。

但 Databend 目前还无法迁移到这个工具上。一方面,配置相关的测试暂时还不被支持,如果再针对去单独跑 cargo test 还要重新编译。另一方面,有一部分与超时相关的测试设定了执行时间,必须等待执行完成。

cargo hakari

改善依赖项的编译,典型的例子其实是 workspace-hack,将重要的公共依赖放在一个目录下,这样这些依赖就不需要反复编译了。Rust 社区中的 cargo-hakari,可以用来自动化管理 workspace-hack。

Databend 这边则是由于有大量的 common 组件,主要二进制程序都建立在 common 组件上,暗中符合这一优化思路。另外,随着 workspace 支持依赖继承之后,维护压力也得到减轻。

总结

这篇文章介绍了 Databend 团队在改善 Rust 项目编译时间上做的一些探索和努力,从配置优化和代码重构这两个角度,提供了一些能够优化编译时间的一些建议。

参考资料

尊敬的 Databenders,在 Databend Labs 成立两周年之际,我们非常高兴地宣布 Databend v1.0 正式发布。

Databend 社区一直在致力于解决大数据分析的成本和复杂度问题,并正在被顶级场景和顶级需求所推动。 根据可统计信息,每天约 700TB 数据在使用 Databend 写入到云对象存储并进行分析,用户来自欧洲、北美、东南亚、非洲、中国等地,每月为他们节省数百万美元成本。 Databend v1.0 是一个具有里程碑意义的版本,我们相信它将进一步加速云端海量数据分析的发展。

今天,我将首先介绍 Databend v1.0 相比 v0.9 版本所做的改进,然后探讨我们团队的愿景和未来展望。现在就让我们开始吧!

v1.0 改进

Databend 在版本 v1.0 中实现了惊人的性能提升,在 ClickBench 测试中获得:数据加载第一名,在查询环节,c6a.4xlarge 第一名,c5a.4xlarge 第二名,c6a.metal 第三名

此外,Databend 社区还在版本 v1.0 中推出了多项新功能:

UPDATE

现在,用户可以使用 UPDATE 语句来更新 Databend 中的数据。

更新语句的格式如下:

-- Update a book (Id: 103)
UPDATE bookstore SET book_name='The long answer (2nd)' WHERE book_id=103;

通过支持 UPDATE 功能,Databend 实现了对 CRUD 操作的完整支持。

ALTER TABLE

在 v1.0 中,用户可以使用 ALTER TABLE 来修改 Databend 中的表结构:

-- Add a column
ALTER TABLE t ADD COLUMN c Int DEFAULT 10;

DECIMAL

在完成了 Databend 类型系统的大型重构之后,社区在一个坚实的基础上实现了 DECIMAL 数据类型的支持!

-- Create a table with decimal data type.
create table tb_decimal(c1 decimal(36, 18));

-- Insert two values.
insert into tb_decimal values(0.152587668674722117), (0.017820781941443176);

select * from tb_decimal;
+----------------------+
| c1 |
+----------------------+
| 0.152587668674722117 |
| 0.017820781941443176 |
+----------------------+

Native Format

在 v0.9 版本中引入的 Native Format strawboat 得到了进一步的完善!社区为 strawboat 增加了半结构化数据的支持,并引入了多项性能优化,帮助 Databend 在 HITS 数据集的性能取得了巨大提升。

CBO

引入了直方图框架,可以利用统计信息更为精确地进行代价估算。进一步完善和强化 join reorder 算法,从而大大的提高多表 join 的性能,帮助 Databend 在 TPCH 数据集上的性能取得显著提升。

SELECT FROM STAGE

STAGE 是 Databend 数据流转的核心。我们之前已经支持从 STAGE 中加载数据和向 STAGE 中导出数据,现在我们更进一步,支持了直接在 STAGE 中进行数据查询!

用户只需要为 Databend 创建一个包含数据文件的 STAGE,就可以轻松进行数据查询,无需编写复杂的建表语句或繁琐的数据导入流程。

select min(number), max(number) from @lake (pattern => '.*parquet');
+-------------+-------------+
| min(number) | max(number) |
+-------------+-------------+
| 0 | 9 |
+-------------+-------------+

如果用户只需要进行一次性的查询,还可以直接使用更简短的 URI 形式:

select count(*), author
from 'https://datafuse-1253727613.cos.ap-hongkong.myqcloud.com/data/books.parquet' (file_format => 'parquet')
group by author;
+----------+---------------------+
| count(*) | author |
+----------+---------------------+
| 1 | Jim Gray |
| 1 | Michael Stonebraker |
+----------+---------------------+

Query Result Cache

在 v1.0 版本中,Databend 社区借鉴了 ClickHouse 社区的设计,并增加了 Query Result Cache 功能。当底层数据没有发生变化时,执行相同的查询会命中缓存,避免了重复执行查询的过程。

MySQL [(none)]> SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) 
FROM hits
GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
+--------------------+-------------+-----+----------------+----------------------+
| watchid | clientip | c | sum(isrefresh) | avg(resolutionwidth) |
+--------------------+-------------+-----+----------------+----------------------+
| 6655575552203051303| 1611957945 | 2 | 0 | 1638.0 |
| 8566928176839891583| -1402644643 | 2 | 0 | 1368.0 |
| 7904046282518428963| 1509330109 | 2 | 0 | 1368.0 |
| 7224410078130478461| -776509581 | 2 | 0 | 1368.0 |
| 5957995970499767542| 1311505962 | 1 | 0 | 1368.0 |
| 5295730445754781367| 1398621605 | 1 | 0 | 1917.0 |
| 8635802783983293129| 900266514 | 1 | 1 | 1638.0 |
| 5650467702003458413| 1358200733 | 1 | 0 | 1368.0 |
| 6470882100682188891| -1911689457 | 1 | 0 | 1996.0 |
| 6475474889432602205| 1501294204 | 1 | 0 | 1368.0 |
+--------------------+-------------+-----+----------------+---------------------+
10 rows in set (3.255 sec)

MySQL [(none)]> SELECT
WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth)
FROM hits
GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;

+---------------------+-------------+------+----------------+--------------+
| watchid | clientip | c | sum(isrefresh)| avg(resolutionwidth) |
+---------------------+-------------+------+----------------+-------------+
| 6655575552203051303 | 1611957945 | 2 | 0| 1638.0 |
| 8566928176839891583 | -1402644643 | 2 | 0| 1368.0 |
| 7904046282518428963 | 1509330109 | 2 | 0| 1368.0 |
| 7224410078130478461 | -776509581 | 2 | 0| 1368.0 |
| 5957995970499767542 | 1311505962 | 1 | 0| 1368.0 |
| 5295730445754781367 | 1398621605 | 1 | 0| 1917.0 |
| 8635802783983293129 | 900266514 | 1 | 1| 1638.0 |
| 5650467702003458413 | 1358200733 | 1 | 0| 1368.0 |
| 6470882100682188891 | -1911689457 | 1 | 0| 1996.0 |
| 6475474889432602205 | 1501294204 | 1 | 0| 1368.0 |
+---------------------+-------------+---+---------------+-----------------+
10 rows in set (0.066 sec)

Table Data Cache

缓存是存算分离架构中的重要组成部分。在 v1.0 版本中,Databend 社区为我们带来了 Table Data Cache!当 Databend 执行查询时,会根据访问数据的热度情况决定是否将该数据块保存到缓存中,以加速下一次访问。

Aggregate Spill

在 v1.0 版本中,Databend 引入了 Aggregate spill, 当在 Databend 中执行聚合查询时,会根据 Databend 当前的内存使用情况动态决定将内存中的聚合数据临时保存并持久化到对象存储中,防止查询过程中使用过高的内存。

未来展望

经过这些版本的打磨,Databend 终于有了一个雏形。现在,让我们重新认识一下 Databend:

  • 一个使用 Rust 开发的云原生数据仓库:存算分离,面向对象存储设计,极致弹性

  • 支持完整的 CRUD 特性,提供了 MySQL/Clickhouse/HTTP RESTful 等协议支持

  • 提供原生的 ARRAY、MAP、JSON 等复杂类型和 DECIMAL 高精度类型支持

  • 构建了类似于 Git 的 MVCC 列式存储引擎,支持 Data Time Travel 和 Data Share 能力

  • 不受存储供应商的限制,可以在任何存储服务上运行,并直接查询任何存储服务上的数据

  • 目前已全面支持 HDFS/Cloud-Based Object Storage 协议,包括:阿里云 OSS,腾讯云 COS,华为云 OBS,以及 S3,Azure Blob, Google Cloud Storage

Databend 的征程远远不止于此,在未来我们希望 Databend 能拥有:

更强大的功能

在紧随其后的 v1.1 版本中,我们希望实现如下功能:

  • JSON 索引:提高半结构化数据检索能力

  • 分布式 Ingest 能力:提高数据写入速度

  • MERGE INTO 功能:实现数据源增、删、改的实时 CDC 能力

  • Windows Function

我们希望这些功能能进一步满足用户的需求,并且实现 Databend 在 CDC 场景下的突破。

更开放的社区

Databend Labs 由一群开源爱好者组成,Databend 项目从创建之初就是采用 Apache 2.0 协议授权的开源项目。在借鉴和吸收 ClickHouse,CockroachDB 等开源项目优秀思想的同时,我们也在以自己的方式回馈社区:

  • 开源了 Databend 元数据服务集群的共识引擎 Openraft

  • 向 Apache 软件基金会捐赠了底层的数据访问引擎 OpenDAL 并成功进入孵化器开始孵化

  • 成为向量计算基础库 arrow2 等多个依赖项目的贡献者

  • 跟进并采用 Rust Nightly,帮助 Rust 社区复现并验证问题

没有开源社区就没有今天的 Databend,感谢 144 个参与 Databend 的贡献者!接下来,我们将更开放地与其他开源社区合作,支持读写 IcebergDelta Lake 等格式,打破数据间的壁垒,使数据能够更自由灵活地流转。


感谢大家!

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

Databend Cloud: https://databend.cn

Databend 文档:https://docs.databend.cn/

Wechat:Databend

GitHub: https://github.com/datafuselabs/databend

grafana-display-log-on-databend

本篇文章以 Grafana 展示 Databend 中 Nginx Log。

在 databend 新建 grafana 用户

  1. 先连接到 databend

    ❯ bendsql connect
    Connected to Databend on Host: localhost
    Version: DatabendQuery v0.9.46-nightly-67b0fe6(rust-1.68.0-nightly-2023-02-22T03:47:09.491571Z)
    ❯ bendsql query
    Connected with driver databend (DatabendQuery v0.9.46-nightly-67b0fe6(rust-1.68.0-nightly-2023-02-22T03:47:09.491571Z))Type "help" for help.
    dd:root@localhost/default=>
  2. 创建用户并赋予权限

    CREATE USER grafana IDENTIFIED BY 'grafana_password';
    GRANT SELECT ON *.* TO grafana;

安装 grafana 并配置数据源

  1. 打开 grafana 插件页面,并搜索Altinity plugin for ClickHouse 安装

  1. 用刚刚安装的插件新建数据源,并配置接口和用户名密码

  1. 保存并测试数据源

使用数据源

我们使用一个已经有的 nginx log 表来进行可视化

CREATE TABLE `access_logs` (  
`timestamp` TIMESTAMP,
`client` VARCHAR,
`method` VARCHAR,
`path` VARCHAR,
`protocol` VARCHAR,
`status` INT,
`size` INT,
`referer` VARCHAR,
`agent` VARCHAR,
`request` VARCHAR
);
  1. 新建 dashboard 和 panel,选择刚刚创建的数据源,选择 database 和 table,点击 Go to Query

  1. 输入可视化查询
SELECT 
(to_int64(timestamp) div 1000000 div $interval * $interval) * 1000 as t,
status,
count() as qps
FROM $table
WHERE timestamp >= to_datetime($from)
AND timestamp <= to_datetime($to)GROUP BY t, statusORDER BY t

常用宏参考:

  • $interval 在 panel 配置 Query Options 里选择的 interval

  • $table 新建 panel 时选择的 database 和 table

  • $from 在 grafana UI 上选择的时间范围 (单位为 ms)

  • $to 在 grafana UI 上选择的时间范围 (单位为 ms)

  1. 查看效果

按上述步骤多添加几个 panel 之后查看整体效果:

Connect With Us

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

databend-on-top-clickbench-benchmark

经历近两年的打磨,Databend 终于要发 1.0 版本了!我们深知从零开始构建数据库是一段非常艰辛的历程。从两年前的第一行代码到现在的 30 万行 Rust 代码,记录了 Databenders 小伙伴们的不懈付出。

许多人都非常关注我们是如何从零开始构建数据库的。相比于其他基于成熟底层技术栈的数据库产品,我们的性能表现如何?今天我们来基于最新版本来测试体验下 Databend 的优秀性能。

考虑到市场上已经有很多优秀的 OLAP 数据库,我们希望使用一个公开的标准来衡量 Databend 的性能,而不是自己建立测试标准。因此,我们选择了 ClickBench[1],这是由 ClickHouse 发起的分析型数据库性能测试排行榜,测试数据集为 hits,来自生产环境,使用 clickhouse-obfuscator 工具进行了混淆处理,覆盖了常见的多维报表查询,能够真实地反映各类数据库在生产环境下的性能比较。这个排行榜收录了 Snowflake、Doris、ClickHouse、MySQL、Greenplum、DuckDB 等 50 个主流数据库的测试结果。

评测方式是在特定的机型下,将近 70G 的原始数据导入到数据库中,并对比数据的导入时间。接着,对 43 个查询 SQL 进行测试,在每次执行 SQL 之前清空内存缓存,然后重复执行三次,并记录三次执行中耗时最短的一次。ClickBench 考察的是数据库在所有测试场景下的总体表现,因此会将所有数据库在所有场景的评分结果进行对比计算,得出总排名。

为了展现 Databend 的真实性能,我们没有针对测试场景进行特别优化:

  • 全部使用默认配置,不进行任何参数调优

  • 不对数据进行 特定字段分区导入,使用默认建表语句

  • 不对原始数据进行 Cache,也不对查询结果进行 Cache,只 Cache 元数据和索引

我们提交了最常见的三种机型的测试结果:

  • c6a.metal, 500gb gp2 (192core)

  • c6a.4xlarge, 500gb gp2 (16core)

  • c5.4xlarge, 500gb gp2 (16core)

1 导入性能,三个机型下均排行第一

特别是在 c6a.metal 机型上,我们的导入性能仅需要 70 秒。与其他数据库的性能表现相比,可以看到 Databend 的导入性能有巨大的优势。这主要得益于 Databend 在向量化并行导入下的极致优化,以及 pipeline 在计算和 io 之间的优秀调度能力。

此外,我们的底层 Storage Access 模块都基于 OpenDAL[2] ,它具有简洁的 API 设计,同时不失原生 API 的性能,覆盖了 s3、fs、memory、ftp、azblob、ipfs 等十多种后端存储。许多优秀的 Rust 项目,如 RisingWave、Greptimedb、Sscache 等,都采用了 OpenDAL 作为底层 IO 模块。目前,OpenDAL 即将进入 Apache 孵化器,旨在打造云原生的统一存储底座。

2 查询性能,三个机型下各居一二三

在 hot run 查询下,Databend 在 c6a.4xlarge 上表现出色,排名第一;在 c5.4xlarge 微弱劣势居第二,c6a.metal 居第三。

得益于 Databend 新的表达式系统设计[3],所有的算子都已经实现了向量化,并且我们所有的算子都有基于 Domain 值域推导能力。基于此,我们可以应用强大的常量折叠框架来做数据多级裁剪,尽可能略过不必要的数据块。此外,pipeline 的调度能力以及聚合算子的功能也再次加强,使得 CPU 和 IO 能够高效调度,从而发挥极致性能。

Databend 是面向对象存储而设计的新一代云原生数仓,并没有对本地 fs 场景做太多优化,在上面榜单中,前三的差距其实并不是很大。因此,在不太擅长的场景中,结合高性能的计算能力,Databend 也可以取得不错的优势。

由于 Databend 使用了默认配置,禁用了 DataCache,因此,在 cold run 场景下,对比意义不大。我们也可以对建表语句做一些针对性的优化(例如,按 UserID 分区导入,优化 Q17 等按 UserId 聚合场景),或者加入一些参数调优,这样性能估计还能再提升一个档次,但面向 Benchmark 调优并不是这次测试的主要意义,以用户为中心,在通用场景带给用户极致的性能、易用的产品体验才是我们的终极目标。

做 Benchmark 主要为了让我们有个衡量性能的方式,从而提升产品质量,ClickBench 测试非常具有代表意义,所以我们将 ClickBench 集成到了各个版本和 PR 的性能测试 CI 中[4],方便开发者观测性能回退和提升,优化产品开发。

3 更多

登顶 ClickBench 证明了 Databend 的导入查询性能已经取得非常优异的成绩,从零用 Rust 实现的数据库不一定比 C++ 慢,两年时间我们也可以做出别人十年的成就。 但这只是我们前进的一小步。性能并不能完全体现数据库的全部优点,但它始终是我们数据库内核开发者心中的一个目标。

在 v1.0 即将发布之际,希望有更多的开源爱好者加入 Databend 社区,和优秀的开发者一起协作,打造世界一流的数据库产品!让我们一起携手努力,为数据库技术的进步和发展贡献力量!

4 引用

[1] ClickBench: https://benchmark.clickhouse.com/

[2] OpenDAL: https://github.com/datafuselabs/opendal

[3] Databend 新的表达式系统设计: https://github.com/datafuselabs/databend/pull/9411

[4] PR 的性能测试 CI 中: https://repo.databend.rs/benchmark/clickbench/release/index.html

Connect With Us

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

mysql-databend-diff

Databend 提供了 MySQL 协议的兼容,但实质上和 MySQL 还有一定的区别。Databend 定位在基于对象存储实现一个真正的存算分离的弹性数仓。现在也在有很多朋友利用 Databend 做 MySQL 和 RDS MySQL 的归档存储分析。

这里给大家介绍一下 Databend 数据类型,索引,DDL,协议 几方面和 MySQL 的区别,其中文章最后一条可能减少你使用很多麻烦,以下详细内容供大家参考:

1. 数据类型

TypeDatabendMySQL
tinyintYesYes
smallintYesYes
mediumintNoYes
intYesYes
bigintYesYes

2. 浮点类型

TypeDatabendMySQL
decimalYesYes
floatYesYes
doubleYesYes

3. 日期类型

TypeDatabendMySQL
datetimeYesYes
timestampYesYes
dateNoYes
timeNoYes
yearNoYes

在 Databend 中 datetime 实质上是 timstamp 的同义词,现在支持 6 位精度:YYYY-MM-DD hh:mm:ss[.fraction]

4. 字符类型

TypeDatabendMySQL
varcharYesYes
stringYesNo
binaryNoYes
varbinaryNoYes
blogNoYes
enumNoYes
setNoYes

在 Databend 中 string 是 Varchar 的同义词,另外使用需要注意在 Databend 声明 Varchar 不需要声明长度,存储按实际长度存储

5. JSON 类型

TypeDatabendMySQL
jsonYesYes
variantYesNo

Databend 中 json 基于 jsonb 实现,函数上和 MySQL 不一样,参考:https://docs.databend.cn/sql/sql-functions/semi-structured-functions/

json 格式建议只是使用在数据清洗过程

6. 嵌套类型

TypeDatabendMySQL
arrayYesNo
tupleYesNo

Databend 主要定位在大数据解环境,对于数据格式支持更加利于使用一点。后续马上会加一个 map 类型。

7. 其它数据类型

TypeDatabendMySQL
bitNoYes
booleanYesNo

如你用 MySQL 的习惯使用 Databend 需要别小心 Databend 的 Boolean 类型,MySQL 没有 Boolean 类型,一般是使用 tinyint 中的 0 和 1 表示。

8. 索引上区别

在 Databend 中不用定义索引,默认情况下每一列都自带 min/max, bloom index 索引,在 Databend 中也没有唯一约束,外键等。这里使用上也需注意一下。

9. DDL 支持

目前 Databend 已经支持无 Block 实现 alter table 操作。

参考:https://docs.databend.cn/sql/sql-commands/ddl/table/alter-table-column

10. 协议和一些细节

Databend 支持 MySQL 协议 和 Clickhouse HTTP 协议,同时也支持 HTTP Restful API 设计。

参考:https://docs.databend.cn/developer/apis/http

Databend 在双引号和单引号这块参考了 PostgreSQL 明确约束,如果你是 MySQL 使用的风格的用户可以通过:

set global sql_dialect='MySQL';

把 SQL 会话习惯更改成:MySQL 风格。

Databend 默认时区是:timezone= UTC,如果你在国内使用可以通过:

 set global timezone='Asia/Shanghai';

小结

现在 Databend 和 MySQL 结构的场景

  • 使用 Databend 使用对象存储的成本优势担任 MySQL 的数据归档和分析

  • 使用 Databend 担任 MySQL 的离线 AP 库

  • 使用 Databend 把线上的分库分表的库合并到一起

Connect With Us

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

Alt text

在介绍 给 Databend 添加 Scalar 函数 | 函数开发系例一 后,我们来看 Aggregate Function。

Aggregate Function 用于对列的值进行操作并返回单个值。常见的 Agg Function 有 sum, count, avg 等。

函数注册

Agg Function 的 register 函数以小写的函数名和 AggregateFunctionDescription 类型为参数,每个被注册的函数都存入 case_insensitive_desc ( HashMap 结构) 中。

case_insensitive_combinator_desc 是为了存储一些组合函数,比如与 _if 组合的 count_if, sum_if 这类函数。

pub struct AggregateFunctionFactory {
    case_insensitive_desc: HashMap<String, AggregateFunctionDescription>,
    case_insensitive_combinator_desc: Vec<(String, CombinatorDescription)>,
}
impl AggregateFunctionFactory {
  ...
pub fn register(&mut self, name: &str, desc: AggregateFunctionDescription) {
        let case_insensitive_desc = &mut self.case_insensitive_desc;
        case_insensitive_desc.insert(name.to_lowercase(), desc);
    }
  ...
}

每个被注册的函数都要实现 trait AggregateFunctionAggregateFunctionFeatures。其中 AggregateFunctionFeaturesScalar 中的 FunctionProperty 比较类似,都是存储函数的一些特质。

pub type AggregateFunctionRef = Arc<dyn AggregateFunction>;
pub type AggregateFunctionCreator =
    Box<dyn Fn(&str, Vec<Scalar>, Vec<DataType>) -> Result<AggregateFunctionRef> + Sync + Send>;
pub struct AggregateFunctionDescription {
    pub(crate) aggregate_function_creator: AggregateFunctionCreator,
    pub(crate) features: AggregateFunctionFeatures,
}

主要来看 trait AggregateFunction,这里面是 Agg Function 的构成。

函数构成

可以看到与 Scalar 直接使用一个 Struct 不同,AggregateFunction 是一个 trait。因为聚合函数是按 block 累加列中的数据,再累加过程中会产生一些中间结果。

因此 Aggregate Function 必须有初始状态,而且聚合过程中生成的结果也要是 mergeable (可合并) 和 serializable (可序列化) 的。

主要函数有:

  • name 表示被注册的函数的名字,比如 avg, sum 等等。
  • return_type 表示被注册的函数返回值的类型,同样的函数返回值可能会由于参数类型的不同而产生变化。比如 sum(int8) 参数为 i8 类型,但是返回返回值可能是 int64。
  • init_state 用来初始化聚合函数状态。
  • state_layout 用来表示 state 在内存中的大小和内存块的排列方式。
  • accumulate 用于 SingleStateAggregator。也就是着整个块可以在单个状态下聚合,没有任何 keys。比如 select count(*) from t 此时查询中没有任何分组列的聚合,这时会调度 accumulate 函数。
  • accumulate_keys 则是用于 PartialAggregator。这里需要考虑 key 和 offset,每个 key 代表一个唯一的内存地址,记为函数参数 place。
  • serialize 将聚合过程中的 state 序列化为二进制。
  • deserialize 从二进制反序列化为 state
  • merge 用于合并其他 state 到当前 state
  • merge_result 可以将 Aggregate Function state 合并成单个值。

示例

以 avg 为例

具体实现在 aggregate_avg.rs 中。

因为我们需要累加每个值,并除以非 null 总行数。因此 avg function 被定义为 struct AggregateAvgFunction<T, SumT>。其中 T 和 SumT 是实现 Number 的逻辑类型。

在聚合过程中 avg 会产生的中间状态值是 已经累加的值的总和 以及 已经扫描过的非 null 的行。因此 AggregateAvgState 可以被定义为如下结构。

#[derive(Serialize, Deserialize)]
struct AggregateAvgState<T: Number> {
    #[serde(bound(deserialize = "T: DeserializeOwned"))]
    pub value: T,
    pub count: u64,
}
  • return_type 设置为 Float64Type。比如 value = 3, count = 2, avg = value/count。
  • init_state 初始状态设置 value 为 T 的 default 值,count 为 0。
  • accumulate AggregateAvgState 的 count, value 分别对 block 中非 NULL 的行数和值进行累加。
  • accumulate_keys 通过 place.get::<AggregateAvgState<SumT>>() 获取对应的状态值,并进行更新。
fn accumulate_keys(
    &self,
    places: &[StateAddr],
    offset: usize,
    columns: &[Column],
    _input_rows: usize,
) -> Result<()> {
    let darray = NumberType::<T>::try_downcast_column(&columns[0]).unwrap();
    darray.iter().zip(places.iter()).for_each(|(c, place)| {
        let place = place.next(offset);
        let state = place.get::<AggregateAvgState<SumT>>();
        state.add(c.as_(), 1);
    });
    Ok(())
}

类似的聚合函数示例也可以参考 sum 和 count 的实现:

函数测试

Unit Test

聚合函数相关单元测试在 agg.rs 中。

Logic Test

Functions 相关的 logic 测试在 tests/logictest/suites/base/02_function/ 中。

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

Alt text

在 Databend 中按函数实现分为了:scalars 函数和 aggregates 函数。

Scalar 函数: 基于输入值,返回单个值。常见的 Scalar function 有 now, round 等。

Aggregate 函数: 用于对列的值进行操作并返回单个值。常见的 Agg function 有 sum, count, avg 等。

https://github.com/datafuselabs/databend/tree/main/src/query/functions/src

该系列共两篇,本文主要介绍 Scalar Function 从注册到执行是如何在 Databend 运行起来的。

函数注册

由 FunctionRegistry 接管函数注册。

#[derive(Default)]
pub struct FunctionRegistry {
    pub funcs: HashMap<&'static str, Vec<Arc<Function>>>,
    #[allow(clippy::type_complexity)]
    pub factories: HashMap<
        &'static str,
        Vec<Box<dyn Fn(&[usize], &[DataType]) -> Option<Arc<Function>> + 'static>>,
    >,
    pub aliases: HashMap<&'static str, &'static str>,
}

三个 item 都是 Hashmap。

其中,funcs 和 factories 都用来存储被注册的函数。不同之处在于 funcs 注册的都是固定参数个数的函数(目前支持最少参数个数为 0,最多参数个数为 5),分为 register_0_arg, register_1_arg 等等。而 factories 注册的都是参数不定长的函数(如 concat),调用 register_function_factory 函数。

由于一个函数可能有多个别名(如 minus 的别名有 subtract 和 neg),因此有了 alias,它的 key 是某个函数的别名,v 是当前的存在的函数名,调用 register_aliases 函数。

另外,根据不同的功能需求,我们提供了不同级别的 register api。

640.png

函数构成

已知 funcs 的 value 是函数主体,我们来看一下 Function 在 Databend 中是怎么构建的。

pub struct Function {
    pub signature: FunctionSignature,
    #[allow(clippy::type_complexity)]
    pub calc_domain: Box<dyn Fn(&[Domain]) -> Option<Domain>>,
    #[allow(clippy::type_complexity)]
    pub eval: Box<dyn Fn(&[ValueRef<AnyType>], FunctionContext) -> Result<Value<AnyType>, String>>,
}

其中,signature 包括 函数名,参数类型,返回类型以及函数特性(目前暂未有函数使用特性,仅作为保留位)。要特别注意的是,在注册时函数名需要是小写。而一些 token 会经过 src/query/ast/src/parser/token.rs 转换。

#[allow(non_camel_case_types)]
#[derive(Logos, Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum TokenKind {
    ...
    #[token("+")]
    Plus,
    ...
}

以实现 `select 1+2` 的加法函数为例子,`+` 被转换为 Plus,而函数名需要小写,因此我们在注册时函数名使用 `plus`

with_number_mapped_type!(|NUM_TYPE| match left {
    NumberDataType::NUM_TYPE => {
        registry.register_1_arg::<NumberType<NUM_TYPE>, NumberType<NUM_TYPE>, _, _>(
            "plus",
            FunctionProperty::default(),
            |lhs| Some(lhs.clone()),
            |a, _| a,
        );
    }
});

calc_domain 用来计算输出值的输入值的集合。用数学公式描述的话比如 `y = f(x)` 其中域就是 x 值的集合,可以作为 f 的参数生成 y 值。这可以使我们在索引数据时轻松过滤掉不在域内的值,极大提升响应效率。

eval 可以理解成函数的具体实现内容。本质是接受一些字符或者数字,将他们解析成表达式,再转换成另外一组值。

示例

目前在 function-v2 中实现的函数有这几类:arithmetric, array, boolean, control, comparison, datetime, math, string, string_mult_args, variant

以 length 的实现为例:

length 接受一个 String 类型的值为参数,返回一个 Number 类型。名字为 length,domain 不做限制(因为任何 string 都有长度)最后一个参数是一个闭包函数,作为 length 的 eval 实现部分。

registry.register_1_arg::<StringType, NumberType<u64>, _, _>(
    "length",
    FunctionProperty::default(),
    |_| None,
    |val, _| val.len() as u64,
);

在 register_1_arg 的实现中,我们看到调用的函数是 register_passthrough_nullable_1_arg,函数名包含一个 nullable。而 eval 被 vectorize_1_arg 调用。

注意:请不要手动修改 register_1_arg 所在的文件 src/query/expression/src/register.rs 。因为它是被 src/query/codegen/src/writes/register.rs 生成的。

pub fn register_1_arg<I1: ArgType, O: ArgType, F, G>(
    &mut self,
    name: &'static str,
    property: FunctionProperty,
    calc_domain: F,
    func: G,
) where
    F: Fn(&I1::Domain) -> Option<O::Domain> + 'static + Clone + Copy,
    G: Fn(I1::ScalarRef<'_>, FunctionContext) -> O::Scalar + 'static + Clone + Copy,
{
    self.register_passthrough_nullable_1_arg::<I1, O, _, _>(
        name,
        property,
        calc_domain,
        vectorize_1_arg(func),
    )
}

这是因为 eval 在实际应用场景中接受的不只是字符或者数字,还可能是 null 或者其他各种类型。而 null 无疑是最特殊的一种。而我们接收的参数也可能是一个列或者一个值。比如

select length(null);
+--------------+
| length(null) |
+--------------+
|         NULL |
+--------------+
select length(id) from t;
+------------+
| length(id) |
+------------+
|          2 |
|          3 |
+------------+

基于此,如果我们在函数中无需对 null 类型的值做特殊处理,直接使用 register_x_arg 即可。如果需要对 null 类型做特殊处理,参考 try_to_timestamp

而对于需要在 vectorize 中进行特化的函数则需要调用 register_passthrough_nullable_x_arg,对要实现的函数进行特定的向量化优化。

例如 comparison 函数 regexp 的实现:regexp 接收两个 String 类型的值,返回 Bool 值。在向量化执行中,为了进一步优化减少重复正则表达式的解析,引入了 HashMap 结构。因此单独实现了 `vectorize_regexp`

registry.register_passthrough_nullable_2_arg::<StringType, StringType, BooleanType, _, _>(
    "regexp",
    FunctionProperty::default(),
    |_, _| None,
    vectorize_regexp(|str, pat, map, _| {
        let pattern = if let Some(pattern) = map.get(pat) {
            pattern
        } else {
            let re = regexp::build_regexp_from_pattern("regexp", pat, None)?;
            map.insert(pat.to_vec(), re);
            map.get(pat).unwrap()
        };
        Ok(pattern.is_match(str))
    }),
);

函数测试

Unit Test

函数相关单元测试在 scalars 目录中。

Logic Test

Functions 相关的 logic 测试在 02_function 目录中。

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。