Skip to main content

One post tagged with "Product"

View All Tags

image.png

❤️ 友情提示:代码演进较快,请注意文档的时效性哦!

引言

Databend 将存储引擎抽象成一个名为 Table 的接口,源码位于 query/catalog/src/table.rs

Table 接口定义了 readappendalteroptimizetruncate 以及 recluster 等方法,负责数据的读写和变更。解释器(interpreter)通过调用 Table trait 的方法生成物理执行的 pipeline

通过实现 Table 接口的方法,可以定义 Databend 的存储引擎,不同的实现对应不同的引擎。

Storage 主要关注 Table 接口的具体实现,涉及表的元信息,索引信息的管理,以及与底层 IO 的交互。

image.png

join order 的重要性

Join order 是指在执行SQL查询时,决定多个表进行 join 的顺序。它是数据库查询优化的一个重要方面,对查询性能和效率有着重要的影响, 不同的 join order 对性能可能有数量级的影响。

优化器优化 join order 的核心流程

  1. join plan 枚举
  2. 根据统计信息估算结果的大小 (cardinality estimation)
  3. 把 2 中的结果带入到代价模型计算枚举 plan 的 代价 (cost model)

本文中我们只关心第一步 join plan enumeration, 也就是 join reorder 算法。

join reorder 算法

贪心启发式算法

当需要 join 的表都数量过多时(通常超过10个),适合用贪心算法,其优势在于能够较快的找到还不错的 join order.

核心思想:从一张表拓展到 N 张表,每次选出使当前代价最小的一张表,加入到 join tree中,构建出 left-deep tree.

贪心算法也有很多拓展,主要拓展点是围绕如果 避免局部最优以及产生 bushy tree

枚举算法(top-down & bottom-up)

主流的两种

  • 基于规则变换的 Top-down 枚举,可以结合 Top-down cascasde 框架通过记忆化的方式来实现
  • 基于 DP 的 bottom-up 枚举, 典型代表 DPhyp 算法,其优势在于可以高效的产生 bushy tree

一般情况下,数据库系统会把贪心和枚举有效的结合,从而对任意数量的表 join 都能够在合理的时间内得到有效的 join order

Databend join reorder 现状

databend 优化器基于 Rule 进行优化,每条 Rule 通过 pattern 来匹配 plan 中的 sub-tree。主要分为两个阶段,启发式优化和基于 cascades 框架的代价优化,两个阶段共用一套 Rule。

在启发式阶段优化结束后,会对优化后的 plan 执行 DPhyp 算法来尝试得到最优的 join order, 如果 Dphyp 优化失败,会在 CBO 中找到最优的 left-deep tree. (CBO 中不尝试进行 bushy tree 优化,因为如果 Dphyp 已经优化失败,那么尝试在 CBO 中进行 bushy tree 优化,搜索空间很有可能爆炸,如 tpcds 64)

Databend 目前没有支持贪心算法 (下一阶段的 roadmap 会做相关支持来处理极端情况下的 case),首先会利用 dphyp 算法来得到最优解,如果 dphyp 失败(query 中存在不适合 dphyper 算法的 pattern),会在 cascades 框架中利用基于规则变换的 Top-down 枚举得到 left-deep tree. 如果表的数量过多,如超过十个,会在 dphyp 算法中放弃部分搜索空间来做 tradeoff。

Dphyp 的核心定义及算法

hypergraph

一个超图是一个由节点集合 V 和超边集合 E 组成的二元组 H = (V,E),其中:

  1. V是非空节点集合。
  2. E是超边集合,超边是V的非空子集(u ⊂ V)和(v ⊂ V)的无序对(u,v),并且满足额外条件 u∩v = ∅。

有了超图就可以描述多节点之间连接。

image.png

对于上图,它们的 join condition 是 R1.a + R2.b + R3.c = R4.d + R5.e + R6.f 所以 hyperedge 就是 {R1, R2, R3} — {R4, R5, R6}

csg-cmp-pair

csg: connected-subgraph (连通子图)

cmp: connected-complement (连通互补对)

如果两个 csg 之间没有交集,且存在超边连接,其中一个就是另一个的 cmp, 二者构成 csg-cmp-pair. 算法的核心就是通过递归无重复的枚举出所有的 csg-cmp-pair, 找出代价最小的包含所有点的 csg-cmp-pair.

algorithm

算法核心:hypergraph 中的节点是有序的,节点从后往前迭代(递减),每个节点只考虑其自身及其之后(序列号更大)的节点,找到可能的连通子图及其连通互补图,构成 csg-cmp-pair, 计算并更新出其代价,当迭代到最小的节点后,会得到包含所有点的 csg-cmp-pair, 算法结束。

算法流程

  1. EmitCsg: 寻找 {v} 的互补连通子图
    • a. 如果找到, EmitCsgCmp
    • b. EnumerateCmpRec:扩展互补连通子图
      • 如果扩展后的互补连通子图可以与 {v} 形成 csg-cmp-pair, 则 EmitCsgCmp
      • 回到 b,继续扩展
  2. EnumerateCsgRec: 扩展 {v}
    • a. 得到扩展后的 {v’}, 对 {v’} 执行 1
    • b. 回到2,继续扩展

核心代码和数据结构定义可参考:

(https://github.com/datafuselabs/databend/blob/main/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs)

image.png

Kafka Connect 介绍

Kafka Connect 是一个用于在 Apache Kafka® 和其他数据系统之间可扩展且可靠地流式传输数据的工具。通过将数据移入和移出 Kafka 进行标准化,使得快速定义连接器以在 Kafka 中传输大型数据集变得简单,可以更轻松地构建大规模的实时数据管道。

image.png

我们使用 Kafka Connector 读取或写入外部系统、管理数据流以及扩展系统,所有这些都无需开发新代码。Kafka Connect 管理与其他系统连接时的所有常见问题(Schema 管理、容错、并行性、延迟、投递语义等),每个 Connector 只关注如何在目标系统和 Kafka 之间复制数据。

Kafka 连接器通常用来构建 data pipeline,一般有两种使用场景:

  • 开始和结束的端点: 例如,将 Kafka 中的数据导出到 Databend 数据库,或者把 Mysql 数据库中的数据导入 Kafka 中。

  • 数据传输的中间媒介: 例如,为了把海量的日志数据存储到 Elasticsearch 中,可以先把这些日志数据传输到 Kafka 中,然后再从 Kafka 中将这些数据导入到 Elasticsearch 中进行存储。Kafka 连接器可以作为数据管道各个阶段的缓冲区,将消费者程序和生产者程序有效地进行解耦。

Kafka Connect 分为两种:

  • Source Connect: 负责将数据导入 Kafka。
  • Sink Connect: 负责将数据从 Kafka 系统中导出到目标表。

image.png

Databend Kafka Connect

Kafka 目前在 Confluent Hub 上提供了上百种 Connector,比如 Elasticsearch Service Sink Connector, Amazon Sink Connector, HDFS Sink 等,用户可以使用这些 Connector 以 Kafka 为中心构建任意系统之间的数据管道。现在我们也为 Databend 提供了 Kafka Connect Sink Plugin,这篇文章我们将会介绍如何使用 MySQL JDBC Source Connector 和 Databend Sink Connector 构建实时的数据同步管道。

image.png

启动 Kafka Connect

本文假定操作的机器上已经安装 Apache Kafka,如果用户还没有安装,可以参考 Kafka quickstart 进行安装。

Kafka Connect 目前支持两种执行模式:Standalone 模式和分布式模式。

启动模式

Standalone 模式

在 Standalone 模式下,所有的工作都在单个进程中完成。这种模式更容易配置以及入门,但不能充分利用 Kafka Connect 的某些重要功能,例如,容错。我们可以使用如下命令启动 Standalone 进程:

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

第一个参数 config/connect-standalone.properties 是 worker 的配置。这其中包括 Kafka 连接参数、序列化格式以及提交 Offset 的频率等配置:

bootstrap.servers=localhost:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

后面的配置是指定要启动的 Connector 的参数。上述提供的默认配置适用于使用 config/server.properties 提供的默认配置运行的本地集群。如果使用不同配置或者在生产部署,那就需要对默认配置做调整。但无论怎样,所有 Worker(独立的和分布式的)都需要一些配置:

  • bootstrap.servers: 该参数列出了将要与 Connect 协同工作的 broker 服务器,Connector 将会向这些 broker 写入数据或者从它们那里读取数据。你不需要指定集群的所有 broker,但是建议至少指定 3 个。

  • key.converter 和 value.converter: 分别指定了消息键和消息值所使用的的转换器,用于在 Kafka Connect 格式和写入 Kafka 的序列化格式之间进行转换。这控制了写入 Kafka 或从 Kafka 读取的消息中键和值的格式。由于这与 Connector 没有任何关系,因此任何 Connector 可以与任何序列化格式一起使用。默认使用 Kafka 提供的 JSONConverter。有些转换器还包含了特定的配置参数。例如,通过将 key.converter.schemas.enable 设置成 true 或者 false 来指定 JSON 消息是否包含 schema。

  • offset.storage.file.filename: 用于存储 Offset 数据的文件。

这些配置参数可以让 Kafka Connect 的生产者和消费者访问配置、Offset 和状态 Topic。配置 Kafka Source 任务使用的生产者和 Kafka Sink 任务使用的消费者,可以使用相同的参数,但需要分别加上‘producer.’和‘consumer.’前缀。bootstrap.servers 是唯一不需要添加前缀的 Kafka 客户端参数。

distributed 模式

分布式模式可以自动平衡工作负载,并可以动态扩展(或缩减)以及提供容错。分布式模式的执行与 Standalone 模式非常相似:

bin/connect-distributed.sh config/connect-distributed.properties

不同之处在于启动的脚本以及配置参数。在分布式模式下,使用 connect-distributed.sh 来代替 connect-standalone.sh。第一个 worker 配置参数使用的是 config/connect-distributed.properties 配置文件:

bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
offset.flush.interval.ms=10000

Kafka Connect 将 Offset、配置以及任务状态存储在 Kafka Topic 中。建议手动创建 Offset、配置和状态的 Topic,以达到所需的分区数和复制因子。如果在启动 Kafka Connect 时尚未创建 Topic,将使用默认分区数和复制因子来自动创建 Topic,这可能不适合我们的应用。在启动集群之前配置如下参数至关重要:

  • group.id: Connect 集群的唯一名称,默认为 connect-cluster。具有相同 group id 的 worker 属于同一个 Connect 集群。需要注意的是这不能与消费者组 ID 冲突。

  • config.storage.topic: 用于存储 Connector 和任务配置的 Topic,默认为 connect-configs。需要注意的是这是一个只有一个分区、高度复制、压缩的 Topic。我们可能需要手动创建 Topic 以确保配置的正确,因为自动创建的 Topic 可能有多个分区或自动配置为删除而不是压缩。

  • offset.storage.topic: 用于存储 Offset 的 Topic,默认为 connect-offsets。这个 Topic 可以有多个分区。

  • status.storage.topic: 用于存储状态的 Topic,默认为 connect-status。这个 Topic 可以有多个分区。

需要注意的是在分布式模式下需要通过 rest api 来管理 Connector。

比如:

GET /connectors – 返回所有正在运行的connector名。
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息。
GET /connectors/{name}/config – 获取指定connector的配置信息。
PUT /connectors/{name}/config – 更新指定connector的配置信息。

配置 Connector

MySQL Source Connector

  1. 安装 MySQL Source Connector Plugin

这里我们使用 Confluent 提供的 JDBC Source Connector。

从 Confluent hub 下载 Kafka Connect JDBC 插件并将 zip 文件解压到 /path/kafka/libs 目录下。

  1. 安装 MySQL JDBC Driver

因为 Connector 需要与数据库进行通信,所以还需要 JDBC 驱动程序。JDBC Connector 插件也没有内置 MySQL 驱动程序,需要我们单独下载驱动程序。MySQL 为许多平台提供了 JDBC 驱动程序。选择 Platform Independent 选项,然后下载压缩的 TAR 文件。该文件包含 JAR 文件和源代码。将此 tar.gz 文件的内容解压到一个临时目录。将 jar 文件(例如,mysql-connector-java-8.0.17.jar),并且仅将此 JAR 文件复制到与 kafka-connect-jdbc jar 文件相同的 libs 目录下:

cp mysql-connector-j-8.0.32.jar /opt/homebrew/Cellar/kafka/3.4.0/libexec/libs/
  1. 配置 MySQL Connector

/path/kafka/config 下创建 mysql.properties 配置文件,并使用下面的配置:

name=test-source-mysql-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
#mode=timestamp+incrementing
mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
#timestamp.column.name=tms
topics=test_kafka

针对配置我们这里重点介绍 modeincrementing.column.name ,和 timestamp.column.name 几个字段。Kafka Connect MySQL JDBC Source 提供了三种增量同步模式:

  • incrementing
  • timestamp
  • timestamp+incrementing
  1. 在 incrementing 模式下,每次都是根据 incrementing.column.name 参数指定的列,查询大于自上次拉取的最大 id:
SELECT * FROM mydb.test_kafka
WHERE id > ?
ORDER BY id ASC

这种模式的缺点是无法捕获行上更新操作(例如,UPDATE、DELETE)的变更,因为无法增大该行的 id。

  1. timestamp 模式基于表上时间戳列来检测是否是新行或者修改的行。该列最好是随着每次写入而更新,并且值是单调递增的。需要使用 timestamp.column.name 参数指定时间戳列。

需要注意的是时间戳列在数据表中不能设置为 Nullable.

在 timestamp 模式下,每次都是根据 timestamp.column.name 参数指定的列,查询大于自上次拉取成功的 gmt_modified:

SELECT * FROM mydb.test_kafka
WHERE tms > ? AND tms < ?
ORDER BY tms ASC

这种模式可以捕获行上 UPDATE 变更,缺点是可能造成数据的丢失。由于时间戳列不是唯一列字段,可能存在相同时间戳的两列或者多列,假设在导入第二条的过程中发生了崩溃,在恢复重新导入时,拥有相同时间戳的第二条以及后面几条数据都会丢失。这是因为第一条导入成功后,对应的时间戳会被记录已成功消费,恢复后会从大于该时间戳的记录开始同步。此外,也需要确保时间戳列是随着时间递增的,如果人为的修改时间戳列小于当前同步成功的最大时间戳,也会导致该变更不能同步。

  1. 仅使用 incrementing 或 timestamp 模式都存在缺陷。将 timestamp 和 incrementing 一起使用,可以充分利用 incrementing 模式不丢失数据的优点以及 timestamp 模式捕获更新操作变更的优点。需要使用 incrementing.column.name 参数指定严格递增列、使用 timestamp.column.name 参数指定时间戳列。
SELECT * FROM mydb.test_kafka
WHERE tms < ?
AND ((tms = ? AND id > ?) OR tms > ?)
ORDER BY tms, id ASC

由于 MySQL JDBC Source Connector 是基于 query-based 的数据获取方式,使用 SELECT 查询来检索数据,并没有复杂的机制来检测已删除的行,所以不支持 DELETE 操作。可以使用基于 log-based 的 [Kafka Connect Debezium]。

后面的演示中会分别演示上述模式的效果。更多的配置参数可以参考 MySQL Source Configs

Databend Kafka Connector

  1. 安装 OR 编译 Databend Kafka Connector

可以从源码编译得到 jar 或者从 release 直接下载。

git clone https://github.com/databendcloud/databend-kafka-connect.git & cd databend-kafka-connect
mvn -Passembly -Dmaven.test.skip package

databend-kafka-connect.jar 拷贝至 /path/kafka/libs 目录下。

  1. 安装 Databend JDBC Driver

Maven Central 下载最新的 Databend JDBC 并拷贝至 /path/kafka/libs 目录下。

  1. 配置 Databend Kafka Connector

/path/kafka/config 下创建 mysql.properties 配置文件,并使用下面的配置:

name=databend
connector.class=com.databend.kafka.connect.DatabendSinkConnector

connection.url=jdbc:databend://localhost:8000
connection.user=databend
connection.password=databend
connection.attempts=5
connection.backoff.ms=10000
connection.database=default

table.name.format=default.${topic}
max.retries=10
batch.size=1
auto.create=true
auto.evolve=true
insert.mode=upsert
pk.mode=record_value
pk.fields=id
topics=test_kafka
errors.tolerance=all

auto.createauto.evolve 设置成 true 后会自动建表并在源表结构发生变化时同步到目标表。关于更多配置参数的介绍可以参考 Databend Kafka Connect Properties

测试 Databend Kafka Connect

准备各个组件

  1. 启动 MySQL
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
  1. 启动 Databend
version: '3'
services:
databend:
image: datafuselabs/databend
volumes:
- /Users/hanshanjie/databend/local-test/databend/databend-query.toml:/etc/databend/query.toml
environment:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
MINIO_ENABLED: 'true'
ports:
- '8000:8000'
- '9000:9000'
- '3307:3307'
- '8124:8124'
  1. 以 standalone 模式启动 Kafka Connect,并加载 MySQL Source Connector 和 Databend Sink Connector:
./bin/connect-standalone.sh config/connect-standalone.properties config/databend.properties config/mysql.properties
[2023-09-06 17:39:23,128] WARN [databend|task-0] These configurations '[metrics.context.connect.kafka.cluster.id]' were supplied but are not used yet. (org.apache.kafka.clients.consumer.ConsumerConfig:385)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka version: 3.4.0 (org.apache.kafka.common.utils.AppInfoParser:119)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka commitId: 2e1947d240607d53 (org.apache.kafka.common.utils.AppInfoParser:120)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka startTimeMs: 1693993163128 (org.apache.kafka.common.utils.AppInfoParser:121)
[2023-09-06 17:39:23,148] INFO Created connector databend (org.apache.kafka.connect.cli.ConnectStandalone:113)
[2023-09-06 17:39:23,148] INFO [databend|task-0] [Consumer clientId=connector-consumer-databend-0, groupId=connect-databend] Subscribed to topic(s): test_kafka (org.apache.kafka.clients.consumer.KafkaConsumer:969)
[2023-09-06 17:39:23,150] INFO [databend|task-0] Starting Databend Sink task (com.databend.kafka.connect.sink.DatabendSinkConfig:33)
[2023-09-06 17:39:23,150] INFO [databend|task-0] DatabendSinkConfig values:...

Insert

Insert 模式下我们需要使用如下的 MySQL Connector 配置:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
#mode=timestamp+incrementing
mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
#timestamp.column.name=tms
topics=test_kafka

在 MySQL 中创建数据库 mydb 和表 test_kafka:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE test_kafka (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE test_kafka AUTO_INCREMENT = 10;

在插入数据之前,databend-kafka-connect 并不会收到 event 进行建表和数据写入。

插入数据:

INSERT INTO test_kafka VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

源表端插入数据后,

Databend 目标端的表就新建出来了:

image.png

同时数据也会成功插入:

image.png

Support DDL

我们在配置文件中 auto.evolve=true,所以在源表结构发生变化的时候,会将 DDL 同步至目标表。这里我们正好需要将 MySQL Source Connector 的模式从 incrementing 改成 timestamp+incrementing,需要新增一个 timestamp 字段并打开 timestamp.column.name=tms 配置。我们在原表中执行:

alter table test_kafka add column tms timestamp;

并插入一条数据:

insert into test_kafka values(20,"new data","from kafka",now());

到目标表中查看:

image.png

发现 tms 字段已经同步至 Databend table,并且该条数据也已经插入成功:

image.png

Upsert

修改 MySQL Connector 的配置为:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
mode=timestamp+incrementing
#mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
timestamp.column.name=tms
topics=test_kafka

主要是将 mode 改为 timestamp+incrementing并添加 timestamp.column.name 字段。

重启 Kafka Connect。

在源表中更新一条数据:

update test_kafka set name="update from kafka test" where id=20;

到目标表中可以看到更新的数据:

image.png

总结

通过上面的内容可以看到 Databend Kafka Connect 具有以下特性:

  • Table 和 Column 支持自动创建: auto.createauto-evolve 的配置支持下,可以自动创建 Table 和 Column,Table name 是基于 Kafka topic name 创建的;

  • Kafka Shemas 支持: Connector 支持 Avro、JSON Schema 和 Protobuf 输入数据格式。必须启用 Schema Registry 才能使用基于 Schema Registry 的格式;

  • 多个写入模式: Connector 支持 insertupsert 写入模式;

  • 多任务支持: 在 Kafka Connect 的能力下,Connector 支持运行一个或多个任务。增加任务的数量可以提高系统性能;

  • 高可用: 分布式模式下可以自动平衡工作负载,并可以动态扩展(或缩减)以及提供容错能力。

同时,Databend Kafka Connect 也能够使用原生 Connect 支持的配置,更多配置参考 Kafka Connect Sink Configuration Properties for Confluent Platform

image.png

对于 Databend 这样复杂的数据库服务端程序,往往需要支持大量的可配置选项,以帮助运维人员根据实际使用需要管理和调优系统。

Databend 目前支持三种配置方式:命令行、环境变量和配置文件,优先级依次递减。

  • 一般情况下,推荐使用配置文件来记录和管理各种配置。
  • 对于 K8S 集群,为了灵活变更部分配置(比如,特性开关),使用环境变量可能是更优雅的形式。
  • 命令行则用于调整本地环境下的少数冲突配置。

Databend Query 中的映射

对于 databend-query ,不管是什么形式的配置,其配置选项几乎可以看作是代码的扁平化树形映射,即基本符合代码中「配置域」+「配置项」的逻辑。

  • 环境变量和配置文件中,利用 serfig 将代码嵌套展开,使用 _ 做为分隔符。
  • 命令行中稍有不同:一方面,分隔符使用 -;另一方面,部分命令行选项的名称中没有绑定配置域。

为了更好理解这里的映射关系,我们可以深入到具体一项配置,下面将围绕 admin_api_address 这个配置项展开。

  • 在环境变量上,需要使用 QUERY_ADMIN_API_ADDRESSQUERY 表征这个配置所处的域,而 ADMIN_API_ADDRESS 是具体的配置项。
  • 在配置文件中,通常是使用 toml 来进行配置。 [query] 表征配置所处的域,admin_api_address 为具体的配置项。
[query]
...
# Databend Query http address.
# For admin RESET API.
admin_api_address = "0.0.0.0:8081"
...
  • 命令行中需要使用 --admin-api-address 进行配置,这一项没有绑定「配置域」。如果是配置 --storage-s3-access-key-id ,那么「storage」+ 「s3」构成配置域,「access-key-id」是具体的配置项。

在了解如何对 admin_api_address 进行配置后,让我们进入到配置相关的代码,进一步查看映射关系的代码形式(位于 src/query/config/src/config.rs)。

pub struct Config {
...

// Query engine config.
#[clap(flatten)]
pub query: QueryConfig,

...
}

/// Query config group.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)]
#[serde(default, deny_unknown_fields)]
pub struct QueryConfig {
...

#[clap(long, default_value = "127.0.0.1:8080")]
pub admin_api_address: String,

...
}

因为代码中使用了嵌套的层级结构,最上层是 Config,而 admin_api_addresspub query: QueryConfig 中的一个配置项,经过 serfig 处理后,需要使用 QUERY 或者 [query] 表征其所处的域,配置项就还是 admin_api_address

而命令行中具体的配置项名称和默认值会受到 #[clap(long = "<long-name>", default_value = "<value>")] 控制),clap 会接管配置:

  • admin_api_address 就变成了 --admin-api-address
  • --storage-s3-access-key-id 而言,其实际的代码层级是 Config -> StorageConfig -> S3StorageConfig -> access_key_id,字段之上有标注 #[clap(long = "storage-s3-access-key-id", default_value_t)] ,所以需要使用 --storage-s3-access-key-id 进行配置。

Databend Meta 中的映射

databend-meta 的配置文件和命令行逻辑与 databend-query 是基本一致的。但是环境变量是通过 serfig 内置的 serde-env 自行定义的映射关系(但同样可以尝试按「配置域」+「配置项」进行理解)。

同样具体到单独的某项配置来看一下,这里以 log_dir 为例。

  • 在环境变量上,需要使用 METASRV_LOG_DIRMETASRV 表征这个配置所处的域,而 LOG_DIR 是具体的配置项。
  • 而在配置文件中,这一配置项作用于全局,只需要:
log_dir                 = "./.databend/logs1"
  • 在命令行中当然也直接 --log-dir 进行配置。

让我们通过代码来解构其映射,代码位于 src/meta/service/src/configs/outer_v0.rs

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Parser)]
#[clap(about, version = &**METASRV_COMMIT_VERSION, author)]
#[serde(default)]
pub struct Config {
...
/// Log file dir
#[clap(long = "log-dir", default_value = "./.databend/logs")]
pub log_dir: String,
...
}

