Skip to main content

One post tagged with "Join"

View All Tags

image.png

join order 的重要性

Join order 是指在执行SQL查询时,决定多个表进行 join 的顺序。它是数据库查询优化的一个重要方面,对查询性能和效率有着重要的影响, 不同的 join order 对性能可能有数量级的影响。

优化器优化 join order 的核心流程

  1. join plan 枚举
  2. 根据统计信息估算结果的大小 (cardinality estimation)
  3. 把 2 中的结果带入到代价模型计算枚举 plan 的 代价 (cost model)

本文中我们只关心第一步 join plan enumeration, 也就是 join reorder 算法。

join reorder 算法

贪心启发式算法

当需要 join 的表都数量过多时(通常超过10个),适合用贪心算法,其优势在于能够较快的找到还不错的 join order.

核心思想:从一张表拓展到 N 张表,每次选出使当前代价最小的一张表,加入到 join tree中,构建出 left-deep tree.

贪心算法也有很多拓展,主要拓展点是围绕如果 避免局部最优以及产生 bushy tree

枚举算法(top-down & bottom-up)

主流的两种

  • 基于规则变换的 Top-down 枚举,可以结合 Top-down cascasde 框架通过记忆化的方式来实现
  • 基于 DP 的 bottom-up 枚举, 典型代表 DPhyp 算法,其优势在于可以高效的产生 bushy tree

一般情况下,数据库系统会把贪心和枚举有效的结合,从而对任意数量的表 join 都能够在合理的时间内得到有效的 join order

Databend join reorder 现状

databend 优化器基于 Rule 进行优化,每条 Rule 通过 pattern 来匹配 plan 中的 sub-tree。主要分为两个阶段,启发式优化和基于 cascades 框架的代价优化,两个阶段共用一套 Rule。

在启发式阶段优化结束后,会对优化后的 plan 执行 DPhyp 算法来尝试得到最优的 join order, 如果 Dphyp 优化失败,会在 CBO 中找到最优的 left-deep tree. (CBO 中不尝试进行 bushy tree 优化,因为如果 Dphyp 已经优化失败,那么尝试在 CBO 中进行 bushy tree 优化,搜索空间很有可能爆炸,如 tpcds 64)

Databend 目前没有支持贪心算法 (下一阶段的 roadmap 会做相关支持来处理极端情况下的 case),首先会利用 dphyp 算法来得到最优解,如果 dphyp 失败(query 中存在不适合 dphyper 算法的 pattern),会在 cascades 框架中利用基于规则变换的 Top-down 枚举得到 left-deep tree. 如果表的数量过多,如超过十个,会在 dphyp 算法中放弃部分搜索空间来做 tradeoff。

Dphyp 的核心定义及算法

hypergraph

一个超图是一个由节点集合 V 和超边集合 E 组成的二元组 H = (V,E),其中:

  1. V是非空节点集合。
  2. E是超边集合,超边是V的非空子集(u ⊂ V)和(v ⊂ V)的无序对(u,v),并且满足额外条件 u∩v = ∅。

有了超图就可以描述多节点之间连接。

image.png

对于上图,它们的 join condition 是 R1.a + R2.b + R3.c = R4.d + R5.e + R6.f 所以 hyperedge 就是 {R1, R2, R3} — {R4, R5, R6}

csg-cmp-pair

csg: connected-subgraph (连通子图)

cmp: connected-complement (连通互补对)

如果两个 csg 之间没有交集,且存在超边连接,其中一个就是另一个的 cmp, 二者构成 csg-cmp-pair. 算法的核心就是通过递归无重复的枚举出所有的 csg-cmp-pair, 找出代价最小的包含所有点的 csg-cmp-pair.

algorithm

算法核心:hypergraph 中的节点是有序的,节点从后往前迭代(递减),每个节点只考虑其自身及其之后(序列号更大)的节点,找到可能的连通子图及其连通互补图,构成 csg-cmp-pair, 计算并更新出其代价,当迭代到最小的节点后,会得到包含所有点的 csg-cmp-pair, 算法结束。