配置文件和命令行参数相关的配置项是由 Config 结构体管理的,逻辑与 databend-query 一致,就不再赘述。

而环境变量的配置项是由 ConfigViaEnv 结构体进行处理的,如下:

/// #[serde(flatten)] doesn't work correctly for env.
/// We should work around it by flatten them manually.
/// We are seeking for better solutions.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ConfigViaEnv {
...
pub metasrv_log_dir: String,
...
}

Config 之间的映射关系位于 impl From<Config> for ConfigViaEnvimpl Into<Config> for ConfigViaEnv 这两个部分。对于 metasrv_log_dir 而言,就是映射到前面的 log_dir 字段。

由「3306π」社区主办,「Databend」参与协办的「数据库朋友圈」活动于 9 月 16 日在北京360大厦成功举办!该活动汇集了数据库领域的资深专家和企业家,共同探讨数据库技术变革。

下午,Databend Labs 联合创始人张雁飞作为「Serverless 数仓技术与挑战」专题的演讲嘉宾进行了分享。

image.png

主题: 「Serverless 数仓技术与挑战」

演讲嘉宾: 张雁飞

嘉宾介绍: Databend Labs 联合创始人。前青云数据库团队负责人、开源 Databend 项目主要负责人。

演讲大纲: 传统数仓在扩展性、成本和管理等方面具有局限性。在本次分享中,我们将介绍一种新型的 Serverless 数仓技术,这种技术不仅能够解决传统数仓的痛点,还能显著提升性能并降低成本。此外,我们还将讨论 Serverless 数仓所面临的技术挑战。

  • 传统数仓的局限性
  • 理想的 Serverless 数仓架构
  • 如何实现 Serverless 数仓以及有哪些挑战

以下为本次演讲的精彩内容:

当今(2023)大数据分析新问题

大数据分析面临的新问题

  • 近 5 年生产了 ~90% 数据

    • image.png
    •   根据 IDC 的统计和预测,近 5 年来产生了大约 90% 的数据。这里用的单位是 zttabytes(ZB),1024PB = 1EB,1024EB = 1ZB 是一个非常庞大的数字。过去的大数据架构难以适应当下的数据规模,亟需变更,怎么样才能做到弹性和 Serverless 拓展,从而匹配业务增长?
  • 计算和存储成本高昂

    • image.png
    •   在企业的IT基础设施中,云厂商提供的计算和存储服务导致了高昂的成本。经测算,如果为 EC2 实例创建总容量为 500TB 的 SSD(GP2)存储,每个月在 EBS 服务上将会花费超过 7 万 5 千美元。如何才能在保证低廉成本的同时满足业务性能需求,提供经济、高效能的大数据架构?
  • 大数据平台越来越复杂

    • image.png
    •   上图是知名投资机构 a16z 绘制的统一数据基础设施架构全景图,不难看出,庞大的数据量和复杂的数据需求,导致大数据平台也变得日益复杂,需要数十种工具紧密协作,对于产品生态愈发严苛的要求。如何无缝与其他工具进行集成,并且复用现有基础设施?

大数据架构,能否完美实现

上述这些问题,为大数据架构提出了新的要求,特别是在以下几个维度上,能否做到“完美”实现

  • 存储成本:极致低廉
  • 计算控制:极致精细,支持算子在 Lambda 函数中运行
  • 集群控制:极致弹性,按需伸缩、启停
  • 架构特点:all-in-one platform,完全 Serverless 化
  • 未来规划:为未来的云端大数据做好准备

传统数仓架构 vs. 弹性数仓架构

在进入到架构对比之前,我们先来看一个成本估测公式:Cost = Resource * Time ,也就是成本大致可以用资源与时间的乘积进行测算。

传统数仓架构

image.png

传统数仓往往采用 Shared-Nothing 架构,存储、计算一体化设计,弹性相对较弱。而且由于调度上采用资源固定(Fixed-Set)式调度策略,资源控制粒度粗,也会带来更多的成本。

对应到成本估测公式上,在时间一定的情况下,由于耗费资源数量较大,成本将会居高不下。

弹性数仓架构

image.png

弹性数仓则采用 Shared-Storage 架构,底层可以使用对象存储,真正做到存储、计算分离,从而支持实时弹性扩容和缩容以及资源按需(Workload-Based)式调度,资源控制粒度更细。

对应到成本估测公式上,相较于传统数仓,弹性数仓的成本将会显著降低:存储成本可以按实际使用量折算,不需要为冗余的存储进行服务;而计算成本则根据业务需要实时调度,按需启停,按量计费,无需保有大量空闲计算资源。

Databend: 新一代云数仓架构设计

新一代云数仓的架构新在哪里?影响现代云数仓架构设计的因素和挑战都有哪些?这一部分将会给你答案。

新一代云数仓

现有数仓的局限

ClickHouse 是一款流行的开源数仓,以性能卓越著称。采用向量化计算技术,细节优化非常到位。具有 Pipeline 处理器和调度器以及 MergeTree + Wide-Column 存储引擎,单机性能非常强悍。

缺点: 分布式能力弱,无法应对复杂分析,运维复杂度高,不是为云设计。

Snowflake 则是一款云数仓,支持多租户,存储、计算分离。基于对象存储,存储介质便宜。弹性能力非常强悍,面向云架构设计。

缺点: 单机性能一般,比较依赖分布式集群能力。

Databend = ClickHouse + Snowflake + Rust

前面列出的是目前在开源和商业化领域领先的两款数仓产品,看上去性能和弹性无法兼得,想要低成本和弹性计算是不是就必须放弃单节点的极致优化呢?我们来看一下 Databend 交出的答卷。

  • 借鉴 ClickHouse 向量化计算,提升单机计算性能。
  • 借鉴 Snowflake 存储、计算分离思想,提升分布式计算能力。
  • 借鉴 Git,MVCC 列式存储引擎,支持 Insert / Read / Delete / Update / Merge 等操作,以及 Time Travel 等高级特性。
  • 全面支持 HDFS 、基于云的对象存储、IPFS 等 20 多种存储协议。
  • 基于便宜的对象存储也能方便的做实时性分析。
  • 完全使用 Rust 研发(超过 33 万行代码),研发第一天就在 Github 开源。
  • 高弹性 + 强分布式,致力于解决大数据分析成本和复杂度问题。

云数仓架构设计

Databend Cloud 架构全景图

Databend Cloud 是基于开源云原生数仓项目 Databend 打造的一款易用、低成本、高性能的新一代大数据分析平台,提供一站式 SaaS 服务,免运维、开箱即用。下面是 Databend Cloud 的架构全景图,也是 Databend Labs 团队对新一代云数仓的架构的设计与实现。

image.png

影响云数仓架构设计的因素与挑战

Databend / Databend Cloud 之所以演化出现在的架构,是因为新一代云数仓除了要在性能上比肩传统数仓、弹性上对标弹性数仓之外,还必须解决下面几个重要的问题:

  • Ingest 海量数据网络费用问题:传统 INSERT 模式费用昂贵,需要一套基于 S3 的免费方案。
  • 对象存储不是为数仓而设计,延迟和性能如何平衡:Network-Bound -> IO-Bound -> CPU-Bound 。
  • 如何让系统更加智能,根据查询模式自动创建索引:如何让某些场景的 Query 越跑越快...
  • 如何面向 Warehouse + Datalake 双重需求设计?

前两个问题是云带来的挑战,而后两个问题将直面用户需求,一旦考虑清楚这些问题,云数仓的架构也就呼之欲出了。

Databend 生态全景图

数仓的产品的成败,除了本身的设计和实现之外,也非常依赖数据生态,其关键在于解决数据的输入与输出问题。

Databend 自身支持一定 ETL 能力,能够使用 Stage 和 Multiple Catalog 挂载外部数据源,提供全量、增量、条件等多种导入方式,支持使用 PRESIGN 上传和下载数据。

Databend 积极融入大数据生态,拓展「Databend 朋友圈」,提供全链路解决方案,帮助用户将数据转化为商业洞见。

image.png

Databend 为用户提供价值

Databend 是一款开源、开放,运维简单、分钟级部署,为云端海量数据分析而设计的新一代云数仓。

我们在前面介绍了 Databend 的设计与实现,以及在生态方面做的一些努力,但产品是否能够占据市场、满足用户需求,还需要靠数据说话。

Databend v1.0 于 2023 年 3 月 5 日正式发布,目前处于 v1.2 版本,我们统计了以下几条关键数据:

  • 替换 Trino/Presto 场景成本降低了 75%
  • 替换 Elasticsearch 场景成本降低了 90%
  • 归档场景成本降低了 95%
  • 日志和历史订单分析场景成本降低了 75%
  • ~1PB+/天(2023.9 统计)在使用 Databend 写入公有云对象存储
  • 用户来自欧洲、北美、东南亚、印度、非洲、中国等地,每月节省数百万美元

以下是一些在生产环境中使用 Databend 的用户,感谢他们一直以来的支持与陪伴。我们将继续提供更有价值的服务。

image.png

Databend 在开源社区

Databend 从第一天起就在 GitHub 上开源,目前已经成为 Rust 社区中的明星数据库项目。我们与上下游社区紧密协作,共同建设 Rust 大数据生态。Databend 目前的贡献者中不乏大公司背景,比如 SAP、Yahoo、Fortinet、Shopee、Alibaba、Tencent、ByteDance、EMQ、快手,Databend 社区正在被顶级需求、顶级场景驱动。

image.png

体验 Databend

最后,欢迎大家体验 Databend 产品与生态,与我们共同建设坚实可靠的大数据基础设施。

image.png

随着架构的不断迭代和更新,大数据系统的查询目标也从大吞吐量查询逐步转移转向快速的交互式查询,对于查询的及时响应提出了更高要求。许多企业的数仓/数据湖中都有 PB 级的数据,其中绝大多数都属于旧有系统中的历史数据,很少更新,移动起来很麻烦,重新组织元数据也需要花费大量的时间。需要解决的问题是:如何在保证现有的数据和元数据不变的情况下加速查询。

image.png

上图是一个典型的使用 Databend 加速 Hive 查询的架构。用户使用 trino 、Spark 等引擎将数据纳入 Hive 进行管理,数据的存放位置则位于 S3 、GCS 、HDFS 等存储服务之中。引入 Databend 可以带来更好的查询性能。

和 trino 以及大多数支持 Hive Catalog / Connector 的查询引擎一样,Databend 可以复用 Hive 除了运行时(查询引擎)之外的其他组件,包括用于管理文件和底层存储的存储服务和管理 SQL 表到文件和目录映射的 Hive MetaStore 。

Databend 中的数据按三层进行组织:catalog -> database -> tablecatalog 作为数据最大一层,会包含所有的数据库和表。通过 CREATE CATALOG 语句,用户可以轻松创建 Hive Catalog 。在执行查询时,需要按 <catalog>.<database>.<table> 的格式指定到表。

SELECT * FROM <catalog>.<database>.<table>;

通过这种形式,用户无需向 Databend 中导入数据,就可以直接查询位于 Hive/Iceberg Catalog 中的数据,并获得 Databend 的性能保证。

Workshop :使用 Databend 加速 Hive 查询

接下来,让我们通过两个例子,了解 Databend 是如何加速不同存储服务下的 Hive 查询的。

使用 HDFS 存储

Hive + HDFS 的实验环境可以使用 https://github.com/PsiACE/databend-workshop/tree/main/hive-hdfs 中的环境搭建

docker-compose up -d

接下来,让我们一起准备数据:

  • 进入 hive-server ,使用 beeline 连接:
docker-compose exec hive-server bash
beeline -u jdbc:hive2://localhost:10000
  • 创建数据库、表和数据,注意,需要以 Parquet 格式存储:
CREATE DATABASE IF NOT EXISTS abhighdb;

USE abhighdb;

CREATE TABLE IF NOT EXISTS alumni(
alumni_id int,
first_name string,
middle_name string,
last_name string,
passing_year int,
email_address string,
phone_number string,
city string,
state_code string,
country_code string
)
STORED AS PARQUET;

INSERT INTO abhighdb.alumni VALUES
(1,"Rakesh","Rahul","Pingle",1994,"rpingle@nps.gov",9845357643,"Dhule","MH","IN"),
(2,"Abhiram","Vijay","Singh",1994,"asingh@howstuffworks.com",9987654354,"Chalisgaon","MH","IN"),
(3,"Dhriti","Anay","Rokade",1996,"drokade@theguardian.com",9087654325,"Nagardeola","MH","IN"),
(4,"Vimal","","Prasad",1995,"vprasad@cmu.edu",9876574646,"Kalwadi","MH","IN"),
(5,"Kabir","Amitesh","Shirode",1996,"kshirode@google.co.jp",9708564367,"Malegaon","MH","IN"),
(6,"Rajesh","Sohan","Reddy",1994,"rreddy@nytimes.com",8908765784,"Koppal","KA","IN"),
(7,"Swapnil","","Kumar",1994,"skumar@apache.org",8790654378,"Gurugram","HR","IN"),
(8,"Rajesh","","Shimpi",1994,"rshimpi@ucoz.ru",7908654765,"Pachora","MH","IN"),
(9,"Rakesh","Lokesh","Prasad",1993,"rprasad@facebook.com",9807564775,"Hubali","KA","IN"),
(10,"Sangam","","Mishra",1994,"smishra@facebook.com",9806564775,"Hubali","KA","IN"),
(11,"Sambhram","Akash","Attota",1994,"sattota@uol.com.br",7890678965,"Nagpur","MH","IN");

SELECT * FROM abhighdb.alumni;

image.png

由于 HDFS 支持需要使用 libjvm.so 和 Hadoop 的若干 Jar 包,请确保你安装了正确的 JDK 环境并配置相关的环境变量:

export JAVA_HOME=/path/to/java
export LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH}
export HADOOP_HOME=/path/to/hadoop
export CLASSPATH=/all/hadoop/jar/files

参考 Deploying a Standalone Databend ,使用带有 HDFS 特性的 Databend 分发(databend-hdfs-*),部署一个单节点的 Databend 实例。

image.png

通过 BendSQL 连接这个 Databend 实例,然后创建对应的 Hive Catalog ,记得要通过 CONNECTION 字段为其配置对应的存储后端:

CREATE CATALOG hive_hdfs_ctl TYPE = HIVE CONNECTION =(
METASTORE_ADDRESS = '127.0.0.1:9083'
URL = 'hdfs:///'
NAME_NODE = 'hdfs://localhost:8020'
);

在上面的语句中,我们创建了一个底层存储使用 HDFS 的 Hive Catalog:

  • 通过 TYPE 指定创建 Hive 类型的 Catalog 。

  • CONNECTION 用于指定 HIVE 相关的存储/元数据访问信息,可以阅读 Docs | Connection Parameters 了解更多相关信息。

    • METASTORE_ADDRESS 对应 Hive MetaStore 的地址
    • URL 对应 HDFS 的 Path
    • NAME_NODE 则对应 HDFS 的 Name Node 地址

让我们尝试运行一个简单的 SELECT 查询,验证其是否能够正常工作:

SELECT * FROM hive_hdfs_ctl.abhighdb.alumni;

image.png

使用 S3-like 对象存储

Trino + Hive + MinIO 的实验环境可以使用 https://github.com/sensei23/trino-hive-docker/ 进行搭建。

cd docker-compose
docker build -t my-hive-metastore .
docker-compose up -d

在执行完 docker-compose up -d 等前置步骤后,先进入 MinIO 控制面板,创建一个名为 tpch 的 Bucket 。

image.png

运行下述命令可以打开 trino 命令行工具:

docker container exec -it docker-compose-trino-coordinator-1 trino

接着创建一个小型的 TPCH 客户表。注意,为了满足 Databend 使用要求,这里需要使用 Parquet 格式:

CREATE SCHEMA minio.tpch
WITH (location = 's3a://tpch/');

CREATE TABLE minio.tpch.customer
WITH (
format = 'PARQUET',
external_location = 's3a://tpch/customer/'
)
AS SELECT * FROM tpch.tiny.customer;

image.png

查询对应的 Hive 元数据,可以看到像下面这样的信息:

 DB_ID |      DB_LOCATION_URI      |   NAME   | OWNER_NAME | OWNER_TYPE | CTLG_NAME 
-------+---------------------------+----------+------------+------------+-----------
1 | file:/user/hive/warehouse | default | public | ROLE | hive
3 | s3a://tpch/ | tpch | trino | USER | hive

参考 Deploying a Standalone Databend 部署一个单节点的 Databend 实例。

image.png

通过 BendSQL 连接这个 Databend 实例,然后创建对应的 Hive Catalog ,记得要通过 CONNECTION 字段为其配置对应的存储后端:

CREATE CATALOG hive_minio_ctl 
TYPE = HIVE
CONNECTION =(
METASTORE_ADDRESS = '127.0.0.1:9083'
URL = 's3://tpch/'
AWS_KEY_ID = 'minio'
AWS_SECRET_KEY = 'minio123'
ENDPOINT_URL = 'http://localhost:9000'
);

在上面的语句中,我们创建了一个底层存储使用 MinIO 的 Hive Catalog:

  • 通过 TYPE 指定创建 Hive 类型的 Catalog 。

  • CONNECTION 用于指定 HIVE 相关的存储/元数据访问信息,可以阅读 Docs | Connection Parameters 了解更多相关信息。

    • METASTORE_ADDRESS 对应 Hive MetaStore 的地址
    • URL 则对应 MinIO 的 Bucket 或者 Path
    • AWS_KEY_IDAWS_SECRET_KEY 则对应访问时的校验,这里使用了 MinIO 服务的用户名和密码
    • ENDPOINT_URL 是 MinIO 对象存储服务的 API 端点

让我们尝试运行一个简单的 SELECT 查询,验证其是否能够正常工作:

SELECT * FROM hive_minio_ctl.tpch.customer LIMIT 5;

image.png


提示

  • 要使用 SQL 语句创建带有多种存储支持的 Hive Catalog,推荐使用 v1.2.100-nightly 及以后版本。
  • 不再需要从 toml 文件进行配置就可以获得多源数据目录能力。
  • 如果需要获取 HDFS 存储服务支持,则需要部署或者编译带有 HDFS 特性的 Databend ,比如 databend-hdfs-v1.2.100-nightly-x86_64-unknown-linux-gnu.tar.gz
  • 对于 Hive Catalog ,Databend 目前只支持查询 Parquet 格式的数据,且只支持 SELECT,不支持其他 DDL 、DML 和 UDFs 。
  • Databend 的语法与 Hive 并不完全兼容,关于 SQL 兼容性相关的内容,可以查看 Docs | SQL Conformance

image.png

以「启航 • AIGC 软件工程变革」为主题的 QCon 全球软件开发大会北京站于 9 月 5 日在北京富力万丽酒店圆满落幕!此次大会包含向量数据库、云原生、异构计算、面向 AI 的存储、微服务架构治理、FinOps 等近 30 个精彩专题。Databend Labs 作为深耕云原生数据库领域的科技公司受邀参与。

9月3日下午,Databend 研发工程师 - 邰翀作为「构建未来软件的编程语言」专题的演讲嘉宾参与本次分享。

image.png

主题: 「Rust:构建新时代基础设施的首选语言」

演讲嘉宾: 邰翀

嘉宾介绍: Databend 研发工程师

本次分享聚焦于数据库和 AI 领域,从跨云数据访问和向量数据库的现实需求谈起,阐述为什么 Rust 是适合于新时代基础设施的编程语言,并分析 Rust 在新时代基础设施下的新机遇。本次分享主要分为四个部分:

  • Rust 新时代基础设施的最佳选择
  • All in Rust 为 Databend 带来了什么
  • Rust 如何成为构建 Vector Embeddings 的关键语言
  • Rust 的机遇与挑战

Rust 新时代基础设施的最佳选择

在此前,当我们谈论基础设施时,首先可能会想到服务器、Oracle 等。而近些年来,我们谈论基础设施已经离不开云和构建在云上的各种服务。

新时代基础设施是指:可以在云上自由部署、与云完美融合的基础设施。在这个新时代中,Databend 和一些数据库同行都选择了 Rust 作为首选语言。我们可以观察到 amazom、 微软、Firefox、飞书、TiKV、云数仓 Databend、OpenDAL 等公司或者项目在使用 Rust 。

image.png

Rust 的发展时间线

  • 2006 年, Graydon Hoare 着手设计和实现 Rust 语言,此时,还只是他的个人项目。
  • 2009 年,Mozilla 开始赞助这个项目,并成立团队支持 Rust 的开发。
  • 2010 年,Rust 首次公开,并在一年后实现了编译器自举,到 2015 年发布了第一个稳定版本。
  • 2021 年,Rust 基金会成立,为 Rust 语言带来更广阔的发展前景。

Rust 与内存安全

Rust 的创始人 Graydon Hoare 曾经说过:“Rust是一种采用过去的知识解决将来的问题的技术。”

在我的理解中,Rust 的主要目标之一就是解决内存安全问题。

下面列出一些常见的内存安全问题:

  • Out-Of-Bound 假设有两个线程,其中一个线程在访问链表,另一个线程在进行删除,这个时候可能会产生越界问题。
  • Use After Free 指针指向了堆上一块内存,但是这块内存因为扩容或者其他原因导致重分配,给了恶意代码修改并且执行被释放内存的机会。

通用手法

针对内存安全问题,不同的编程语言都会提出自己的一套解决方案,目前通用的方式包括:

  • 垃圾回收(Garbage Collection,GC) :设立专门的垃圾回收机制来检测内存。由于回收机制的时间不固定,这种方式可能会导致内存不可控。对于数据库软件而言,如果在做大量数据的聚合相加,可能会导致内存飙升,并且很长时间无法释放,容易诱发 OOM 。

  • 自动引用计数(Automatic Reference Counting,ARC) :可以自动地跟踪和管理对象的引用计数,从而避免了手动管理内存的繁琐和容易出错的问题,算是一种比较好的方式。

  • 手动管理内存(Manualy Handle Memory) :开发者自行管理分配和释放,对程序开发功底要求比较高。

Rust 怎么做

上面的三种通用手法是通过管理引用来处理内存安全问题的,而 Rust 则选择通过限制引用行为来解决内存安全问题。 更具体地,Rust 引入了所有权、借用检查和生命周期这三个重要的概念。

  • 所有权(Ownership) :每个值只能有一个所有者。

  • 借用检查(Borrow Check) :帮助管理所有权的一套规则,能够处理内存分配和释放,防止数据竞争。

  • 生命周期(Lifetimes) :程序中每个变量都有一个固定的作用域,当超出变量的作用域以后,变量就会被销毁。变量在作用域中从初始化到销毁的整个过程称之为生命周期。

Rust :最受程序员推崇的语音

正因为 Rust 语言表现力、卓越的性能以及自己独有一套的内存管理方式,蝉联八届 Stack Overflow Developer Survey 最受程序员推崇的语言。

image.png

All in Rust  为 Databend 带来了什么

Databend Labs 成立于 2021 年 3 月,是一家开源 Data Cloud 服务商,致力于为大数据生态提供坚实可靠的基础设施。我们的核心团队成员来自 ClickHouse 社区、谷歌 Anthos、阿里云等国内外知名互联网和云计算公司,团队在云原生数据库领域有着丰富的工程经验,同时也是数据库开源社区活跃贡献者。

我们在用 Rust 做什么?

Databend是一款使用 Rust 研发、开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓,具有即时扩缩容能力,能在数分钟内增加数百倍的算力,为企业提供了一个用于存储、管理和分析大量数据的集中式平台,从而助力企业更准确地洞察业务、制定战略。

image.png

除了 Databend 之外,我们也使用 Rust 开发和维护了大量项目:

  • OpenRaft 是在 tokio 之上实现的异步 Raft 框架,在 leadership 上做了大量优化,性能非常强劲。

  • BendSQL 是基于 Databend HTTP API 和 Arrow Flight API 设计和实现的原生客户端。

  • AskBend 是一款知识库智能问答系统,基于 OpenAI 的 API 并且利用 Databend 内置的一系列 SQL 函数(AI Functions)打造。访问链接:https://ask.databend.rs 即可体验。

  • OpenDAL 提供一个统一、简单、高效、可靠、可观察的数据访问层,让开发者可以无缝地使用不同的存储服务,并享受到最佳的用户体验。今年 3 月份的时候已经移交到 Apache 软件基金会孵化器中进行孵化。

Rust 给我们带来了什么?

数据库本身就是计算机科学皇冠上的一颗明珠,是一个庞大而又复杂的系统。而在 Databend 的设计和实现中,我们还从目前市场上最优秀的数仓中借鉴了一些经验。例如,我们参考了 Clickhouse 的向量化设计,以提高单机的性能。同时,我们也借鉴了 Snowflake 的集群优点,以增强分布式计算能力。

选择 Rust 作为这样一个复杂系统开发的首选语言,我们收获了这些:

  • 高效 的研发 虽然 Databend 从第一版发布至今只有 2 年多,但从开源到文章撰写时为止,已经累计有 8000 多个 PR 。

  • 优越 性能: Databend 在 Clickbench 基准测试中,数据导入性能排名第一,并且在 c6a.4xlarge 机型性能登顶,除了优秀的设计和实现之外,Rust 功不可没。

  • 活跃的 社区: 得益于 Rust 语言的流行和社区的持续关注,到文章撰写时为止已有超过 200 位贡献者,收获 6.5 k star ,是 Rust 社区中的明星项目之一。

  • image.png

Rust 如何成为构建 Vector Embeddings 的关键语言

同样是数据库领域,现在让我们将目光转向今年在 AI 上大放异彩的向量数据库。

向量嵌入(Vector Embeddings)可以将数据转化为一个包含其实际含义的向量空间。而向量数据库则致力于挖掘存储和处理向量数据的能力,并提供高效的向量检索功能。

为什么需要向量数据库

对于 GPT 这样的大模型而言,Tokens 大小限制了应用的进一步开发,而引入向量数据库之后,就可以利用向量检索和向量索引能力,相似度更近的数据紧凑存放。这样可以带来两个方面的好处:

  • 降低 GPT 的使用成本。
  • 维护长期记忆,帮助 AI 理解和执行复杂任务。

Rust 如何进入向量数据库

让我们一起来看一下 Rust 是怎么进入到向量数据库领域的:

  • 研发新的向量数据库: 获得知名投资机构 YC 的青睐的 LanceDB  使用 Rust 设计了针对向量数据库的存储格式。

  • 重写现有向量数据库: Pinecone 将 C++ 和 Python 代码库使用 Rust 完全重写。在今年 5 月份获得 a16z 一亿美元 B 轮投资。

  • 拓展旧有向量数据库: Milvus 则计划引入 OpenDAL(Rust 开发的存储访问层)的 C++ binding,以支持跨多云数据存储能力。

  • 拓展现有其他数据库: Databend 支持向量类型的存取与基本相似度查询功能,并且提供 AI Functions ,能够与 OpenAI API 进行交互。

  • 拓展旧有其他数据库: AI 初创公司 tensorchord 使用 Rust 开发 PostgreSQL 的向量处理拓展 pgvecto.rs 。

Rust 的机遇与挑战

回顾 Databend Labs 使用 Rust 的研发历程,并且结合业界其他公司的经验,我们认为,Rust 能够成为构建新时代基础设施的首选语言的主要因素有以下几点:

  • 内存安全
  • 性能保障
  • 敏捷开发

机遇

Rust 在新时代中的机遇,其实可以和前面 Rust 如何进入向量数据库结合起来看,这里同样举几个典型的项目作为例子。

  • 新项目服务旧场景:
    • Polars 是 Pandas 的有力竞争者,团队宣布种子轮收获 400 万美元融资,用于打造 OLAP 计算平台。
    • HuggingFace 开源新的深度学习框架 Candle ,使用 Rust 编写。
  • 新项目服务新场景
    • llmchain-rs 针对大模型工具链提供一站式解决方案。
    • mosec 针对大模型部署和服务开发提供高性能解决方案。

挑战

  • 程序的可靠性仍然需要开发者自己去管理,并不能因为使用 Rust 就掉以轻心。一个有意思的段子是 “声称内存安全的项目中,往往充斥着大量的 unsafe 代码”。

  • 尽管生态已经日趋完善,但在实际开发过程中,仍然少不了造轮子。而对于非系统编程,或者原型快速开发阶段,使用 Rust 在开发工具和生态对接上相比 Java 、Python 等语言还存在一些问题。

  • 由于 Rust 本身的复杂性,新手仍然需要迈过门槛,而且 Rust 语言进入到项目以后在编译时间、CI 流水线等方面都需要进行不同程度的调试与改造。

image.png

目前开发者们需要尝鲜 Databend, 可以选择使用 Databend Cloud 或者按官方文档部署 Databend 服务。 由于 Databend 架构有三层,因此部署Databend 服务一般需要启动 databend-query, databend-meta , minio 三个进程,同时需要修改端口等配置项,流程上略显复杂。 有没有更快的方式可以快速尝鲜 Databend 呢?

Python Binding

一种快速的方式是将 Databend 跑在python中,借助 rust 优良的生态,我们基于 pyo3 库发布了 python binding,可以在本地 juypter 或者 colab 等在线服务中使用 Databend:

# pip install databend
from databend import SessionContext

ctx = SessionContext()

df = ctx.sql("select number, number + 1, number::String as number_p_1 from numbers(8)")

# convert to pyarrow
df.to_py_arrow()

# convert to pandas
df.to_pandas()

Databend Local 模式

借鉴于 clickhouse-local , duckdb 等嵌入型数据库的优点,我们在 Databend 中也可以开启 local 模式。

local 模式 是一个 Databend 的简易版本,用户无需部署 Databend 服务即可在命令中 用 SQL 和 Databend 交互。它的好处在于简化了开发安装,同时方便开发者们用 SQL 使用 Databend支持的功能进行简单的数据处理。 如果你需要在生产环境使用 Databend,我们建议按官网推荐部署 Databend 服务 或 Databend Cloud,但如果你是开发人员或测试工程师,你可以使用 local 模式 来玩转 Databend。

local 模式将启动一个临时的 databend-query 进程,这个进程融合了客户端和服务端,并且他的存储是在临时目录中,生命周期跟随进程,进程离开后资源也将销毁,你可以在一个服务器中启动多个 local 进程,他们的资源是相互隔离的。

下面通过例子介绍一下,每个例子都是简短的几行命令,介绍 local 模式可以实现什么功能。

在这之前,你需要下载 databend-query 二进制,然后将二进制放到 PATH 环境变量中,植入 bend-local 工具别名

alias bend-local="databend-query local"
  • 命令行交互 ( REPL ) 模式

    • 直接在终端输入 bend-local 这一行命令后,我们将进入 REPL 模式,这里融合了客户端和服务端,类似 duckdb cli 工具使用。

    •   ❯ bend-local
      Welcome to Databend, version v1.2.4-nightly-326cabe38056168dd261f744609ea85319f02686(rust-1.72.0-nightly-2023-09-02T15:18:48.006847567Z).

      databend-local:) select max(a) from range(1,1000) t(a);
      ┌────────────┐
      │ max(a) │
      │ Int64 NULL │
      ├────────────┤
      │ 999 │
      └────────────┘
      1 row result in 0.036 sec. Processed 999 rows, 999 B (27.89 thousand rows/s, 217.90 KiB/s)

      databend-local:)
    • 值得注意的是,bend-local 支持配置文件 ~/.config/databend/config.toml 来做一些个性化客户端配置,配置文件的格式和 bendsql 是兼容的。

  • 一行命令生成一个 parquet 文件

    • 支持 --query, --output-format 参数 传入查询 SQL 和输出格式
    •    bend-local --query "select number, number + 1 as b from numbers(10)" --output-format parquet > /tmp/a.parquet
  • Shell pipe 模式分析数据, \$STDIN 宏将解析 stdin 流作为一个临时 stage 表

    •   ❯ echo '3,4' | bend-local -q "select $1 a, $2 b  from $STDIN  (file_format => 'csv') " --output-format table

      SELECT $1 AS a, $2 AS b FROM 'fs:///dev/fd/0' (FILE_FORMAT => 'csv')

      ┌─────────────────┐
      │ a │ b │
      │ String │ String │
      ├────────┼────────┤
      │ '3' │ '4' │
      └─────────────────┘

注意上面的 SQL 在 shell 中,使用了 \$ 来对 shell 进行转义

  • 读取 stage table (本地文件,外部 s3 等)

    •   ❯ bend-local --query "select count() from 'fs:///tmp/a.parquet'  (file_format => 'parquet') "
      10

      ❯ bend-local --query "select count() from 'https://datafuse-1253727613.cos.ap-hongkong.myqcloud.com/data/books.parquet' (file_format => 'parquet') "
      2

      ❯ bend-local --query "select $1, $2 from 'http://www.geoplugin.net/csv.gp?ip=3.3.3.3' (file_format => 'csv') "
  • 分析系统进程 ,找出每个用户占用的内存

    •   ❯ ps aux | tail -n +2 | awk '{ printf("%s\t%s\n", $1, $4) }' | bend-local -q "select  $1 as user,  sum($2::double) as memory  from $STDIN  (file_format => 'tsv')  group by user  "
      sundy 9.100000000000001
      root 1.2
      dbus 0.0
  • 数据清洗,将一个格式转换为其他格式 (支持csv, tsv, parquet, ndjson 等)

    •   ❯ bend-local -q 'select rand() as a, rand() as b from numbers(100)' > /tmp/a.tsv
        ❯ cat /tmp/a.tsv | bend-local -q "select $1 a, $2 b  from $STDIN  (file_format => 'tsv') " --output-format parquet > /tmp/a.parquet
  • 其他好玩的分析例子,等待你的挖掘

image.png