算法流程

  1. EmitCsg: 寻找 {v} 的互补连通子图
    • a. 如果找到, EmitCsgCmp
    • b. EnumerateCmpRec:扩展互补连通子图
      • 如果扩展后的互补连通子图可以与 {v} 形成 csg-cmp-pair, 则 EmitCsgCmp
      • 回到 b,继续扩展
  2. EnumerateCsgRec: 扩展 {v}
    • a. 得到扩展后的 {v’}, 对 {v’} 执行 1
    • b. 回到2,继续扩展

核心代码和数据结构定义可参考:

(https://github.com/datafuselabs/databend/blob/main/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs)

image.png

Kafka Connect 介绍

Kafka Connect 是一个用于在 Apache Kafka® 和其他数据系统之间可扩展且可靠地流式传输数据的工具。通过将数据移入和移出 Kafka 进行标准化,使得快速定义连接器以在 Kafka 中传输大型数据集变得简单,可以更轻松地构建大规模的实时数据管道。

image.png

我们使用 Kafka Connector 读取或写入外部系统、管理数据流以及扩展系统,所有这些都无需开发新代码。Kafka Connect 管理与其他系统连接时的所有常见问题(Schema 管理、容错、并行性、延迟、投递语义等),每个 Connector 只关注如何在目标系统和 Kafka 之间复制数据。

Kafka 连接器通常用来构建 data pipeline,一般有两种使用场景:

  • 开始和结束的端点: 例如,将 Kafka 中的数据导出到 Databend 数据库,或者把 Mysql 数据库中的数据导入 Kafka 中。

  • 数据传输的中间媒介: 例如,为了把海量的日志数据存储到 Elasticsearch 中,可以先把这些日志数据传输到 Kafka 中,然后再从 Kafka 中将这些数据导入到 Elasticsearch 中进行存储。Kafka 连接器可以作为数据管道各个阶段的缓冲区,将消费者程序和生产者程序有效地进行解耦。

Kafka Connect 分为两种:

  • Source Connect: 负责将数据导入 Kafka。
  • Sink Connect: 负责将数据从 Kafka 系统中导出到目标表。

image.png

Databend Kafka Connect

Kafka 目前在 Confluent Hub 上提供了上百种 Connector,比如 Elasticsearch Service Sink Connector, Amazon Sink Connector, HDFS Sink 等,用户可以使用这些 Connector 以 Kafka 为中心构建任意系统之间的数据管道。现在我们也为 Databend 提供了 Kafka Connect Sink Plugin,这篇文章我们将会介绍如何使用 MySQL JDBC Source Connector 和 Databend Sink Connector 构建实时的数据同步管道。

image.png

启动 Kafka Connect

本文假定操作的机器上已经安装 Apache Kafka,如果用户还没有安装,可以参考 Kafka quickstart 进行安装。

Kafka Connect 目前支持两种执行模式:Standalone 模式和分布式模式。

启动模式

Standalone 模式

在 Standalone 模式下,所有的工作都在单个进程中完成。这种模式更容易配置以及入门,但不能充分利用 Kafka Connect 的某些重要功能,例如,容错。我们可以使用如下命令启动 Standalone 进程:

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

第一个参数 config/connect-standalone.properties 是 worker 的配置。这其中包括 Kafka 连接参数、序列化格式以及提交 Offset 的频率等配置:

bootstrap.servers=localhost:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

后面的配置是指定要启动的 Connector 的参数。上述提供的默认配置适用于使用 config/server.properties 提供的默认配置运行的本地集群。如果使用不同配置或者在生产部署,那就需要对默认配置做调整。但无论怎样,所有 Worker(独立的和分布式的)都需要一些配置:

  • bootstrap.servers: 该参数列出了将要与 Connect 协同工作的 broker 服务器,Connector 将会向这些 broker 写入数据或者从它们那里读取数据。你不需要指定集群的所有 broker,但是建议至少指定 3 个。

  • key.converter 和 value.converter: 分别指定了消息键和消息值所使用的的转换器,用于在 Kafka Connect 格式和写入 Kafka 的序列化格式之间进行转换。这控制了写入 Kafka 或从 Kafka 读取的消息中键和值的格式。由于这与 Connector 没有任何关系,因此任何 Connector 可以与任何序列化格式一起使用。默认使用 Kafka 提供的 JSONConverter。有些转换器还包含了特定的配置参数。例如,通过将 key.converter.schemas.enable 设置成 true 或者 false 来指定 JSON 消息是否包含 schema。

  • offset.storage.file.filename: 用于存储 Offset 数据的文件。

这些配置参数可以让 Kafka Connect 的生产者和消费者访问配置、Offset 和状态 Topic。配置 Kafka Source 任务使用的生产者和 Kafka Sink 任务使用的消费者,可以使用相同的参数,但需要分别加上‘producer.’和‘consumer.’前缀。bootstrap.servers 是唯一不需要添加前缀的 Kafka 客户端参数。

distributed 模式

分布式模式可以自动平衡工作负载,并可以动态扩展(或缩减)以及提供容错。分布式模式的执行与 Standalone 模式非常相似:

bin/connect-distributed.sh config/connect-distributed.properties

不同之处在于启动的脚本以及配置参数。在分布式模式下,使用 connect-distributed.sh 来代替 connect-standalone.sh。第一个 worker 配置参数使用的是 config/connect-distributed.properties 配置文件:

bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
offset.flush.interval.ms=10000

Kafka Connect 将 Offset、配置以及任务状态存储在 Kafka Topic 中。建议手动创建 Offset、配置和状态的 Topic,以达到所需的分区数和复制因子。如果在启动 Kafka Connect 时尚未创建 Topic,将使用默认分区数和复制因子来自动创建 Topic,这可能不适合我们的应用。在启动集群之前配置如下参数至关重要:

  • group.id: Connect 集群的唯一名称,默认为 connect-cluster。具有相同 group id 的 worker 属于同一个 Connect 集群。需要注意的是这不能与消费者组 ID 冲突。

  • config.storage.topic: 用于存储 Connector 和任务配置的 Topic,默认为 connect-configs。需要注意的是这是一个只有一个分区、高度复制、压缩的 Topic。我们可能需要手动创建 Topic 以确保配置的正确,因为自动创建的 Topic 可能有多个分区或自动配置为删除而不是压缩。

  • offset.storage.topic: 用于存储 Offset 的 Topic,默认为 connect-offsets。这个 Topic 可以有多个分区。

  • status.storage.topic: 用于存储状态的 Topic,默认为 connect-status。这个 Topic 可以有多个分区。

需要注意的是在分布式模式下需要通过 rest api 来管理 Connector。

比如:

GET /connectors – 返回所有正在运行的connector名。
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息。
GET /connectors/{name}/config – 获取指定connector的配置信息。
PUT /connectors/{name}/config – 更新指定connector的配置信息。

配置 Connector

MySQL Source Connector

  1. 安装 MySQL Source Connector Plugin

这里我们使用 Confluent 提供的 JDBC Source Connector。

从 Confluent hub 下载 Kafka Connect JDBC 插件并将 zip 文件解压到 /path/kafka/libs 目录下。

  1. 安装 MySQL JDBC Driver

因为 Connector 需要与数据库进行通信,所以还需要 JDBC 驱动程序。JDBC Connector 插件也没有内置 MySQL 驱动程序,需要我们单独下载驱动程序。MySQL 为许多平台提供了 JDBC 驱动程序。选择 Platform Independent 选项,然后下载压缩的 TAR 文件。该文件包含 JAR 文件和源代码。将此 tar.gz 文件的内容解压到一个临时目录。将 jar 文件(例如,mysql-connector-java-8.0.17.jar),并且仅将此 JAR 文件复制到与 kafka-connect-jdbc jar 文件相同的 libs 目录下:

cp mysql-connector-j-8.0.32.jar /opt/homebrew/Cellar/kafka/3.4.0/libexec/libs/
  1. 配置 MySQL Connector

/path/kafka/config 下创建 mysql.properties 配置文件,并使用下面的配置:

name=test-source-mysql-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
#mode=timestamp+incrementing
mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
#timestamp.column.name=tms
topics=test_kafka

针对配置我们这里重点介绍 modeincrementing.column.name ,和 timestamp.column.name 几个字段。Kafka Connect MySQL JDBC Source 提供了三种增量同步模式:

  • incrementing
  • timestamp
  • timestamp+incrementing
  1. 在 incrementing 模式下,每次都是根据 incrementing.column.name 参数指定的列,查询大于自上次拉取的最大 id:
SELECT * FROM mydb.test_kafka
WHERE id > ?
ORDER BY id ASC

这种模式的缺点是无法捕获行上更新操作(例如,UPDATE、DELETE)的变更,因为无法增大该行的 id。

  1. timestamp 模式基于表上时间戳列来检测是否是新行或者修改的行。该列最好是随着每次写入而更新,并且值是单调递增的。需要使用 timestamp.column.name 参数指定时间戳列。

需要注意的是时间戳列在数据表中不能设置为 Nullable.

在 timestamp 模式下,每次都是根据 timestamp.column.name 参数指定的列,查询大于自上次拉取成功的 gmt_modified:

SELECT * FROM mydb.test_kafka
WHERE tms > ? AND tms < ?
ORDER BY tms ASC

这种模式可以捕获行上 UPDATE 变更,缺点是可能造成数据的丢失。由于时间戳列不是唯一列字段,可能存在相同时间戳的两列或者多列,假设在导入第二条的过程中发生了崩溃,在恢复重新导入时,拥有相同时间戳的第二条以及后面几条数据都会丢失。这是因为第一条导入成功后,对应的时间戳会被记录已成功消费,恢复后会从大于该时间戳的记录开始同步。此外,也需要确保时间戳列是随着时间递增的,如果人为的修改时间戳列小于当前同步成功的最大时间戳,也会导致该变更不能同步。

  1. 仅使用 incrementing 或 timestamp 模式都存在缺陷。将 timestamp 和 incrementing 一起使用,可以充分利用 incrementing 模式不丢失数据的优点以及 timestamp 模式捕获更新操作变更的优点。需要使用 incrementing.column.name 参数指定严格递增列、使用 timestamp.column.name 参数指定时间戳列。
SELECT * FROM mydb.test_kafka
WHERE tms < ?
AND ((tms = ? AND id > ?) OR tms > ?)
ORDER BY tms, id ASC

由于 MySQL JDBC Source Connector 是基于 query-based 的数据获取方式,使用 SELECT 查询来检索数据,并没有复杂的机制来检测已删除的行,所以不支持 DELETE 操作。可以使用基于 log-based 的 [Kafka Connect Debezium]。

后面的演示中会分别演示上述模式的效果。更多的配置参数可以参考 MySQL Source Configs

Databend Kafka Connector

  1. 安装 OR 编译 Databend Kafka Connector

可以从源码编译得到 jar 或者从 release 直接下载。

git clone https://github.com/databendcloud/databend-kafka-connect.git & cd databend-kafka-connect
mvn -Passembly -Dmaven.test.skip package

databend-kafka-connect.jar 拷贝至 /path/kafka/libs 目录下。

  1. 安装 Databend JDBC Driver

Maven Central 下载最新的 Databend JDBC 并拷贝至 /path/kafka/libs 目录下。

  1. 配置 Databend Kafka Connector

/path/kafka/config 下创建 mysql.properties 配置文件,并使用下面的配置:

name=databend
connector.class=com.databend.kafka.connect.DatabendSinkConnector

connection.url=jdbc:databend://localhost:8000
connection.user=databend
connection.password=databend
connection.attempts=5
connection.backoff.ms=10000
connection.database=default

table.name.format=default.${topic}
max.retries=10
batch.size=1
auto.create=true
auto.evolve=true
insert.mode=upsert
pk.mode=record_value
pk.fields=id
topics=test_kafka
errors.tolerance=all

auto.createauto.evolve 设置成 true 后会自动建表并在源表结构发生变化时同步到目标表。关于更多配置参数的介绍可以参考 Databend Kafka Connect Properties

测试 Databend Kafka Connect

准备各个组件

  1. 启动 MySQL
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
  1. 启动 Databend
version: '3'
services:
databend:
image: datafuselabs/databend
volumes:
- /Users/hanshanjie/databend/local-test/databend/databend-query.toml:/etc/databend/query.toml
environment:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
MINIO_ENABLED: 'true'
ports:
- '8000:8000'
- '9000:9000'
- '3307:3307'
- '8124:8124'
  1. 以 standalone 模式启动 Kafka Connect,并加载 MySQL Source Connector 和 Databend Sink Connector:
./bin/connect-standalone.sh config/connect-standalone.properties config/databend.properties config/mysql.properties
[2023-09-06 17:39:23,128] WARN [databend|task-0] These configurations '[metrics.context.connect.kafka.cluster.id]' were supplied but are not used yet. (org.apache.kafka.clients.consumer.ConsumerConfig:385)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka version: 3.4.0 (org.apache.kafka.common.utils.AppInfoParser:119)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka commitId: 2e1947d240607d53 (org.apache.kafka.common.utils.AppInfoParser:120)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka startTimeMs: 1693993163128 (org.apache.kafka.common.utils.AppInfoParser:121)
[2023-09-06 17:39:23,148] INFO Created connector databend (org.apache.kafka.connect.cli.ConnectStandalone:113)
[2023-09-06 17:39:23,148] INFO [databend|task-0] [Consumer clientId=connector-consumer-databend-0, groupId=connect-databend] Subscribed to topic(s): test_kafka (org.apache.kafka.clients.consumer.KafkaConsumer:969)
[2023-09-06 17:39:23,150] INFO [databend|task-0] Starting Databend Sink task (com.databend.kafka.connect.sink.DatabendSinkConfig:33)
[2023-09-06 17:39:23,150] INFO [databend|task-0] DatabendSinkConfig values:...

Insert

Insert 模式下我们需要使用如下的 MySQL Connector 配置:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
#mode=timestamp+incrementing
mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
#timestamp.column.name=tms
topics=test_kafka

在 MySQL 中创建数据库 mydb 和表 test_kafka:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE test_kafka (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE test_kafka AUTO_INCREMENT = 10;

在插入数据之前,databend-kafka-connect 并不会收到 event 进行建表和数据写入。

插入数据:

INSERT INTO test_kafka VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

源表端插入数据后,

Databend 目标端的表就新建出来了:

image.png

同时数据也会成功插入:

image.png

Support DDL

我们在配置文件中 auto.evolve=true,所以在源表结构发生变化的时候,会将 DDL 同步至目标表。这里我们正好需要将 MySQL Source Connector 的模式从 incrementing 改成 timestamp+incrementing,需要新增一个 timestamp 字段并打开 timestamp.column.name=tms 配置。我们在原表中执行:

alter table test_kafka add column tms timestamp;

并插入一条数据:

insert into test_kafka values(20,"new data","from kafka",now());

到目标表中查看:

image.png

发现 tms 字段已经同步至 Databend table,并且该条数据也已经插入成功:

image.png

Upsert

修改 MySQL Connector 的配置为:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
mode=timestamp+incrementing
#mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
timestamp.column.name=tms
topics=test_kafka

主要是将 mode 改为 timestamp+incrementing并添加 timestamp.column.name 字段。

重启 Kafka Connect。

在源表中更新一条数据:

update test_kafka set name="update from kafka test" where id=20;

到目标表中可以看到更新的数据:

image.png

总结

通过上面的内容可以看到 Databend Kafka Connect 具有以下特性:

  • Table 和 Column 支持自动创建: auto.createauto-evolve 的配置支持下,可以自动创建 Table 和 Column,Table name 是基于 Kafka topic name 创建的;

  • Kafka Shemas 支持: Connector 支持 Avro、JSON Schema 和 Protobuf 输入数据格式。必须启用 Schema Registry 才能使用基于 Schema Registry 的格式;

  • 多个写入模式: Connector 支持 insertupsert 写入模式;

  • 多任务支持: 在 Kafka Connect 的能力下,Connector 支持运行一个或多个任务。增加任务的数量可以提高系统性能;

  • 高可用: 分布式模式下可以自动平衡工作负载,并可以动态扩展(或缩减)以及提供容错能力。

同时,Databend Kafka Connect 也能够使用原生 Connect 支持的配置,更多配置参考 Kafka Connect Sink Configuration Properties for Confluent Platform