Databend Cluster Key 是指 Databend 可以按声明的 key 排序存储,主要用于用户对时间响应比较高,同时愿意为这个 cluster key 进行额排序操作的用户。 Databend 只支持一个 Cluster key,Cluster key中可以包含多列及表达式。

基本语法

-- 语法:
alter table T cluster by(c1, fun(c2));

-- 例如:
alter table T cluster by(user_id); -- 指定数据按 user_id 排序存储

-- 日志场景 按 msg_id, 小时 排序存储
alter table T cluster by(msg_id, to_yyyymmddhh(c_timestamp));

-- 强制数据排序
optimize table T compact;
alter table T recluster final; -- 全局排序, 建议第一次创建 Cluster key 后使用,后期如果遇到性能退化,也可以再次使用

更多关于 Databend Cluster key 语法参考:

https://docs.databend.cn/sql/sql-commands/ddl/clusterkey/

使用注意事项

目前 Databend 在表有 cluster key 的情况下,使用

  • copy into
  • replace into

这两种方式写入数据时,会自动执行 compact 和 recluster 操作。

关于 Databend Cluster Key 你需要了解的:

  1. Databend 中数据分区按: block_size_threshold (default: 100M ) or row_per_block(default 100万) 组织,两者任意达到之一就会生成新的 Block
  2. 新生成的 Block 中会按定义的 cluster key 排序存储,当该key的 min = max 时,该 block 为 constant_block, 同时 cluster key 不保证全局有序
  3. 多个 block 之间可能有重叠区间,如,cluster by (age)

image.png

不同区间的重叠形成了不同的深度,例如上图:

select * from T where age >30 and age <35; 

这样一个查询,需要查找到的深度为 3 ,即为 3 个 Block。

所以表中指定列的重叠block-partitions的平均深度,越小越好。如下所示:

-- 可以通过 clustering_information('db','tbname') 查看该表的 Cluster 信息
select * from clustering_information('wubx','sbtest10w')\G;
*************************** 1. row ***************************
cluster_by_keys: (id) -- 定义的 Cluster key
total_block_count: 451 -- 当前有多少的 block
constant_block_count: 0 -- min/max 相等 block, 也就说 block 中只包括一个(组) cluster_key 的值
unclustered_block_count: 0 -- 还没 Cluster 的 Block
average_overlaps: 2.1774 -- 在一个 Range 范围内,有多少个 block有重叠比率
average_depth: 2.4612 -- cluster key 在分区的重叠分区数的平均深度
block_depth_histogram: {"00001":32,"00002":217,"00003":164,"00004":38}
1 row in set (0.02 sec)
Read 1 rows, 448.00 B in 0.015 sec., 67.92 rows/sec., 29.71 KiB/sec.

结果中最重要信息是“average_depth”,数字越小, 表的clustering效果越好,上图为: 2.46,属于比较好的状态(小于 total_block_count * 0.1 ) 。block_depth_histogram 告诉更多关于每个深度有多少个分区的详细信息。 如果在较低深度中的分区数更多,则表的聚类效果更好。 例如"00004" : 38 表示 (3,4] 有 38 个 block 有 4 个深度。

其它优化建议

  1. 一般来讲声明 Cluster key 后对于区间查询和点查都有较大的优化
  2. 如果声明 cluster key 后,还想进一步的提升点查或是区间查询的能力,可以通过调整 block 大小
-- 把 Block 的大小修改为压缩前 50M ,行数不超过 10 万行
alter table T set options(row_per_block=100000,block_size_threshold=52428800);

关于 options 查看: https://docs.databend.cn/sql/sql-reference/table-engines/fuse#options

默认数据分布:

image.png

优化数据在 Block 中的分布

create table sbtest10w like sbtest1;
alter table sbtest10w set options(row_per_block=100000,block_size_threshold=52428800);
insert into sbtest10w select * from sbtest1;

image.png

  1. 对于特别宽的表,建议查询中只访问需要的列来减少时间开销

image.png

image.png

  1. 对于复杂的 SQL 里面有大量聚合的操作还是推荐大一点的 Block 及行数

参考

image.png

作者: 黄志武

大参林医药集团股份有限公司,信息中心数据库组组长,13年数据库行业从业经历,Oracle OCM,关注Oracle、MySQL、Redis、MongoDB、Oceanbase、Tidb、Polardb-X、TDSQL、CDH、Clickhouse、Doris、Databend等多方面的关键领域技术,服务过传统通信、电力,互联网、移动互联网等行业。

大参林医药集团股份有限公司成立于 1999 年,是中国具有影响力的药品零售连锁集团化企业。在数字化转型升级的趋势背景下,支撑持续稳定的零售业务生态,离不开高效的信息化、数据化、智能化的技术支持。

需求概述

大参林医药集团零售供应链数据庞大,涉及大表较多,最大单表数据量达到 93 亿,历史数据存储在大数据服务 CDH 。由于技术架构升级改造原因,该 CDH 需要下线,但是业务部门提出需要保留数据用于审计追溯。若考虑通过关系型分布式数据库进行迁移,如 OceanBase、TiDB,对于历史数据的关联并行查询也是一种挑战;若考虑只通过 COS、OSS, S3 对象存储备份导出的文档数据文件,受限于平台技术,无法执行数据关联查询。由于时间紧迫,急需一种投入成本低、见效快的替代方案。

使用 CDH 的痛点

现在大数据平台数据增长迅速, 数据量超过 30T,机器集群硬件配置不足以承担目前的业务压力,成本投入也越来越大。

初见 Databend

Databend 是一个开源的 Elastic 和 Workload-Aware 现代云数据仓库。使用最新的矢量化查询处理技术,可以在对象存储( S3、Azure Blob、谷歌云存储、华为云 OBS 或 MinIO )上进行超快的数据分析。

Databend 产品特点:

  • 即时弹性

Databend 将存储与计算完全分离,用户可以根据应用程序的需要轻松扩展或缩小。

  • 优异的性能

Databend 利用数据级并行( Vectorized Query Execution )和指令级并行( SIMD )技术,提供性能卓越的数据分析。

  • 类似 Git 的 MVCC 存储

Databend 使用快照存储数据。查询、克隆和恢复表中的历史数据非常容易。

  • 支持半结构化数据

Databend 支持摄取各种格式的半结构化数据,例如 CSV、JSON 和 Parquet,这些数据位于云端或您的本地文件系统中;Databend 还支持半结构化数据类型:ARRAY、MAP、JSON,便于半结构化导入和操作。

  • MySQL/ClickHouse 兼容

Databend 符合 ANSI SQL 并兼容 MySQL/ClickHouse 协议,可以轻松连接现有工具( MySQL Client、ClickHouse Client、Vector、DBeaver、Jupyter、JDBC 等)。

  • 使用方便

Databend 没有要构建的索引,不需要手动调整,不需要手动计算分区或分片数据,所有这些都在数据加载到表中时完成。

技术选型

Databend 是一个数据仓库平台,同样具备类似通用的大数据平台 CDH 的的支持能力。在选择数据归档方案时,分 3 个方面做了对比:

  • 存储成本:对象存储和 HDD, SSD 的成本,其中对象存储是 HDD 的 1/10, 是 SSD 的 1/30;
  • 数据迁移成本:数据备份文件导出后直接迁到对象存储中,可以实现无脑在 Databend 直接加载存储,通过读取备份文件的表对象信息完成创建表和加载数据;
  • 关联查询能力:查询方式简单,兼容 MySQL 协议,可不用改变 Mysql 的使用习惯,直接无需过多改动即可通过原来的业务 SQL 进行关联查询;

Databend方案

目前 Databend 主要用于数据归档。实现方式是将大数据平台 CDH 导出的 Parquet 文件,通过腾讯云的文件迁移同步工具 cos_migrate_tool 实现传输备份至腾讯云 COS,使用 Databend 的单节点部署方案把该 COS 直接加载,即可实现 COS 下文件自动识别。

  • 创建 Stage
create stage if not exists mystage
url = 's3://cos存储桶/backup/'
connection=(endpoint_url='https://cos.ap-guangzhou.myqcloud.com'
access_key_id='ACCESS_KEY_ID' secret_access_key='SECRET_ACCESS_KEY');
  • 查看 Stage 中的文件
list @mystage;
  • Load stage 中的文件到 Databend

从文件中获取表结构来创建表

create table t1 as select * from @mystage/bi/t1/ (pattern=>'.*parq') limit 0;

加载文件往表中写入数据

copy /*+ set_var(max_threads=5) */ into t1 from @mystage/bi/t1/ pattern='.*[.]parq' file_format=(type=parquet);
  • 查询

查询

use bi;
select * from t1 limit 10;

现在 Databend 支持复杂的查询语法,可以满足平时业务需求。

Databend 使用现状

目前使用Databend,对大表数据的查询加载速度提升2倍;腾讯云 COS 存储成本相对于 CDH 本地盘及副本模式成本下下降 15 倍左右;且性能满足日常的数据审计查询需求。

总结

采用Databend有非常不错的体验,简单易用、查询迅速,对业务常用的历史数据查询无缝切换,极大地缩短了项目周期,提升了效率,减少了业务方的焦虑。

简介

Debezium Server Databend 是一个基于 Debezium Engine 自研的轻量级 CDC 项目,用于实时捕获数据库更改并将其作为事件流传递最终将数据写入目标数据库 Databend。它提供了一种简单的方式来监视和捕获关系型数据库的变化,并支持将这些变化转换为可消费事件。

使用 Debezium server databend 实现 CDC 无须依赖大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一个启动脚本即可开启实时数据同步。

这篇教程将展示如何基于 Debezium server databend 快速构建 MySQL 到 Databend 的实时数据同步。

假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。

接下来的内容将介绍如何使用 Debezium server databend CDC 来实现这个需求,系统的整体架构如下图所示:

准备阶段

准备一台已经安装了 Docker ,docker-compose 以及 Java 11 环境 的 Linux 或者 MacOS 。

准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

debezium-MySQL

docker-compose.yaml

version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw

Debezium Server Databend

  • Clone 项目: git clone ``https://github.com/databendcloud/debezium-server-databend.git

  • 从项目根目录开始:

    • 构建和打包 debezium server: mvn -Passembly -Dmaven.test.skip package
    • 构建完成后,解压服务器分发包: unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
    • 进入解压后的文件夹: cd databendDist
    • 创建 application.properties 文件并修改: nano conf/application.properties,将下面的 application.properties 拷贝进去,根据用户实际情况修改相应的配置。
    • 使用提供的脚本运行服务: bash run.sh
    • Debezium Server with Databend 将会启动

同时我们也提供了相应的 Docker image,可以在容器中一键启动:

version: '2.1'
services:
debezium:
image: ghcr.io/databendcloud/debezium-server-databend:pr-2
ports:
- "8080:8080"
- "8083:8083"
volumes:
- $PWD/conf:/app/conf
- $PWD/data:/app/data

NOTE: 在容器中启动注意所连接数据库的网络。

Debezium Server Databend Application Properties

本文章使用下面提供的配置,更多的参数说明以及配置可以参考文档

debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

准备数据

MySQL 数据库中准备数据

进入 MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

创建数据库 mydb 和表 products,并插入数据:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

在 Databend 中创建 Database

NOTE: 用户可以不必先在 Databend 中创建表,系统检测到后会自动为用户建表。

启动 Debezium Server Databend

bash run.sh

首次启动会进入 init snapshot 模式,通过配置的 Batch Size 全量将 MySQL 中的数据同步到 Databend,所以在 Databend 中可以看到 MySQL 中的数据已经同步过来了:

同步 Insert 数据

我们继续往 MySQL 中插入 5 条数据:

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer");

Debezium server databend 日志:

同时在 Databend 中可以查到 5 条数据已经同步过来了:

同步 Update 数据

配置文件中 debezium.sink.databend.upsert=true ,所以我们也可以处理 Update/Delete 的事件。

在 MySQL 中更新 id=10 的数据:

update products set name="from debezium" where id=10;

在 Databend 中可以查到 id 为 10 的数据已经被更新:

同步 Delete 数据

在配置文件中,有以下的配置,既可开启处理 Delete 事件的能力:

debezium.sink.databend.upsert-keep-deletes=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

Debezim Server 对 Delete 的处理比较复杂,在 DELETE 操作下会生成两条事件记录:

  1. 一个包含 "op": "d",其他的行数据以及字段;
  2. 一个tombstones记录,它具有与被删除行相同的键,但值为null。

这两条事件会同时发出,在 Debezium Server Databend 中我们选择对 Delete 数据实行软删除,这就要求我们在 target table 中拥有 __deleted 字段,当 Delete 事件过来的时候我们将该字段置为 TRUE 后插入到目标表。

这样设计的好处是,有些用户想要保留这些数据,但可能未来会想到将其删除,这样就为用户提供了可选的方案,未来想要删除这些数据的时候,只需要 delete from table where __deleted=true 即可。

关于 Debezium 对删除事件的说明以及处理方式,详情可参考文档

在 MySQL 中删除 id=12 的数据:

delete from products where id=12;

在 Databend 中可以观察到 id=12 的值的 __deleted 字段已经被置为 true

环境清理

操作结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

结论

以上就是基于轻量级 CDC debezium server databend 构建 MySQL 到 Databend 的 实时数据同步的全部过程,这种方式不需要依赖 Flink, Kafka 等大型组件,启动和管理非常方便。

Alt text

几周前,Databricks 和 Snowflake 召开了各自的年度大会,除了今年一路持续走红的 AI ,数据湖/数据仓库技术的发展仍然值得关注,毕竟数据才是基本盘。Apache Iceberg 无疑是数据湖方案的大赢家,Databricks 新推出的 UniForm,为以 Apache Iceberg 和 Hudi 表格式读取 Delta 中的数据提供了进一步的支持。而 Snowflake 也适时推出了 Iceberg Tables 更新,宣称要进一步打破数据孤岛。

Databend 最近几个月正在推动的重要新特性之一就是支持读取 Apache Iceberg 表格式的数据,尽管还没有完全落地,但已经取得了不错的进展。

今天这篇文章旨在为大家提前演示这一新特性 —— 使用 Databend 挂载并查询 Iceberg Catalog ,我们将介绍 Iceberg、表格式的一些核心概念,并且展示 Databend 的解决方案(包括 Databend 的多源数据目录能力和以 Rust 从头实现的 IceLake)。此外,我们还会提供完整的 workshop ,供大家尝鲜体验。

Apache Iceberg

时至今日,越来越多的数据进入云端,并且存储在对象存储之中,但这并不能完全适应现代分析的需求。这里有两个问题需要解决:第一个是数据以何种形式组织,也就是说,如何得到更结构化的数据存储。第二个问题还要更进一步,如何为用户提供更广泛的一致性保证以及业务中需要的模式信息,以及更多适应现代分析负载的高级特性。

数据湖往往会关注并解决第一个问题,而表格式则会致力于为第二个问题提供解决方案。

Apache Iceberg 是一种高性能的开放表格式,专为大规模分析工作负载而设计,简单而又可靠。 同时支持 Spark、Trino、Flink、Presto、Hive 和 Impala 等查询引擎,并且具备模式演变(Full Schema Evolution)、时间旅行和回滚等杀手级特性。另外,Apache Iceberg 的数据分片和明确定义的数据结构还使得对数据源进行并发访问更加安全、可靠和方便。

如果你对 Iceberg 感兴趣,我们也推荐阅读像 Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg! 这样的文章进行探索。

表格式初探

表格式(Table Format)是一种利用文件集合存储数据的规范。它主要包含以下三个部分的定义:

  • 如何将数据存储在文件中
  • 如何存储相关文件的元数据
  • 如何存储有关表本身的元数据

表格式的文件通常存储在 HDFS、S3 或 GCS 这样的底层存储服务中,上层则会对接 Databend、Snowflake 等数据仓库。相比 CSV 或 Parquet ,表格式提供了表形式的标准的结构化数据定义,无需加载到数据仓库中就可以使用。

尽管表格式领域还有像 Delta Lake 和 Apache Hudi 这样的强劲对手,但这篇文章是关于 Apache Iceberg 的,所以,还是让我们把目光转向 Apache Iceberg ,一起了解一下它的底层文件组织结构。

stack of Apache Iceberg

上图中的 s0 、s1 代表的是表的快照信息(snapshot),也就是表在某个时刻的状态。每次 commit 都会生成一个快照,每个快照都会对应一个清单列表(manifest list),而每个清单列表可以维护多个清单文件(manifest file)的地址与统计信息。清单文件中会记录当前操作生成数据文件(data file)的地址和统计信息,比如列中的最大值最小值和数据行数等。

Databend 多源数据目录

要想在 Databend 中实现 Iceberg 集成,头一件是 Databend 的多源数据目录能力。多源数据目录将会允许将原本由其他数据分析系统所管理的数据挂载到 Databend 。

从设计之初,Databend 的目标就是成为云原生的 OLAP 数据仓库,并考虑到多源数据处理的问题。Databend 中的数据按三层进行组织:catalog -> database -> tablecatalog 作为数据最大一层,会包含所有的数据库和表。

团队在此基础上设计并实现对 Hive 和 Iceberg 数据目录的支持,提供配置文件和 CREATE CATALOG 语句多种挂载形式,从而支持对相关数据进行查询。

要想挂载数据位于 S3 中的 Iceberg Catalog,只需要执行下面的 SQL 语句:

CREATE CATALOG iceberg_ctl
TYPE=ICEBERG
CONNECTION=(
URL='s3://warehouse/path/to/db'
AWS_KEY_ID='admin'
AWS_SECRET_KEY='password'
ENDPOINT_URL='your-endpoint-url'
);

IceLake - Apache Iceberg 的纯 Rust 实现

尽管 Rust 生态中近年来涌现出不少数据库、大数据分析相关的新项目,但 Rust 生态中仍然缺乏成熟的 Apache Iceberg 绑定,这为 Databend 集成 Iceberg 制造了不少困难。 Databend Labs 支持并发起的 IceLake 旨在填补这一空白,并致力于建立一个开放生态系统:

  • 用户可以从 任何 存储服务(如 s3、gcs、azblob、hdfs 等)读写 Iceberg 表。
  • 任何 数据库都可以集成 icelake,以支持读写 Iceberg 表。
  • 提供原生的 arrow 格式互转换的能力。
  • 提供多种语言绑定,使其他语言可以享有 Rust 核心带来的 Iceberg 生态支持。

当前 IceLake 已经支持读取 Apache Iceberg 存储服务中的数据(Parquet 格式)。而 Databend 的 Iceberg 数据目录能力正是由 IceLake 支撑的,其设计与实现在和 Databend 集成中得到了验证。

此外,我们还与 Iceberg 社区成员携手发起并参与 iceberg-rust 项目。该项目旨在将 IceLake 中的 Iceberg 相关实现贡献给上游,目前第一个版本正在紧锣密鼓的开发中,欢迎关注 https://github.com/apache/iceberg-rust

Workshop:体验 Databend 的 Iceberg 能力

在这个 Workshop 中,我们将会展示如何准备 Iceberg 表格式的数据,并以 Catalog 的形式将其挂载到 Databend 上,执行一些基本的查询。相关的文件和配置可以在 PsiACE/databend-workshop 中找到。

如果你本身有一些符合 Iceberg 表格式的数据存放在 OpenDAL 支持的存储服务中,我们更推荐使用 Databend Cloud ,这样你就可以跳过繁琐的服务部署和数据准备流程,轻松上手 Iceberg Catalog 。

启动服务

为了简化 Iceberg 的服务部署和数据准备问题,我们将会使用 Docker 和 Docker Compose ,你需要先安装这些组件,然后编写 docker-compose.yml 文件。

version: "3"

services:
spark-iceberg:
image: tabulario/spark-iceberg
container_name: spark-iceberg
build: spark/
networks:
iceberg_net:
depends_on:
- rest
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
- ./notebooks:/home/iceberg/notebooks/notebooks
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8888:8888
- 8080:8080
- 10000:10000
- 10001:10001
rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
networks:
iceberg_net:
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
iceberg_net:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
networks:
iceberg_net:
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
networks:
iceberg_net:

在上述的配置文件中,我们使用 MinIO 作为底层存储,Iceberg 提供表格式能力,至于 spark-iceberg ,可以帮助我们准备一些预置数据并执行转换操作。

接下来,我们在 docker-compose.yml 文件对应的目录下启动所有服务:

docker-compose up -d

数据准备

在这个 Workshop 中,我们计划使用 NYC Taxis 数据集(纽约出租车搭乘数据),在 spark-iceberg 中已经内置了 Parquet 数据,我们只需要将其转化为 Iceberg 格式。

首先启用 pyspark-notebook :

docker exec -it spark-iceberg pyspark-notebook

接下来我们就可以在 http://localhost:8888 使用 Jupyter Notebook :

这里我们需要运行一小段程序,实施数据转换的操作:

df = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2021-04.parquet")
df.write.saveAsTable("nyc.taxis", format="iceberg")

第一行将会读取 Parquet 数据,而第二行将会将其转储为 Iceberg 格式。

为了验证数据是否成功转换,我们可以访问位于 http://localhost:9001 的 MinIO 实例,可以看到数据是按之前描述的 Iceberg 底层文件组织形式进行管理的。

部署 Databend

这里我们使用手动部署单节点 Databend 服务的形式,总体上部署过程可以参考 Databend 官方文档 ,需要注意的一些细节如下:

  • 首先是需要为日志和 Meta 数据准备相关的目录

    sudo mkdir /var/log/databend
    sudo mkdir /var/lib/databend
    sudo chown -R $USER /var/log/databend
    sudo chown -R $USER /var/lib/databend
  • 其次,因为默认的 admin_api_address 已经被前面的服务占用掉,所以需要编辑 databend-query.toml 进行一些修改避免冲突:

    admin_api_address = "0.0.0.0:8088"
  • 另外,我们还需要根据 Docs | Configuring Admin Users 配置管理员用户,由于只是一个 workshop ,这里选择最简单的方式,只是取消 [[query.users]] 字段以及 root 用户的注释:

    [[query.users]]
    name = "root"
    auth_type = "no_password"
  • 由于我们本地部署 MinIO ,没有设置证书加密,需要使用不安全的 HTTP 协议加载数据,所以还需要更改 databend-query.toml 配置文件以允许这一行为。在生产服务中请尽可能避免开启它:

    ...
    [storage]
    ...
    allow_insecure = true
    ...

接下来就可以正常启动 Databend :

./scripts/start.sh

我们强烈推荐你使用 BendSQL 作为客户端,当然,我们也支持像 MySQL Client 和 HTTP API 等多种访问形式。

挂载 Iceberg Catalog

根据之前的配置文件,只需要执行下述 SQL 就可以一键挂载 Iceberg Catalog 。

CREATE CATALOG iceberg_ctl
TYPE=ICEBERG
CONNECTION=(
URL='s3://warehouse/'
AWS_KEY_ID='admin'
AWS_SECRET_KEY='password'
ENDPOINT_URL='http://localhost:9000'
);

为了验证是否成功,我们可以执行 SHOW CATALOGS 查看:

Databend 也支持了 SHOW DATABASESSHOW TABLES 语句,之前转换数据时的 nyc.taxis 对应在 MinIO 中是二级目录,而在 Databend 则会映射到数据库和表。

执行查询

数据已经挂载,那么就让我们尝试执行一些简单的查询。

首先是对数据进行行数统计,可以看到一共挂载了 200 万行数据到 Databend:

SELECT count(*) FROM iceberg_ctl.nyc.taxis;

让我们从其中几列试着取一些数据出来:

SELECT tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count FROM iceberg_ctl.nyc.taxis LIMIT 5;

下面的查询可以帮助我们探索乘客数量和旅程距离之间的相关性,这里只取其中 10 条结果:

SELECT
passenger_count,
to_year(tpep_pickup_datetime) AS year,
round(trip_distance) AS distance,
count(*)
FROM
iceberg_ctl.nyc.taxis
GROUP BY
passenger_count,
year,
distance
ORDER BY
year,
count(*) DESC
LIMIT
10;

总结

在这篇文章中,我们介绍到 Apache Iceberg 表格式和 Databend 的相关解决方案,并且提供了一个完整的 workshop 供大家探索。

不难看出,尽管目前我们只为 Iceberg Catalog 提供了单机模式的目录挂载能力,但 Databend 已经可以胜任一些基本的查询处理任务。也欢迎大家在自己感兴趣的数据上进行尝试,并给我们提供一些反馈。

各位社区小伙伴们,Databend 于 2023 年 6 月 29 日迎来了 v1.2.0 版本的正式发布!相较于 v1.1.0 版本,开发者们一共新增了 600 次 commit,涉及 3083 个文件变更,约 17 万 行代码修改。感谢各位社区伙伴的参与,以及每一个让 Databend 变得更好的你!

在 v1.2.0 版本中,Databend 新增了 BITMAP 数据类型使用列号直接查询 CSV/TSV/NDJSON 文件AI Functions 等特性,并 设计并实现全新哈希表 大幅提升 Join 的性能。这个版本的发布使得 Databend 更接近实现 LakeHouse 的愿景,能够直接读取和分析储存在对象存储上的 CSV/TSV/NDJSON/Parquet 等格式文件,你也可以在 Databend 内部对这些文件进行 ETL 操作,从而做一些更高性能的 OLAP 分析。

同时,Databend 也设计并实现了 计算列VACUUM TABLEServeless Background Service 等企业级特性,感兴趣的小伙伴可以联系 Databend 团队 了解升级信息,或者访问 Databend Cloud 即时体验。

Databend x 内核

Databend 重要新特性速览,遇到更贴近你心意的 Databend。

数据类型:BITMAP

Databend 新增对 BITMAP 数据类型的支持并实现了一系列相关函数。

BITMAP 是一种压缩数据结构,可以高效地存储和操作布尔值集合。它提供了快速的集合运算和聚合能力,在数据分析和查询方面应用广泛。常见的使用场景包括:去重计数、过滤选择和压缩存储。

Databend 中的 BITMAP 数据类型实现采用 RoaringTreemap 。与其他位图实现相比,使用这种数据结构可以提高性能并减少内存使用。

SELECT user_id, bitmap_count(page_visits) AS total_visits
FROM user_visits

+--------+------------+
|user_id |total_visits|
+--------+------------+
| 1| 4|
| 2| 3|
| 3| 4|
+--------+------------+

如果你想要了解更多信息,请查看下面列出的资源。

使用列号直接查询 CSV/TSV/NDJSON 文件

要想查询 CSV/TSV/NDJSON 这类没有 schema 的文件,以前需要将其加载到表中再进行查询。但是有时候用户事先并不了解文件的具体情况(例如 CSV 文件有多少列),或者只是想做临时的查询。

为此,Databend 引入了列号(column position), 使用 $N 语法来表示第 N 列。CSV/TSV 文件所有列都视作 String 类型,如果某行的列数少于使用到的列号,则会用空字符串进行补齐。NDJSON 文件只有一列 $1 ,类型是 Variant。

将这一能力与 COPY 语句结合使用,可以实现按需加载部分列,并在加载同时使用函数进行数据转换。

SELECT $1 FROM @my_stage (FILE_FORMAT=>'ndjson')

COPY INTO my_table FROM (SELECT TRIM($2) SELECT @my_stage t) FILE_FORMAT = (type = CSV)

如果你想要了解更多信息,请查看下面列出的资源。

设计并实现全新哈希表以提高 Hash Join 性能

在过去,Databend 的哈希表是为了满足聚合算子的需求而专门设计的,为了进一步提高 Hash Join 的性能,我们着手设计并实现了一种专为 Hash Join 优化的全新哈希表,通过并行化的设计,使 Databend 能够充分利用计算资源,同时在内存控制方面也变得更为精准,避免了不必要的内存开销,显著提高了 Hash Join 的性能。

商业智能分析师 Mimoune Djouallah 评论称:Databend 性能出色,在 8 核和 32GB 内存的条件下,运行 TPCH-SF10 只需要 25 秒。他甚至撰写了一篇题为《Databend and the rise of Data warehouse as a code》的博客文章。

如果你想要了解更多信息,请查看下面列出的资源。

AI Functions

Databend 在 v1.2.0 版本引入了强大的 AI 功能,实现了 Data 与 AI 的无缝融合,我们可以通过 SQL 来实现:

  1. 自然语言生成 SQL
  2. Embedding 向量化并存储
  3. 相似度计算
  4. 文本生成

自然语言生成 SQL

比如,如果你在一个 nginx log 的数据库中提问:"What are the top 5 IP addresses making the most requests",运用 Databend 的 AI_TO_SQL 函数,你将直接得到相应的 SQL 语句,使用非常便捷。

Embedding 向量化

借助 Databend 的 AI_EMBEDDING_VECTOR 函数,我们可以实现数据的向量化,并将其保存在 Databend 的 ARRAY 类型中。这样一来,Databend 事实上变成了一个向量数据库。

相似度计算

在向量化表示下,可以计算两个词语、句子或文档的相似度。 例如,假设我们有 "dog" 和 "puppy" 两个词语(也可以是句子),我们首先将它们各自转化为向量 v1 和 v2,然后使用余弦相似度来计算它们的相似度。

cos_sim = dot(v1, v2) / (norm(v1) * norm(v2))

Databend 中的 COSINE_DISTANCE 函数就是对这个公式的实现。

文本生成

文本生成在很多场景下非常有用,现在你可以在 SQL 中使用 AI_TEXT_COMPLETION 函数来完成。

目前,我们已经利用以上 Data + AI 能力,对 https://docs.databend.cn 的所有文档进行了 Embedding 处理,并将其存储到 Databend 中,构建了一个智能问答网站:https://ask.databend.rs。 在这个网站上,你可以提问关于 Databend 的任何问题。

Databend 企业级特性

全新企业级特性上线啦!了解 Databend 如何推动更具价值的数据分析服务。

计算列

计算列(Computed Columns)是一种通过表达式从其他列计算生成数据的列,使用计算列可以将表达式的数据存储下来加快查询速度,同时可以简化一些复杂的查询的表达式。计算列包括存储(STORED)和虚拟(VIRTUAL)两种类型。

  • 存储计算列在每次插入或更新时生成数据并存储在磁盘,在查询时不需要重复计算,可以更快的读取数据。
  • 虚拟计算列不存储数据,不占用额外的空间,在每次查询时实时进行计算。

计算列对读取 JSON 内部字段的数据特别有用,通过将常用的内部字段定义为计算列可以大大减少每次查询过程中提取 JSON 数据的耗时操作。例如:

CREATE TABLE student (
profile variant,
id int64 null as (profile['id']::int64) stored,
name string null as (profile['name']::string) stored
);

INSERT INTO student VALUES ('{"id":1, "name":"Jim", "age":20}'),('{"id":2, "name":"David", "age": 21}');

SELECT id, name FROM student;
+------+-------+
| id | name |
+------+-------+
| 1 | Jim |
| 2 | David |
+------+-------+

如果你想要了解更多信息,请查看下面列出的资源。

VACUUM TABLE

VACUUM TABLE 命令通过从表中永久删除历史数据文件来释放存储空间,有助于优化系统性能。删除的文件包括:

  • 与表相关的快照及其关联的 segments 和 blocks。
  • 孤立文件。在 Databend 中,孤立文件指不再与该表关联的快照、segments 和 blocks。孤立文件可能由各种操作和错误生成,例如在数据备份和还原期间,并且随着时间的推移会占用宝贵的磁盘空间并降低系统性能。

如果你想要了解更多信息,请查看下面列出的资源。

Serverless Background Service

Databend 的内置存储引擎 FuseTable 是一种与 Apache Iceberg 类似的日志结构表,在数据持续写入的过程中,需要定期执行表压缩、重聚类和清理以合并小数据块。小数据块合并的过程会涉及按聚类键排序数据或清理不需要的分支等阶段。

为了自动化执行这一过程需要使用不同的驱动,增加了基础设施的复杂性。而且必须部署和维护其他服务来触发驱动事件。为简化这一过程,Databend 设计并实现了 Serverless Background Service ,能自动发现数据写入之后需要压缩、重排序、清理的表,无需其他服务,也无需用户手动操作,自动触发对应表的维护工作,降低了用户维护的负担,提升了表查询的性能,也降低了数据在对象存储中的成本。

Databend x 生态

Databend 的生态版图得到了进一步的完善。是时候将 Databend 引入你的数据洞见工作流啦!

databend 的 Python 绑定

Databend 现在提供 Python 绑定,为在 Python 中执行 SQL 查询提供了新选择。该绑定内置 Databend,无需部署实例即可使用。

pip install databend

从 databend 导入 SessionContext 并创建会话上下文即可开始使用:

from databend import SessionContext
ctx = SessionContext()
df = ctx.sql("select number, number + 1, number::String as number_p_1 from numbers(8)")

结果 DataFrame 可以使用 to_py_arrow()to_pandas() 转换为 PyArrow 或者 Pandas 格式:

df.to_pandas() # Or, df.to_py_arrow()

现在行动起来,将 Databend 集成到你的数据科学工作流中。

BendSQL - Databend 原生命令行工具

BendSQL 是一款专为 Databend 设计的原生命令行工具,现在已经用 Rust 语言重写,并且同时支持 REST APIFlight SQL 协议。

使用 BendSQL,你可以轻松高效地管理 Databend 中的数据库、表和数据,并轻松执行各种查询和运算。

bendsql> select avg(number) from numbers(10);

SELECT
avg(number)
FROM
numbers(10);

┌───────────────────┐
avg(number)
│ Nullable(Float64)
├───────────────────┤
4.5
└───────────────────┘

1 row in 0.259 sec. Processed 10 rows, 10B (38.59 rows/s, 308B/s)

我们期待与你分享更多关于 BendSQL 的更新!欢迎试用并给我们反馈。

数据集成和 BI 服务

Apache DolphinScheduler

Apache DolphinScheduler 是一个分布式和可扩展的开源工作流协调平台,具有强大的 DAG 可视化界面。支持 30+ 的任务类型,包括像 Flink SQL、DataX、HiveCli 等。能够高并发、高吞吐量、低延迟和稳定地执行百万个量级任务,可以根据计划时间(特殊日期范围或特殊的日期列表)批量执行任务,而且在不影响工作流模板的情况下,工作流实例支持修改、回滚和重新运行。

DolphinScheduler 现已支持 Databend 数据源,可以使用 DolphinScheduler 管理 DataX 任务实现从 MySQL 到 Databend 异构数据库数据同步。

Apache Flink CDC(Change Data Capture)是指 Apache Flink 使用基于 SQL 的查询从各种来源捕获和处理实时数据更改的能力。CDC 允许监视和捕获数据库或流系统中发生的数据修改(插入、更新和删除),并对这些更改进行实时响应。

Databend 现在提供 Flink SQL Connector,可以将 Flink 的流处理能力与 Databend 集成。通过对连接器进行配置,可以以流的形式从各种数据库中捕获数据更改,并将其载入到 Databend 中以进行实时处理和分析。

如果你想要了解更多信息,请查看下面列出的资源。

Tableau

Tableau 是一款流行的数据可视化和业务智能工具。它提供了直观、交互式的方式来探索、分析和呈现数据,帮助用户更好地理解数据的意义和洞察。

参考 Other Databases (JDBC),将 databend-jdbc 放置在 Tableau 驱动路径下,就可以使用 Tableau 分析 Databend 中的数据。

如果你想要了解更多信息,请查看下面列出的资源。

下载使用

如果你对我们新版本功能感兴趣,欢迎访问 https://github.com/datafuselabs/databend/releases/tag/v1.2.0-nightly 页面查看全部的 changelog 或者下载 release 体验。

如果你还在使用旧版本的 Databend,我们推荐升级到最新版本,升级过程请参考:

https://docs.databend.cn/doc/operations/upgrade

意见反馈

如果您遇到任何使用上的问题,欢迎随时通过 GitHub issue 或社区用户群中提建议

GitHub: https://github.com/datafuselabs/databend/

2023 年 4 月 20 日,Databend Cloud 经历了近两年的打磨终于发布了!🎉

此次发布会由北京数变科技有限公司【Databend Labs】联合阿里云共同举办。Databend Cloud 借助于云原生数仓 Databend 实现了云简单易用的大数据分析场景。

以下内容来自 Databend 联合创始人 - 王吟、Databend Cloud 平台负责人 - 李亚舟以及阿里云智能资深产品运营专家 - 蔡亮伟,在本次发布会上的分享总结。

🙋 本次发布会分为三个部分:

第一部分:王吟,李亚舟分享:「 云上数据变革,Databend Cloud 发布」

第二部分:蔡亮伟分享:「阿里云对象存储 OSS, 构建企业级数据湖底座」

第三部分:在阿里云平台上如何开通 Databend Cloud

databend-cloud-release-wy-clw.jpeg (上图: 王吟(左)蔡亮伟(右)共同见证 Databend Cloud 获得阿里云生态认证)

云上数据变革,Databend Cloud 发布

🙋 这部分的主题大纲

1. Databend Cloud 主要功能介绍

2. Databend Cloud 和传统数仓的区别,为什么要选择 Databend Cloud

3. Databend 登顶 ClickBench 测试介绍

4. Databend Cloud 未来定位

Databend Cloud 主要功能介绍

Databend 是一款使用 Rust 研发,完全面向云架构,基于对象存储构建的云原生数仓。

1.png

Databend Cloud 是基于 Databend 打造的弹性云数仓,由 3 层组成,底层基于对象存储,如阿里云 OSS,上层计算节点采用不同的规格,好比个人选购衣服,有小号,中号,大号等,同理计算节点也有小号 1x、中号 2x,大号 8x,根据自己计算的需求,选择不同的型号。最上层就是我们元数据和管理集群,实现多租户的隔离,保证用户的数据安全。

Databend Cloud 架构如下:

2.png

在 Databend Cloud 可以为用户提供:

  • 一站式数据分析和管理平台
  • 丰富的租户及组织管理能力
  • 按资源使用情况计费,零管理零运维
  • 多云解决方案
  • Databend 核心团队提供支撑

Databend 性能如何呢?

ClickBench 是 ClickHouse 发起的分析型数据库性能测试排行榜,收录了 Snowflake、ClickHouse 等 50 多个主流分析型数据库的测试结果,它采用一个公开的标准来衡量数据库的性能,我们的导入性能在三个机型下均为第一名,在 hot run 查询下,我们有一个机型是第一名,其他两个机型分别是第二第三名。

3.png

Databend Cloud 和自己搭建数仓的区别,为什么要选择 Databend Cloud

目前,很多用户在公有云中利用云主机自己部署数仓集群,通常采用传统数仓的存算一体架构,底层存储利用硬盘来构建。

Databend Cloud 底层采用对象存储使用多少用多少无需考虑空间容量,成本是云硬盘的 1/3 到 1/8 甚至更少,使用的计算资源也跟业务需求相关,如果是跑批类业务,只需要在指定时间使用。整体而言可以为企业节省 80% 以上成本。

4.png

Databend Cloud 和使用公有云数仓的区别

公有云中数仓主要还是存算一体架构,底层也是基于云硬盘来构建,相比于场景一,公有云厂商通常提供 PaaS 服务,一键可以安装整个集群到用户 VPC 中,但还需要用户时刻关注集群状态。

成本上,集群长期占用,费用甚至比第一种场景用户自己搭建还要高。所以整体而言,相对于 Databend Cloud 需要一定的运维成本,整体费用高出 4-10 倍以上。

5.png

相对公有云中数仓服务 Databend Cloud 是一家更加开放的公司,可以给你提供多云无锁定服务,同时 Databend Cloud 也是基于 Databend 研发,Databend 也可以在现在市场上所有的公有云上实现私有化部署, 可以让用户实现零担心被云锁定问题。

Databend Cloud 适用于以下业务场景

Databend Cloud 主要定位在云上大数据存储及分析,从现在实际用户使用场景来看 Databend Cloud 给用户提供了:

  • 基于存算分离,可以实现计算层的独立扩容及收缩
  • 基于对象存储,帮助用户实现结构化和半结构化的高压缩存储
  • 提多云上服务体验统一
  • 计算分析能力强,用户数据不只存储,同时拥有灵活的计算能力
  • 基于 AI 实现的问答应用,智能客服等

6.jpeg

部分用户

7.png

阿里云对象存储如何帮助云原生数仓构建统一数据底座

阿里云对象存储已经为上万家客户提供了云上数据湖和数仓分析业务的支持,在此过程中,对象存储不断提升和演进数据湖存储能力,从 1.0 到 3.0 版本,其最终愿景是为上层分析的生态应用提供统一的、多协议接入的存储底座。

在过去的一年中,阿里云存储团队与云原生数仓 Databend Cloud 团队展开了深度合作,通过对象存储 OSS 支持 Databend 实现存算分离,一起迈向 Serverless 架构的未来。

8.png

在此过程中,双方在数据存储方面为客户带来了如下价值:

  • 稳定可靠: 采用多种高可靠技术,如校验,多可用区,跨 Region 复制实现数据不丢不错,保证数据的持续访问。
  • 安全可信: 通过多种数据加密技术,配合完善的权限管控能力,实现全链路的访问安全和存储安全。
  • 弹性伸缩: 以服务化的方式,按量付费,提供从零到上万亿对象和 EB 级存储空间的扩展性。
  • 极致性能: 可为客户提供 Tbps 级别吞吐带宽,优化海量小文件读写,单命名空间支持 50 亿文件。

在阿里云平台如何开通 Databend Cloud

Databend Cloud 现在已经上架阿里云市场,现在阿里云用户开通 Databend 也比较方便,开通的方法可以看 B 站视频👇

另外也可以通过阿里云 OSS 控制台中开通:

9.png

关于 Databend Cloud

Databend Cloud 是基于 Databend 实现上的云原生数仓 SAAS 产品,提供的一站式 SaaS 云数据平台,它具备:免安装、免运维,注册账号即可使用,按你的 SQL 查询按需按量付费,真正做到使用才付费,不查询不使用不付费的特点。它为用户提供了:

  • 存算分离,或是进一步算算分离: 资源弹性,按量付费
  • 同一份数据支持多个计算节点共同访问
  • 基于应用层做好数据加密
  • 多级存储,对用户透明
  • 支持数据 time travel ,可以让用户非常方便地访问数据任意时间点
  • 多云无锁定,统一用户体的大数据解决方案

Databend Cloud 正是基于以上的条件为用户定制一个多云,无锁定,高弹性,高性能,低成本的云上大数据解决方案。

👨‍💻‍ 海外官网: https://www.databend.com

💻 国内官网: https://www.databend.cn

📖 开源社区网站: https://docs.databend.cn

Github: https://github.com/datafuselabs/databend

🎬 B 站: 搜 Databend,技术分享视频

内容提要:本文将会介绍天空计算的背景,以及 Databend 是如何从数据存储、数据管理、数据共享等视角考虑跨云数据存储与访问的。

背景

云计算时代的开端可以追溯到 2006 年,当时 AWS 开始提供 S3 和 EC2 服务。2013 年,云原生概念刚刚被提出,甚至还没有一个完整的愿景。时间来到 2015 年 CNCF 成立,接下来的五年中,这一概念变得越来越流行,并且成为技术人绕不开的话题。

根据 CNCF 对云原生的定义:云原生技术使组织能够在公共、私有和混合云这类现代、动态的环境中构建和运行可扩展的应用程序。典型示例包括:容器、服务网格、微服务、不变基础设施和声明式 API。

然而,无论是公有云还是私有云、无论是云计算还是云服务,在天空中都已经存在太多不同类型的“云”。每个“云”都拥有自己独特的 API 和生态系统,并且彼此之间缺乏互操作性,能够兼容的地方也是寥寥无几。云已经成为事实上的孤岛。这个孤岛不仅仅是指公有云和私有云之间的隔阂,还包括了不同公有云之间、不同私有云之间、以及公有云和私有云之间的隔阂。这种孤岛现象不仅给用户带来了很多麻烦,也限制了云计算的发展。

2021 年 RISELab 发表了题为 The Sky Above The Clouds 的论文,讨论关于天空计算的未来。天空计算将云原生的思想进一步扩展,从而囊括公有云、私有云和边缘设备。其目标是实现一种统一的 API 和生态体系,使得不同云之间可以无缝地协作和交互。这样一来,用户就可以在不同的云之间自由地迁移应用程序和数据,而不必担心兼容性和迁移成本的问题。同时,天空计算还可以提供更高效、更安全、更可靠的计算服务,从而满足用户对于云计算的不断增长的需求。总体上讲,天空计算致力于允许应用跨多个云厂商运行,实现多云之间的互操作性。

(上图引自论文,展示不同类型的多云与天空的区别)

The Databend Way

跨云的关键

Databend 能够满足用户在不同的云之间自由地访问数据并进行查询,而不必担心兼容性和迁移成本的问题。同时,Databend 还可以提供更高效、更安全、更可靠的计算服务,从而满足用户对于云计算的不断增长的需求。从这个角度来看,Databend 已经初步形成了一套天空计算的解决方案。那么,对 Databend 而言,跨云的关键到底落在哪里呢?

(上图所示为 Databend Cloud 架构示意图)

Databend 采用存算分离的架构,并完全面向云对象存储进行设计决策。得益于存储与计算分离、存储与状态分离,Databend 可以实现对资源的精细化控制,轻松部署与扩展 Query 和 Meta 节点,并支持多种不同的计算场景和存储场景,而无需考虑跨云数据管理与移动的问题。

Query 节点和 Meta 节点本身都是轻量化的服务,并且对于部署环境没有严格的依赖。但数据的存储和访问管理就不一样,我们需要考虑不同云服务之间的 API 兼容性、以及如何与云服务本身的安全机制交互从而提供更安全的访问控制机制。对于 Databend 而言,跨云,或者说实现天空计算的关键,就落在数据的管理与访问之上。

(OpenDAL 可以将数据访问问题从 M*N 转化为 M+N)

为了解决这一问题,Databend 抽象出一套统一的数据访问层(OpenDAL,现在是 Apache 软件基金会旗下的孵化项目),从而屏蔽了不同云服务之间的 API 兼容性问题。在接下来的部分,我们将会从不同的视角来观察 Databend 的无痛数据访问体验,体验真正完全云原生的天空计算的魅力。

数据存储

Databend 存储后端的细节隐藏在简单的配置之下,通过修改配置文件就可以轻松地在十数种存储服务之间切换。例如,如果你想使用 AWS S3,只需要指定类型为 s3 即可,Databend 会自动尝试使用 IAM 来进行认证。如果你想使用其他与 S3 兼容的对象存储服务,也可以通过 endpoint_url 等设置来调整。

[storage]
type = "s3"

[storage.s3]
bucket = "databend"

当然,仅支持 S3 兼容的对象存储服务还不够。Databend 通过 OpenDAL 实现了 Google Cloud Storage、Azure Blob、Aliyun OSS、Huawei OBS 和 HDFS 等服务的原生存储后端支持。 这意味着 Databend 可以充分利用各种供应商提供的 API,为用户带来更优秀的体验。例如,Aliyun OSS 的原生支持使得 Databend 可以通过 Aliyun RAM 对用户进行认证和授权,无需设置静态密钥,从而大大提高安全性并降低运维负担。

(上图选自阿里云官网,访问控制场景与能力)

此外,原生支持还可以避免出现非预期行为,并与服务供应商提供更紧密的集成。虽然各大厂商都提供了 S3 兼容 API,但它们之间存在微妙差异,在出现非预期行为时可能会导致服务性能下降或读写数据功能异常。Google Cloud Storage 提供了 S3 兼容的 XML API,但却没有支持批量删除对象的功能。这导致用户在调用该接口时遇到意外错误。而 Google Cloud Storage 的原生支持使 Databend 不必担心 GCS 对 S3 的兼容实现问题对用户业务造成影响。

总之,Databend 通过为各个服务实现原生支持来为用户提供高效可靠的数据分析服务。

数据管理

前面讲过了存储后端的跨云支持,现在让我们将目光聚焦到数据的管理。更具体来说,数据在 Databend 工作流中的流入与流出。

COPY INTO,数据载入

要讲数据管理,就不得不讨论数据从哪里来。过去可能还需要考虑是否需要迁移存储服务,但现在,你可以从数十种 Databend 支持或兼容的存储服务中加载数据,一切都显得那么自然。

COPY INTO 语句是窥探 Databend 跨云能力的一个窗口,下面的示例展示了如何从 Azure Blob 加载数据到 Databend 之中。

COPY INTO mytable
FROM 'azblob://mybucket/data.csv'
CONNECTION = (
ENDPOINT_URL = 'https://<account_name>.blob.core.windows.net'
ACCOUNT_NAME = '<account_name>'
ACCOUNT_KEY = '<account_key>'
)
FILE_FORMAT = (type = CSV);

当然,不止是 Azure Blob,Databend 支持的其他云对象存储服务、IPFS 以及可以经由 HTTPS 访问的文件都可以作为 External location,通过 COPY INTO 语句加载进来。

Databend 的 COPY INTO 语句还支持进行基本的转换服务,可以减轻 ETL 工作的负担。

Stage,数据暂存区

刚刚提到 External location,事实上,要加载到 Databend 中的数据文件还可以在 Stage 中暂存。Databend 同样支持 Internal stage 和 Named external stage。

数据文件可以经由 PUT_INTO_STAGE API 上传到 Internal Stage,由 Databend 交付当前配置的存储后端进行统一管理。而 Named external stage 则可以用于挂载其他 Databend 支持的多种存储服务之中的 bucket。

下面的例子展示了如何在 Databend 中创建一个名为 whdfs 的 Stage,通过 WebHDFS 协议将 HDFS 中 data-files 目录下的数据文件导入 Databend。

bendsql> CREATE STAGE IF NOT EXISTS whdfs URL='webhdfs://127.0.0.1:9870/data-files/' CONNECTION=(HTTPS='false');
Query OK, 0 rows affected (0.01 sec)

bendsql> COPY INTO books FROM @whdfs FILES=('books.csv') file_format=(type=CSV field_delimiter=',' record_delimiter='\n' skip_header=0);
Query OK, 2 rows affected (1.83 sec)

如果你并不想直接导入数据,也可以尝试 SELECT FROM STAGE ,快速分析位于暂存区中的数据文件。

Catalog,数据挂载

放在对象存储中的数据加载得到了解决,还有一个值得思考的问题是,如果数据原本由其他数据分析系统所管理,该怎么办?

Databend 提供多源数据目录(Multiple Catalog)的支持,允许挂载 Hive、Iceberg 等外部数据目录。

下面的示例展示如何利用配置文件挂载 Hive 数据目录。

[catalogs.hive]
type = "hive"
# hive metastore address, such as 127.0.0.1:9083
address = "<hive-metastore-address>"

除了挂载,查询也是小菜一碟 select * from hive.$db.$table limit 10;

当然,这一切也可以通过 CREATE CATALOG 语句轻松搞定,下面的例子展示了如何挂载 Iceberg 数据目录。

CREATE CATALOG iceberg_ctl
TYPE=ICEBERG
CONNECTION=(
URL="s3://my_bucket/path/to/db"
AWS_KEY_ID="<access-key>"
AWS_SECRET_KEY="<secret_key>"
SESSION_TOKEN="<session_token>"
);

Multiple Catalog 相关的能力还在积极开发迭代中,感兴趣的话可以保持关注。

再探 COPY INTO,数据导出

数据导出是数据管理中的另外一个重要话题,简单来讲,就是转储查询结果以供进一步的分析和处理。

这一能力同样由 COPY INTO 语法提供支持,当然,同样支持数十种存储服务和多种文件输出格式。下面的示例展示了如何将查询结果以 CSV 格式文件的形式导出到指定 Stage 中。

-- Unload the data from a query into a CSV file on the stage
COPY INTO @s2 FROM (SELECT name, age, id FROM test_table LIMIT 100) FILE_FORMAT = (TYPE = CSV);

这一语法同样支持导出到 External location,真正做到数据的自由流动。

Databend 还支持 PRESIGN ,用来为 Stage 中的文件生成预签名的 URL,用户可以通过 Web 浏览器或 API 请求自由访问该文件。

数据共享

刚才提到的 Databend 数据管理环节跨云主要是指 Databend 与外部服务之间的交互。此外,Databend 实例之间也可以经由多种云存储服务来支持数据共享。

为了更好地满足多云环境下的数据库查询需求,Databend 设计并实现了一套 RESTful API 来支持数据共享。

(上图所示为数据共享的工作流)

通过在配置文件中添加 share_endpoint_address 相关配置,用户可以利用预先部署好的 open-sharing 服务,经由熟悉的云存储服务共享 Databend 管理的数据库或表。

CREATE SHARE myshare;
GRANT USAGE ON DATABASE db1 TO SHARE myshare;
GRANT SELECT ON TABLE db1.table1 TO SHARE myshare;
ALTER SHARE myshare ADD TENANTS = vendor;

此时,表 db1.table1 将对接受方租户 vendor 可见,并能够进行必要的查询。

CREATE DATABASE db2 FROM SHARE myshare;
SELECT * FROM db2.table1;

跨云的未来

上面的几个视角,只是展示 Databend 在天空计算道路上的一个小小侧影。

数据合规、隐私保护等内容同样是我们所关心的重要议题。

Databend 的愿景是成为未来跨云分析的基石,让数据分析变得更加简单、快速、便捷和智能。

总结

本文介绍了天空计算的概念和背景,以及 Databend 的跨云数据存储和访问。

天空计算是一种将公有云、私有云和边缘设备统一起来的方法,目标是提供一种无缝的 API 和生态体系,使得用户可以在不同的云之间自由地迁移应用程序和数据。

Databend 是一个开源的、完全面向云架构的新式数仓,它采用存算分离的架构,并抽象出一套统一的数据访问层(OpenDAL),从而屏蔽了不同云服务之间的 API 兼容性问题。Databend 可以满足用户在不同的云之间自由地访问数据并进行查询,而不必担心兼容性和迁移成本的问题。同时,Databend 还可以提供更高效、更安全、更可靠的计算服务,从而满足用户对于云计算的不断增长的需求。

欢迎部署 Databend 或者访问 Databend Cloud,即刻探索天空计算的无尽魅力。

各位社区小伙伴们,Databend 于 2023 年 4 月 14 日迎来了 v1.1.0 版本的正式发布!这次新版本是 Databend 发布 1.0 版本之后的第一个大版本!相较于 v1.0.0 版本,开发者们一共新增了 1,616 次 commit,共计 505 个优化和修复,涉及 2,069 个文件变更,约 16 万 行代码修改。感谢各位社区伙伴的参与,以及每一个让 Databend 变得更好的你!

在 v1.1.0 版本中,我们为 COPY INTO 支持了基本的 ETL 能力,在数据导入过程中即可轻松转换数据;Databend 现在能够成功运行所有 TPC-DS 查询,此外,还进行了一些性能优化和功能改进。

Databend x 内核

Databend 重要新特性速览,遇到更贴近你心意的 Databend。

COPY INTO 支持 ETL 能力

COPY INTO 是 Databend 跨多云数据导入的重要路径,现在,它也具备基本的数据转换能力,避免在临时表中存储预转换数据,并支持列重新排序、列省略和基于 SELECT 的转换查询。

  CREATE TABLE my_table(id int, name string, time date);

COPY INTO my_table
FROM (SELECT t.id, t.name, to_date(t.timestamp) FROM @mystage t)
FILE_FORMAT = (type = parquet) PATTERN='.*parquet';

这一功能可以帮助你简化 ETL 工作流,从而更专注于数据分析。

Docs - Load Data | Transforming Data During a Load

支持全部 TPC-DS 查询

Databend 现已支持全部 99 条 TPC-DS 查询!

TPC-DS 是一个面向决策支持系统的包含多维度常规应用模型的决策支持 benchmark,它对决策支持系统的几个普遍适用方面进行建模,包括查询和数据维护。TPC-DS 被广泛用于衡量决策支持和分析系统的性能。

Blog - Benchmarking TPC-DS with Databend

REPLACE INTO

Databend 现在支持使用 REPLACE INTO 语句插入或更新数据。该语句允许你指定一个冲突键(conflict key),用于判断是应该插入一行新数据,还是更新一行已有数据。

如果表中已经存在与冲突键相同的行,Databend 会用新数据更新这一行。否则,新数据会作为一行新记录添加到表中。你可以使用这个语句来轻松地同步不同来源的数据或处理重复记录。

#> CREATE TABLE employees(id INT, name VARCHAR, salary INT);
#> REPLACE INTO employees (id, name, salary) ON (id) VALUES (1, 'John Doe', 50000);
#> SELECT * FROM Employees;
+------+----------+--------+
| id | name | salary |
+------+----------+--------+
| 1 | John Doe | 50000 |
+------+----------+--------+

Window Functions

窗口函数(Window Functions)为每行数据进行一次计算:输入多行(一个窗口)、返回一个值。在报表等分析型查询中,窗口函数能优雅地表达某些需求,发挥不可替代的作用。

  -- use aggrerate window function
SELECT date, AVG(amount) over (partition by date)
FROM BookSold

June 21|544.0
June 21|544.0
June 22|454.5
June 22|454.5
June 23|643.0
June 23|643.0

聚合窗口函数可以将聚合运算应用于窗口中的每一行数据。Databend 所支持的所有聚合函数都可以作为聚合窗口函数使用。

Docs - SQL Functions | Window Functions

Databend x 生态

Databend 的生态版图得到了进一步的完善。是时候将 Databend 引入你的数据洞见工作流啦!

可视化大盘

Metabase、Redash 和 Grafana 都是开源的可视化工具,能够从多个数据源中查询数据并将其可视化。

Databend 现在提供对上述三种工具的支持。你可以使用 Databend 作为数据源,利用这三种工具轻松构建可视化大盘,更好地理解和分析你的数据。

Granafa Dashboard

Docs - Data Visualization | Metabase

Docs - Data Visualization | Redash

Docs - Data Visualization | Connecting Databend With Grafana

编程语言支持

除了支持 Python、Go、Java 之外,Databend 现在还拥有自己的 Rust driver。这意味着你可以使用 Rust 轻松连接 Databend 并执行 SQL 查询。

  use databend_driver::new_connection;

let dsn = "databend://root:@localhost:8000/default?sslmode=disable";
let conn = new_connection(dsn).unwrap();

let sql_create = "CREATE TABLE books (
title VARCHAR,
author VARCHAR,
date Date
);";
conn.exec(sql_create).await.unwrap();

crates.io - databend-driver

Databend x AI

当云数仓遇到当下最热的大模型会擦出怎样的火花?Databend 与 OpenAI 联乘,让生产力多一点 AI。

AI Functions

Databend 现在内置实用 AI 函数 ai_to_sql,支持将自然语言转换为 SQL 语句,轻松为复杂分析任务编写高质量的 SQL。

  SELECT * FROM ai_to_sql(
'List the total amount spent by users from the USA who are older than 30 years, grouped by their names, along with the number of orders they made in 2022');

另外,Databend 还支持文本 embeding 生成、相似度检索、文本补全等能力,一站式轻松构建基于 SQL 查询、由 AI 赋能的生产力工具。

  SELECT doc_id, text_content, cosine_distance(embedding, ai_embedding_vector('What is a subfield of artificial intelligence?')) AS distance
FROM embeddings
ORDER BY distance ASC
LIMIT 5;

SELECT ai_text_completion('Artificial intelligence is a fascinating field. What is a subfield of artificial intelligence?') AS completion;

Docs - SQL Functions | AI Functions

AskBend

不如问问神奇海螺吧!Databend 现在上线 AskBend 知识库问答系统,可以在线问答关于 Databend 的一切。

P.S. AskBend 现已开源,由 Databend Cloud 和 AI Functions 强力驱动,你也可以使用 Markdown 文件创建并部署自己的智能小助手。

AskBend - asking for Databend documentation

Github - datafuselabs/askbend

下载使用

如果你对我们新版本功能感兴趣,欢迎来 https://github.com/datafuselabs/databend/releases/tag/v1.1.0-nightly 页面查看全部的 changelog 或者 下载 release 体验。

如果你还在使用旧版本的 Databend,我们推荐升级到最新版本,升级过程请参考:

https://docs.databend.cn/doc/operations/upgrade

意见反馈

如果您遇到任何使用上的问题,欢迎随时通过 GitHub issue 或社区用户群中提建议

GitHub: https://github.com/datafuselabs/databend/

work with kubesphere

前言

Databend 是一款完全面向云对象存储的新一代云原生数据仓库,专为弹性和高效设计,为您的大规模分析需求保驾护航。Databend 同时是一款符合 Apache-2.0 协议的开源软件,除了访问云服务(https://app.databend.com/)之外,用户还可以自己部署 Databend 生产集群以满足工作负载需要。

Databend 的典型使用场景包括:

  • 实时分析平台,日志的快速查询与可视化。
  • 云数据仓库,历史订单数据的多维度分析和报表生成。
  • 混合云架构,统一管理和处理不同来源和格式的数据。
  • 成本和性能敏感的 OLAP 场景,动态调整存储和计算资源。

KubeSphere 是在 Kubernetes 之上构建的以应用为中心的多租户容器平台,提供全栈的 IT 自动化运维的能力,可以管理多个节点上的容器化应用,提供高可用性、弹性扩缩容、服务发现、负载均衡等功能。

利用 KubeSphere 部署和管理 Databend 具有以下优点:

  • 使用 Helm Charts 部署 Databend 集群,简化应用管理、部署过程和参数设置。
  • 利用 Kubernetes 的特性来实现 Databend 集群的自动恢复、水平扩展、负载均衡等。
  • 与 Kubernetes 上的其他服务或应用轻松集成和交互,如 MinIO、Prometheus、Grafana 等。

本文将会介绍如何使用 KubeSphere 创建和部署 Databend 高可用集群,并使用 QingStor 作为底层存储服务。

配置对象存储

对象存储是一种存储模型,它把数据作为对象来管理和访问,而不是文件或块。对象存储的优点包括:可扩展性、低成本、高可用性等。

Databend 完全面向对象存储而设计,在减少复杂性和成本的同时提高灵活性和效率。Databend 支持多种对象存储服务,如 AWS S3、Azure Blob、Google Cloud Storage、HDFS、Alibaba Cloud OSS、Tencent Cloud COS 等。您可以根据业务的需求和偏好选择合适的服务来存放你的数据。

这里我们以青云 QingStor 为例,介绍与 S3 兼容的对象存储相关配置的预先准备工作。

创建 Bucket

对象存储服务(QingStor)提供了一个无限容量的在线文件存储和访问平台。每个用户可创建多个存储空间(Bucket);您可以将任意类型文件通过控制台或 QingStor API 上传至一个存储空间(Bucket)中;存储空间(Bucket)支持访问控制,您可以将自己的存储空间(Bucket)开放给指定的用户,或所有用户。

登录青云控制台,选中对象存储服务,新建用于验证的 bucket。

需要关注的是 bucket 的名字 <bucket> 和其所在的可用区 <region>

由于这里使用 s3 兼容服务,所以最后连接的 endpoint_url 是 s3.<bucket>.<region>.qingstor.com

创建 API 密钥

API 密钥(Access Key)可以让您通过发送 API 指令来访问青云的服务。API 密钥 ID 须作为参数包含在每一个请求中发送;而 API 密钥的私钥负责生成 API 请求串的签名,私钥需要被妥善保管,切勿外传。默认所有 IP 地址都可使用此密钥调用 API,设置 IP 白名单后只有白名单范围内的 IP 地址才可使用此密钥。

点击右上方菜单,选中 API 密钥,创建新的密钥用于 API 访问。

下载文件中的 qy_access_key_id 对应 access_key_idqy_secret_access_key 对应 secret_access_key

准备 KubeSphere 环境

KubeSpherehttps://kubesphere.io)是在 Kubernetes 之上构建的开源容器平台,提供全栈的 IT 自动化运维的能力,简化企业的 DevOps 工作流。KubeSphere 已被海内外数万家企业采用。此外,KubeSphere 还拥有极为开放的生态,KubeSphere 在 OpenPitrix 的基础上,为用户提供了一个基于 Helm 的应用商店,用于应用生命周期管理。KubeSphere 应用商店让 ISV、开发者和用户能够在一站式服务中只需点击几下就可以上传、测试、安装和发布应用。目前 Databend 已入驻 KubeSphere 应用商店。

KubeSphere 环境搭建

All-in-One 模式部署测试环境

参考官方文档

在 Azure 上 Spot 一台机器:

Welcome to Ubuntu 18.04.6 LTS (GNU/Linux 5.4.0-1089-azure x86_64)

* Documentation: https://help.ubuntu.com
* Management: https://landscape.canonical.com
* Support: https://ubuntu.com/advantage

System information as of Tue Sep 6 02:09:16 UTC 2022

System load: 0.15 Processes: 376
Usage of /: 4.8% of 28.89GB Users logged in: 0
Memory usage: 0% IP address for eth0: 10.0.0.4
Swap usage: 0%

以 All-In-One 模式部署:

注意,需要在 root 下运行。

apt install socat conntrack containerd
systemctl daemon-reload
systemctl enable --now containerd
curl -sfL https://get-kk.kubesphere.io | VERSION=v3.0.2 sh -
chmod +x kk
./kk create cluster --with-kubernetes v1.22.12 --with-kubesphere v3.3.1
+------+------+------+---------+----------+-------+-------+---------+-----------+--------+--------+------------------+------------+-------------+------------------+--------------+
| name | sudo | curl | openssl | ebtables | socat | ipset | ipvsadm | conntrack | chrony | docker | containerd | nfs client | ceph client | glusterfs client | time |
+------+------+------+---------+----------+-------+-------+---------+-----------+--------+--------+------------------+------------+-------------+------------------+--------------+
| ks | y | y | y | y | y | | | y | y | | 1.5.9-0ubuntu3.1 | | | | UTC 02:53:56 |
+------+------+------+---------+----------+-------+-------+---------+-----------+--------+--------+------------------+------------+-------------+------------------+--------------+

如果提示依赖缺失,可以根据需要安装,sudo apt install <name> ,这里只安装前两个。

Kubernetes Version ≥ 1.18
socatRequired
conntrackRequired
ebtablesOptional but recommended
ipsetOptional but recommended
ipvsadmOptional but recommended

访问 KubeSphere 控制面板。

执行下面命令查看关于登录的信息:

Collecting installation results ...
#####################################################
### Welcome to KubeSphere! ###
#####################################################

Console: http://10.0.0.4:30880
Account: admin
Password: P@88w0rd

NOTES:
1. After you log into the console, please check the
monitoring status of service components in
"Cluster Management". If any service is not
ready, please wait patiently until all components
are up and running.
2. Please change the default password after login.

#####################################################
https://kubesphere.io 2022-09-06 15:41:44
#####################################################

访问 30880 端口,并使用用户名密码登录,就可以访问 KubeSphere。为确保能够访问 KubeSphere 和其他服务,请根据实际情况在云平台控制面板为相应端口添加入站出站规则。

KubeSphere Cloud 创建演示环境

创建轻量集群服务:

注册并登录 https://kubesphere.cloud 之后,可以轻松创建轻量集群服务。

使用默认配置创建免费版集群即可尝鲜体验,个人用户每月有 10 小时免费额度。

访问 KubeSphere 控制面板。

点击进入 KubeSphere,使用临时帐号密码登录。

插件启用

登录后的界面,如下图所示:

如需使用应用商店,可以参考 KubeSphere 文档 - 在安装后启用应用商店 启用。

开启后可以在应用商店中搜索找到 Databend,结果类似下图。

企业空间与项目管理

点击平台管理进入访问控制页面,选中企业空间,点击创建,在名称一栏填写你想使用的名称,比如 databend

在侧边栏选中项目,点击创建,分别创建为 databend-metadatabend-query 准备的项目。创建后效果如图所示:

部署 Databend

应用模板载入

虽然应用商店中已经有 Databend 可供选用,但版本较旧(v0.8.122-nightly),新的 PR(v1.0.3-nightly)需要等合并之后才可用,所以建议添加 Databend 官方维护的 helm-charts 作为应用模板。

Databend 官方提供了 Helm Charts,而 KubeSphere 也支持使用 Helm Charts 应用模板。

应用模板是用户上传、交付和管理应用的一种方式。一般来说,根据一个应用的功能以及与外部环境通信的方式,它可以由一个或多个 Kubernetes 工作负载(例如部署有状态副本集守护进程集)和服务组成。作为应用模板上传的应用基于 Helm 包构建。 可以将 Helm Chart 交付至 KubeSphere 的公共仓库,或者导入私有应用仓库来提供应用模板。 https://kubesphere.io/zh/docs/v3.3/workspace-administration/upload-helm-based-application/

在企业空间侧边栏选中 应用管理 ,点击 应用仓库 ,添加 Databend 官方维护的 Helm Charts

待状态变为成功后,就可以基于模板安装部署新的 Databend 应用。

Databend 部署模型

参考文档

典型的 Databend 集群架构如下图所示,需要分别部署多个 Meta 和 Query 节点:

在集群模式下部署 Databend 时,首先需要启动一个 Meta 节点,然后设置并启动其他 Meta 节点以加入第一个 Meta 节点,形成集群。在成功启动所有 Meta 节点后,逐个启动 Query 节点。每个 Query 节点在启动后自动注册到 Meta 节点以形成集群。

Meta 高可用集群部署

选中 databend-meta 项目。点击侧边栏应用负载,选中应用。点击创建,并选中从应用模板。下拉栏中选中之前添加的 Databend,效果如图:

选中 databend-meta,点击安装,设定应用名称及版本,我们推荐总是使用最新版本,以获得更好的体验。

使用示例设置,创建 3 副本的 databend-meta 节点形成集群。生产环境下推荐至少使用 3 副本高可用集群,可以参考 Databend 官方文档进行配置。

bootstrap: true
replicaCount: 3
persistence:
size: 5Gi # 考虑到宿主机资源有限,仅供示范
serviceMonitor:
enabled: true

Query 集群部署

在 Meta 节点的所有副本就绪之后,就可以开始部署 Query 集群。

Query 节点部署的前置步骤与 Meta 节点类似。进入 databend-query 项目,仿照之前的步骤选中 databend-query 应用模板进行创建即可。

配置中需要关注的部分是:

  • databend-meta 连接:这里的地址取决于之前部署的 Meta 集群的相关信息。
  • 存储方式:本示例连接的是 QingStor,使用 S3 兼容协议,所以需要特别关注 endpoint_url
  • 内置用户创建:创建一个名为 databend 密码为 databend 的内置用户,以方便在非 localhost 情况下访问。

这里启动的是一个单副本的 Query 集群,实际情况下可以根据工作负载规模灵活调整。

replicaCount: 1
config:
query:
clsuterId: default
# add builtin user
users:
- name: databend
# available type: sha256_password, double_sha1_password, no_password, jwt
authType: double_sha1_password
# echo -n "databend" | sha1sum | cut -d' ' -f1 | xxd -r -p | sha1sum
authString: 3081f32caef285c232d066033c89a78d88a6d8a5
meta:
# Set endpoints to use remote meta service
# depends on previous deployed meta service、namespace and nodes
endpoints:
- "databend-meta-0.databend-meta.databend-meta.svc:9191"
- "databend-meta-1.databend-meta.databend-meta.svc:9191"
- "databend-meta-2.databend-meta.databend-meta.svc:9191"
storage:
# s3, oss
type: s3
s3:
bucket: "<bucket>"
endpoint_url: "https://s3.<region>.qingstor.com" # for qingstor
access_key_id: "<key>"
secret_access_key: "<secret>"
# [recommended] enable monitoring service
serviceMonitor:
enabled: true
# [recommended] enable access from outside cluster
service:
type: LoadBalancer

KubeSphere 监控

KubeSphere 观测工作负载

待状态变为运行中即可,这时可以很方便使用 KubeSphere 观测工作负载。

资源状态

  • databend-meta

  • databend-query

监控

  • databend-meta

  • databend-query

可访问性测试

节点状态检测

如果是在 All-in-One 模式下部署,我们可以轻松使用容器组 IP 地址来测试节点状态。

psiace@ks:~$ curl 10.233.107.113:8080/v1/health
{"status":"pass"}

而使用 KubeSphere Cloud 部署时,可以在 KubeSphere Cloud 控制面板,选择网络以创建访问规则。

这里以 8080(Admin API)和 8000(Query HTTP Handler)端口为例:

创建后的结果如下图所示:

同样我们可以使用 curl 来检查节点状态。

psiace@ks:~$ curl https://admin-gfkyzxaz.c.kubesphere.cloud:30443/v1/health
{"status":"pass"}

执行查询

bendsql 是一个十分方便的命令行界面工具,可以帮助您顺畅高效地使用 Databend。bendsql 也支持连接 Databend Cloud,管理计算集群和运行 SQL 查询。

安装 bendsql

$ go install github.com/databendcloud/bendsql/cmd/bendsql@latest

连接 databend 集群(以 KubeSphere Cloud 为例)

$ bendsql connect -H query-gfkyzxaz.c.kubesphere.cloud -P 30443 -u databend -p databend --ssl
Connected to Databend on Host: query-gfkyzxaz.c.kubesphere.cloud
Version: DatabendQuery v0.9.57-nightly-df858a1(rust-1.68.0-nightly-2023-03-01T01:23:11.56066902Z)

尝试执行查询

$ bendsql query
Connected with driver databend (DatabendQuery v0.9.57-nightly-df858a1(rust-1.68.0-nightly-2023-03-01T01:23:11.56066902Z))
Type "help" for help.

dd:databend@query-gfkyzxaz/default=> SELECT avg(number) FROM numbers(1000);
+-------------+
| avg(number) |
+-------------+
| 499.5 |
+-------------+
(1 row)

总结

本文介绍了如何使用 KubeSphere 创建和部署 Databend 高可用集群,后端存储服务采用 QingStor,最后使用 bendsql 演示连接集群和执行查询。

作者介绍:邰翀 Databend 研发工程师

Alt text

现在互联网应用越来越复杂,每个公司都会有多种多样的数据库。通常是用最好的硬件来跑 OLTP,甚至还在 OLTP 中进行分库分表来足业务,这样对于一些分析,聚合,排序操作非常麻烦。这也有了异构数据库的数据同步需求,今天重点给大家介绍两个利器:异构数据迁移:Addax 结合云原生数仓 Databend 实现异构数据库数据合并及分析。

Addax 是一个异构数据源离线同步工具,最初来源于阿里的 DataX,致力于实现包括关系型数据库 (MySQL、PostgreSQL、Oracle 等)、HDFS、Hive、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

Databend 是一个开源、弹性、负载感知的现代云数仓库,赋能企业降本增效。在之前的文章中介绍了如何快速部署 Databend

为什么是 Addax 没有选用 Datax, 实际这次发起这个项目的原因是一个用户原来的环境中有 Clickhouse,Datax 不支持 Clickhose 读取和写入,所以我们优先支持了 Addax 这个异构迁移工具。下面我们通过一个简单练习,让你学习使用 Addax , 另外通过几个进阶案例给你展示一下 Addax 的魅力。

本文中仅以 Addax 的 mysqlreader plugin 为例进行实验,databendwriter 支持所有 Addax 提供的 reader plugin。

1. Addax 基本使用

1.1. 安装 Addax

# more info https://wgzhao.github.io/Addax/4.0.11/
wget https://github.com/wgzhao/Addax/releases/download/4.0.11/addax-4.0.11.tar.gz;
tar xvf addax-4.0.11.tar.gz;

Addax 也支持 Docker 安装编译安装

1.2. Demo (from MySQL to Databend)

在 MySQL Server 中建立迁移用户。(本例中待迁移的表为 db.tb01)

mysql> create user 'mysqlu1'@'%' identified by '123';
mysql> grant all on *.* to 'mysqlu1'@'%';
mysql> create database db;
mysql> create table db.tb01(id int, col1 varchar(10));
mysql> insert into db.tb01 values(1, 'test1'), (2, 'test2'), (3, 'test3');

在 Databend 中建立对应的表结构。(将 MySQL 的 db.tb01 数据迁移至 Databend 的 migrate_db.tb01)

databend> create database migrate_db;
databend> create table migrate_db.tb01(id int null, col1 String null);

进行如下配置后,即可开始迁移。

$ cd addax-4.0.11/bin;

$ cat <<EOF > ./mysql2databend.json
{
"job": {
"setting": {
"speed": {
"channel": 4
}
},
"content": {
"writer": {
"name": "databendwriter",
"parameter": {
"preSql": [
"truncate table @table"
],
"postSql": [
],
"username": "u1",
"password": "123",
"database": "migrate_db",
"table": "tb01",
"jdbcUrl": "jdbc:mysql://127.0.0.1:3307/migrate_db",
"loadUrl": ["127.0.0.1:8000","127.0.0.1:8000"],
"fieldDelimiter": "\\x01",
"lineDelimiter": "\\x02",
"column": ["*"],
"format": "csv"
}
},
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "mysqlu1",
"password": "123",
"column": [
"*"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/db"
],
"driver": "com.mysql.jdbc.Driver",
"table": [
"tb01"
]
}
]
}
}
}
}
}
EOF

$ ./addax.sh -L debug ./mysql2databend.json

1.3. 校验数据

databend> select * from migrate_db.tb01;
+------+-------+
| id | col1 |
+------+-------+
| 1 | test1 |
| 2 | test2 |
| 3 | test3 |
+------+-------+

更多使用方式参见:https://wgzhao.github.io/Addax/4.0.11/writer/databendwriter/

1.4. 小结

上面的例子是通过 Addax 跑通一个表的迁移到 Databend,通过一个简单的例子也可以感受一下 Addax 大概的流程。

但 Addax 远比这个 Demo 强大。另外 Addax 强大之处可能通过参数来控制配置文件,这样比轻松地实现一个配置迁移,甚至可以传入 SQL 这样来读取指定区间做数据的迁移。

2. Addax 进阶使用

Addax 配置框架可以参考:https://wgzhao.github.io/Addax/4.0.11/setupJob/

如果只是使用源端和目标,这块在配置中主要需要关注:

    "content": {
"reader": {},
"writer": {},
}

另外 Addax 参考了 DataX 的设计和使用习惯,对于 DataX 支持语法在 Addax 都可以使用。

下面我举几个生产中可能会用到例子,来给大家参考一下:

  • Case1: 生产中 10 张表的数据合并到 Databend 中一张表
  • Case2: 把 MySQL 中所有的表都迁移到 Databend 中
  • Case3: 指定 SQL 读取原表的数据,迁移到指定的表中

Case 1: 生产中 10 张表的数据合并到 Databend 中一张表

这个需求在生产中比较常见,需要把线上的数据汇聚一个地方进行分析,这块正好可以利用 Databend 基于对象存储及高压缩的能力。假设源端是 MySQL 数据库,目标端是 Databend, 下面是一个简化的配置:

      "reader": {
"name": "mysqlreader",
...
"connection": [
...
"table": [
'${dst_table}'
]
]
},
"writer": {
"name": "databendwriter",
...
"table": '${src_table}',
...
}

基于 这配置我们需要写一个脚本来调用,例如要迁移的前缀是 sbtest 从 1 到 10,最终合并为 sbtest 调用方法如下:

pre_tb="sbtest"
dst_tb="sbtest"
for t in `seq 1 10`
do
tb=$pre_tb$t
echo $tb
./bin/addax.sh ./job/my2databend.json -p "-Dsrc_table=$tb -Ddst_table=$dst_tb"
done

mysql2databend.json 配置参考: databend-workshop/mysql2databend.json at main · wubx/databend-workshop

Case 2: 把 MySQL 中所有的表都迁移到 Databend 中

基于上面的案例,估计大概已经明白其中的套路了。如果 MySQL 到 Databend 还可以基于上面的配置文件,只需要把要迁移的表整理成一个 list 控制。全库迁移难点在于表结构生成,对于 Databend 表结构的生成只需要字段名和类型即可。

表结构生成的脚本参考:https://github.com/wubx/databend-workshop/blob/main/addax_mysql2databend/mysql_str2databend.py

调用方法:

#python3 g_mysql.py -H MySQL_IP -P MySQL_PORT -u MySQL_User -p mysql_password -d dbname |mysql -h databend_ip -Pdatabend_port -udatabend_user dbname

python3 g_mysql.py -H 172.21.16.9 -P 3306 -u root -p vgypH8nc -d wubx |mysql -h 127.0.0.1 -P3307 -uroot wubx

定制迁移配置

      "reader": {
"name": "mysqlreader",
...
"connection": [
...
"table": [
'${src_table}'
]
]
},
"writer": {
"name": "databendwriter",
...
"table": '${src_table}',
...
}

调用方式

pre_tb="sbtest"
for t in `seq 1 10`
do
tb=$pre_tb$t
echo $tb
./bin/addax.sh ./job/mysql2databend.json -p "-Dsrc_table=$tb"
done

Case 3: 指定 SQL 读取原表的数据,迁移到指定的表中

这种场景适合定期小批量迁移的,但原始表里需要有时间字段,比如订单类数据迁移。这里我们要使用到 Addax 数据读取中指定 querySQL 这个特性

    "content": {
"reader": {
"name": "mysqlreader",
"parameter": {
...
"connection": [
{
"querySql": ["$DS"],
...
}
]
}
},
"writer": {
"name": "databendwriter",
...
"table": '${DT}',
...
}

调用方法:

# 利用 DS 传入读取的 SQL ,通过 DT 指定写入的表名
python3 ./bin/addax.py job/msql.json -p "-DDS='select * from sbtest1 limit 10' -DDT=sbtest"

小结

本部分通过几个案例分展示了 Addax 的数据读取能力,通过灵活的组合基本可以满足生产中的各种需求。实际使用中如果为了提速,需要调整任务的配置和 autoPK 的这个功能。另外也可以使用 Addax 和 Databend 也可以担负起数据库的归档需求。

Databend 兼容 MySQL 协议,如果使用 Databend 做 ODS 层,又想结合原来的大数据生态使用,也可以使用 Addax 直接使用 mysqlreader 插件读取 Databend 中的数据。

3. 使用中注意事项

Addax 使用中对于具体的数据库,建议多阅读官方的 reader 或是 writer 的说明,根据实际情况决定合理的配置。这里的案例中 Databend 使用的是 Streaming_load 接口进行的数据写入,如果单节点 databend-query 有压力的情况下,也可以考虑配置多个 databend-query,Addax 也是支持多个 databend-query 写入。

以上都是参考,如果还有不能满足你的使用,或是你对 Addax 和 Databend 有进一步需求,也可以 wx 小 D: Databend 约一些线上的交流。

Alt text

Databend 经历了 2022 年一整年的研发,Databend 的功能和稳定性得到了显著增强,一些用户开始在生产中使用。Databend 帮助他们极大地降低了成本和操作的复杂性问题。

下面是 Databend Roadmap in 2023 (讨论)。

早期的 Roadmap:

主要任务

v1.0 (计划 Release 时间:2023.3.5)

TaskStatusComments
(Query) Support Decimal data type#2931PLANhigh-priority(release in v1.0 )
(Query) Query external stage file(parquet)IN PROGRESShigh-priority(release in v1.0)
(Query) Array functions#7931PLANhigh-priority(release in v1.0)
(Planner) CBOIN PROGRESShigh-priority(release in v1.0)
(Processor) Aggregation spillingIN PROGRESShigh-priority(release in v1.0)
(Storage) Update#9261IN PROGRESSneed-optimized(release in v1.0)
(Storage) Alter tableIN PROGRESShigh-priority(release in v1.0 )
(Storage) Block data cachePLANhigh-priority(release in v1.0 )
(Storage) Fuse engine orphan data cleanupPLANhigh-priority(release in v1.0)
(Integration) CDCIN PROGRESShigh-priority(release in v1.0)

在 Databend v1.0 版本中,主要本着生产中用户一些反馈把 Databend 进一步的增强:

  • 增加 Decimal 类型;

  • 支持直接查询外部 parquet 的文件;

  • 主要 Array 函数支持;

  • 支持:alter table 增加删除列的操作,update 进一步增强,支持 CDC 相关操作;

  • 引入 Block data cache 用于节省对象存储上的查询请求费用,同时也启到加速作用;

目前 CDC 及数据迁移工作这块对接了:

  1. 支持 Addax 数据迁到到 Databend 中 [done]

  2. 支持 DataX 到 Databend & Databend Cloud 的数据迁移对接中 [IN PROGRESS]

  3. 对接 Tapdata 到 Databend & Databend Cloud 的 CDC 数据写入 [IN PROGRESS]

  4. 对接 Airbyte & DBT 到 Databend & Databend Cloud 的数据写入 [IN PROGRESS]

v1.1 (计划 Release 时间:2023.4.5)

TaskStatusComments
(Query) Fulltext index#3915PLANhigh-priority(release in v1.1)
(Query) JSON indexingPLANhigh-priority(release in v1.1)
(Query) Distributed COPY#8594PLANhigh-priority(release in v1.1)
(Storage) Fuse engine re-clusteringPLANhigh-priority(release in v1.1)
(Storage) Fuse engine segment treePLANhigh-priority(release in v1.1)

在 Databend v1.1 也许你已经发现 json indexing, fulltext index 这样的功能,这也是 v1.1 版本的主要目标,目前挺多用户使用 Databend 替代 ES 做日志相关的服务,这样也就要求 Databend 对 json 字段的处理能力要求非常高了。概括来讲 v1.1 需要具备:

  1. 具备替换 ES 初步能力

  2. 支持分布式调用 copy 数据导入能力

  3. 支持 cluster key,让数据按某个字段排序

  4. 支持 segment tree 让数据写入及缓存更加的高效,单表 PB 级别就非常轻松了

Databend 开发是以对象存储为基础,用户需求为核心,2023 年整体上以小步快跑的方式进行开发,经团队和用户的沟通功能需求大概如下:

Query 功能

TaskStatusComments
Update#9261DONEneed optimized(release in v1.0)
PrivilegesIN PROGRESS
Alter tableIN PROGRESShigh-priority(release in v1.0 )
Window functionPLAN
Lambda function and high-order functionsPLAN
TimestampTz data typePLAN
Materialized viewPLAN
Support SET_VAR hints#8833PLAN
Parquet readerPLAN
DataFramePLAN
Data Sharing(community version)IN PROGRESS
Concurrent query enhancePLAN
Distributed COPY#8594PLAN
Support Decimal data type#2931PLANhigh-priority(release in v1.0 )

在 Query 规划中有一些特别的功能:

基础部分:

  1. update 实现上的优化 [v1.0]

  2. alter table 表结构秒级变更的实现 [v1.0]

  3. read parquet 文件

进阶部分:

  1. Windows Function 窗口函数支持,这个跑通 TPC-DS 必备条件

  2. 物化视图,后续可以让 Databend 支持更加复杂的 ETL

  3. DataFrame 这块是新型数据工作者比较喜欢的方式

高级部分:

  1. 数据共享,这是构建 Data market 一个基础

  2. 并发访问控制

  3. 分布式的 copy 数据装载控制

改进

TaskStatusComments
New expression#9411DONE
Error messagePLAN

New expression 是 2023 年开年之初合并进来最大的一个功能。基于 New expression 有了完善的类型推导机制,在 SQL 的编译期能尽可能推断出表达式的执行方式,极简的表达式函数注册逻辑,以及在数据库类型级别实现了泛型的推导。在新的类型系统基础上,常量折叠,类型推导,函数的注册,查询数据裁剪 等模块都能享受到新类型系统带来的红利。

优化 错误 信息提示,让使用者更容易定位到问题。

Resource Quota

TaskStatusComments
Session-level quota control (CPU/Memory)IN PROGRESS
User-level quota control (CPU/Memory)PLAN

限制 Query & User 使用最大资源,这块特别是一些复杂的 Query 在资源有限的情况下,需要通过一些控制来降低内存的使用,减少 OOM 问题。

Planner

TaskStatusComments
Scalar expression normalizationPLAN
Column constraint frameworkPLAN
Functional dependency framework#7438PLAN
Join reorderIN PROGRESS
CBOIN PROGRESShigh-priority(release in v1.0)
Support TPC-DSPLAN
Support optimization tracingPLANEasy to debug/study.

在 2022 年 Databend 支持复杂的 join 运算,跑通了 TPCH,现在需要通过 CBO 增强后,进一步提升 join 的智能优化。

Cache

TaskStatusComments
Unified cache layerIN PROGRESS
Meta data cacheIN PROGRESS
Index data cacheIN PROGRESS
Block data cachePLANhigh-priority(release in v1.0 )

对于 Cache 有三块明确的需求

  1. 减少 Query 节点和对象存储的交互,降低费用;

  2. 让数据库在 Query 节点存储直接命中,提升性能;

  3. 通过更优的格式存储提升查询能力,目前测试 databend nagive engine 在本地盘情况下,可以和 Clickhouse 对齐。

Data Storage

TaskStatusComments
Fuse engine re-clusteringPLANhigh-priority(release in v1.1)
Fuse engine orphan data cleanupPLANhigh-priority(release in v1.0)
Fuse engine segment treePLANSupport large dataset(PB) in one table

存储这块目前来看有几个比较明确的功能需求:

  1. 支持 re-clustering,让数据按某些列有序的存储,减少排序和区间查询的扫描范围

  2. 清理计算及使用中产生的孤儿数据(v1.0 版本就完成)

  3. 进一步优化 PB 级别的表,降低资源使用

Distributed Query Execution

TaskStatusComments
Visualized profilingIN PROGRESS
Aggregation spillingIN PROGRESShigh-priority(release in v1.0)

分布式执行是 Databend 在追求的方向,执行计划可视化增强,同时也要在分布式执行上引入 spilling 策略,用于支持离线环境使用较小的资源来计算较大的数据的场景。

TaskStatusComments
JSON indexingPLANhigh-priority
Fulltext index#3915PLANhigh-priority
Array functions#7931PLANhigh-priority
Faiss index#9699PLAN

Databend 在 json 支持引,利用 Rust 实现 mongodb 的 jsonb 实现,让 Databend 的 json 支持上有了进一步的提升。今年计划进一步的增强 json 的地实现,使期可以满足 ES 的一部分功能。

LakeHouse

TaskStatusComments
Apache HiveIN PROGRESS
Apache IcebergIN PROGRESS
Delta LakeIN PROGRESS
Querying external storage(Parquet)IN PROGRESS

原生大数据生态对接,Databend 给现有大数据生态加速。目前已经上生产环境的有:Databend + Hive,也在不断的优化中。

Integrations

TaskStatusComments
Dbt integrationIN PROGRESS
Airbyte integrationIN PROGRESS
Datadog Vector integrate with Rust-driverIN PROGRESS
Datax integrate with Java-driverIN PROGRESS
CDC with FlinkPLAN
CDC with KafkaPLAN

生态对接方面,目前 Databend 主要集中 ETL 及 数据迁移相关的工具,同时也会也对接分析及可视化类工具。

Meta

TaskStatusComments
Jepsen testIN PROGRESS
Store membership in raftPLAN
Nonblocking snapshot buildingPLAN
Snapshot file format implPLAN
Upgrade on-disk store formatPLAN

Databend meta 是 Databend 非常核心的一个组件。主要用于 Databend 中定义的 meta 信息,事务,锁,一致性信息的管理。

Testing

TaskStatusComments
SQLlogic TestIN PROGRESSSupports more test cases
SQLancer TestIN PROGRESSSupports more types and more cases
Fuzzer TestPLAN

Databend 目前测试从 Python 的 SQLlogic Test 切换到了 Rust SQLlogic Test 测试性能有了显著的提升,同时集成了 SQLancer Test 让 Databend 也有了更强的稳定性。后期还是需要增加更多的测试用例。

如果以上还你特别需要,还没包含进来的,也欢迎来 Databend repo 讨论区交流。如果你想参与其中某个功能的开发也欢迎参与进来。

Releases

各位社区小伙伴们,历经数月开发,Databend 于 2023 年 1 月 13 日迎来了 v0.9.0 版本的正式发布!这次新版本是 Databend 迈向 1.0 版本的最后一个大版本,也是迄今为止我们对核心代码重构幅度最大的一个版本!相较于 v0.8.0 版本,开发者们一共新增了 5000 多次 commit,共计700多个优化和修复,涉及4347 个文件变更,约34w行代码修改。感谢各位社区伙伴的参与,以及每一个让 Databend 变得更好的你!

在 v0.9.0 版本中,我们引入了新的类型系统,新的表达式计算框架,JSONB 支持,完整的 join 支持和优化,CBO 支持,Native Storage Format 等主要功能优化,同时性能、稳定性、易用性等方面做了大量优化增强,欢迎大家下载试用。

性能对比

在新版本中,我们在执行引擎,优化器,存储层都做了很多优化,大部分场景都有 2 倍以上提升,下面是在 hits 数据集使用 fuse 默认引擎在 s3 存储下两个版本的性能对比

全新的类型系统

为了让 Databend 拥有一个易于理解而又功能强大的类型推导系统,我们借鉴了不少优秀编程语言的编译器内部设计,然后从中精简出适用于 SQL 使用的子集。基于目前的纯静态的类型系统,我们有了完善的类型推导机制,在 SQL 的编译期能尽可能推断出表达式的执行方式,极简的表达式函数注册逻辑,以及在数据库类型级别实现了泛型的推导。

在新的类型系统基础上,常量折叠,类型推导,函数的注册,查询数据裁剪 等模块都能享受到新类型系统带来的红利。 这里有一份简短的介绍:https://zhuanlan.zhihu.com/p/561777236 由于这里太小写不下,不久之后,我们会对此做一个深入的分享,感兴趣的朋友可以关注下。

JSONB 支持

新版本中,我们实现 Rust 版本的 JSONB,默认的 JSON 数据类型 将使用 JSONB 存储,同时也兼容老的 JsonText 格式。基于二进制 JSON 格式,存储空间和查询性能都得到非常明显的优化。

参考:https://docs.databend.cn/doc/contributing/rfcs/json-optimization

完善的 Join SQL 支持

支持完整的 Join types: inner/natural/cross/outer/semi/anti join。

在过去的数月中,针对社区和线上用户的反馈,对 hash join 进行了深度的优化,能够覆盖大多数场景的性能要求。

CBO 支持

在统计信息中,我们加入了 NDV 的统计计算逻辑,用户可以通过类似 presto 的 "Analyze" 命令来生成统计信息表。JOIN 可以利用已有的统计信息,对逻辑计划进行基于代价的优化。后续 CBO 支持完善后,我们会更新 TPCH 100G 数据下的查询性能数据对比。

Native 格式支持

Databend 支持 Git-Like 的 Fuse engine,基于此 engine,我们可以快速回溯到某个历史时间点来查询,在数据库内部实现了 "时间旅行"。而在 Fuse engine 的内部,我们也支持了除 Parquet 之外的 新的 Storage Format --- strawboat: https://github.com/sundy-li/strawboat。 Strawboat 是基于 arrow 的 native storage format,基于它我们在数据读取方面可以做的比 Parquet 更高效,在 hits 数据集中,全表扫 native 格式能快 2-3 倍。在 hits 数据集中,本地部署的场景下取得非常可观的提升,后续我们会完善下性能对比到 clickbench 中。

高效的 bloom filter 过滤

新版本我们引入了 xor filter 来为每个列计算存储 bloom filter,新的 bloom filter 较比之前的版本,导入查询性能,占用空间能都得到了不少优化,参考:https://www.databend.com/blog/xor-filter

设计并开源 serverless DataSharing protocol

实现了基于 object storage presign 短期访问 token 的方式,多租户之间零信任数据共享解决方案。

在基本性能一致的情况下,使用 aws lambda,以 serverless 的方式实现数据共享。

Stage 相关

实现了 UserStage 功能,类似 linux 的 home 目录: COPY INTO my_table FROM @~;

Stage 的数据导入支持 meta 存储状态,这意味着我们可以一直从 stage 存入新文件来导入 databend;

支持从 Stage 中按不同格式导出多个文件;

从 Stage 导入表支持并行化;

...

其他

除了上面的主要功能外,我们还有其他的新功能或优化点:

  1. duckdb 的 read_parquet,支持无需导入,直接读取本地的 parquet 文件

  2. 常用函数性能优化,常用 GEO 函数支持

  3. Distinct 性能优化

  4. Adaptive String HashTable

  5. SQLancer 对接

  6. Parquet 读取加速

  7. 使用 Rust 重写了之前的 python 版本 sqllogictest

  8. NDJSON and JSON output format 支持

  9. ALTER TABLE 支持 recluster

  10. 根据 https://db.in.tum.de/~freitag/papers/p23-freitag-cidr19.pdf,支持 hyperloglog 的更新和删除。 ...

下载使用

如果你对我们新版本功能感兴趣,欢迎来 https://github.com/datafuselabs/databend/releases/tag/v0.9.0-nightly 页面查看全部的 changelog 或者 下载 release 体验。

如果你在使用旧版本的 Databend,你可以直接升级到新版本,升级过程请参考:https://docs.databend.cn/doc/operations/upgrade

意见反馈

如果您遇到任何使用上的问题,欢迎随时通过 GitHub issue 或社区用户群中提建议

GitHub: https://github.com/datafuselabs/databend/

致谢

最后感谢参与新版本设计开发,测试,文档贡献的开发者们。

感谢有你们 (GitHub 昵称排序):