This post walks you through loading data from Apache Kafka, a distributed event streaming platform, into Databend Cloud.

1. Creating a Table in Databend Cloud

Sign in to Databend Cloud, open Worksheets > New Worksheet, and create a table, for example with the following script:

create table orders(
    ordertime UInt64,
    orderid UInt64,
    itemid varchar,
    orderunits float,
    address json
);
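
For reference, the loading program later in this post copies each Kafka message into the table as one NDJSON line, so the keys of a message need to match the column names above. A minimal sketch of such a record follows; the values and the nested address fields are made up for illustration and are not taken from real data:

```python
import json

# Hypothetical sample record; the keys mirror the columns of the orders table.
sample_order = {
    "ordertime": 1497014222380,
    "orderid": 18,
    "itemid": "Item_184",
    "orderunits": 3.5,
    "address": {"city": "Mountain View", "state": "CA", "zipcode": 94041},
}

def to_ndjson_line(record):
    """Serialize one record as a single newline-terminated JSON line."""
    return json.dumps(record, separators=(",", ":")).encode("utf-8") + b"\n"

line = to_ndjson_line(sample_order)
```

Each line must hold exactly one JSON object, which is why the record is serialized without embedded newlines.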


2. Obtaining a Connection String from Databend Cloud

  1. In Databend Cloud, select the Home tab, then click Connect.


  2. Click Reset DB password to generate a new password, then click the generated password to copy it, and save it in a secure place.


The connection string shown on the Connect page provides the following connection details:

  • host='tn3ftqihs--bl.ch.aws-us-east-2.default.databend.com',
  • database="default",
  • user="cloudapp",
  • password="<your-password>"
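
Rather than hard-coding these values (the password in particular) in every script, you may prefer to read them from the environment. The following is a minimal sketch of that idea; the DATABEND_* variable names are hypothetical and chosen only for this example:

```python
import os

def connection_settings(env=os.environ):
    """Build keyword arguments for databend_py.Client from environment variables.

    The DATABEND_* variable names are hypothetical, chosen for this sketch.
    """
    password = env.get("DATABEND_PASSWORD")
    if not password:
        raise RuntimeError("DATABEND_PASSWORD is not set")
    return {
        "host": env["DATABEND_HOST"],
        "database": env.get("DATABEND_DATABASE", "default"),
        "user": env.get("DATABEND_USER", "cloudapp"),
        "password": password,
    }
```

With databend-py installed, the result could be passed on as Client(**connection_settings()).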

3. Installing databend-py

databend-py is a Python SQL driver for Databend Cloud that provides a standard SQL interface for users to manipulate data in Databend Cloud.

To install databend-py, together with kafka-python (used later to read from Kafka):

pip install databend-py
pip install kafka-python

Test the connection with the following code. Verify that it runs without errors and that the printed result contains the value 1.

from databend_py import Client

# Replace the host and password with the values from your own connection string.
client = Client(
    host='tn3ftqihs--bl.ch.aws-us-east-2.default.databend.com',
    database="default",
    user="cloudapp",
    password="x")
print(client.execute("SELECT 1"))
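
The program later in this post unpacks query results as `_, row = client.execute(sql)`, which suggests that the call returns a pair whose second element is the list of rows. Under that assumption, a small helper (hypothetical, not part of databend-py) can pull out a single value so that the check becomes an explicit comparison:

```python
def first_value(result):
    """Return the first column of the first row from a (metadata, rows) pair.

    Assumes the two-element result shape used elsewhere in this post.
    """
    _, rows = result
    if not rows:
        raise ValueError("query returned no rows")
    return rows[0][0]
```

For example, first_value(client.execute("SELECT 1")) could then be compared against 1 directly.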

4. Developing Your Program

4.1 Loading Data

Write code to perform the following tasks for data loading:

  1. Connect to Kafka. This requires the kafka-python dependency installed earlier. The program uses the following Kafka information:
       Topic: s3_topic
       bootstrap_servers: 192.168.1.100:9092
     For a Kafka cluster, separate multiple addresses with commas.
  2. Connect to a warehouse in Databend Cloud.
  3. Get data from Kafka.
  4. Get a presigned URL.
  5. Upload your file.
  6. Load data from the file with COPY INTO.

#!/usr/bin/env python3
import io

import requests
from databend_py import Client
from kafka import KafkaConsumer

# Subscribe to the Kafka topic that carries the order messages.
consumer = KafkaConsumer('s3_topic', bootstrap_servers='192.168.1.100:9092')

client = Client(
    host='tn3ftqihs--bl.ch.aws-us-east-2.default.databend.com',
    database="default",
    user="cloudapp",
    password="x")

def get_url(file_name):
    # Request a presigned upload URL for a file in the user stage (@~).
    sql = "presign upload @~/%s" % (file_name)
    _, row = client.execute(sql)
    url = row[0][2]
    return url

def upload_file(url, content):
    # Upload the batch and raise if the HTTP request failed.
    response = requests.put(url, data=content)
    response.raise_for_status()
    return response

def load_file(table_name, file_name):
    # Load the staged NDJSON file into the table, then purge it from the stage.
    copy_sql = "copy into %s from @~ files=('%s') file_format=(type='ndjson') purge=true" % (table_name, file_name)
    client.execute(copy_sql)

def get_table_num(table_name):
    # Print the current row count of the table.
    sql = "select count(*) from %s" % (table_name)
    _, row = client.execute(sql)
    print(row[0][0])

i = 0
c = b''
table_name = 'orders'
# you can increase this value to increase the throughput
step = 10000
for msg in consumer:
    # Append each message as one NDJSON line.
    c = c + msg.value + b'\n'
    i = i + 1
    if i % step == 0:
        # A full batch is ready: stage it, load it, and start a new batch.
        file_name = '%d.json' % (i)
        url = get_url(file_name)
        content = io.BytesIO(c)
        upload_file(url, content)
        load_file(table_name, file_name)
        get_table_num(table_name)
        c = b''
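
Note that the loop above uploads only when a full batch of `step` messages has accumulated, so a trailing partial batch stays in memory. One way to handle this is sketched below: the batching logic is isolated in a small class with an explicit flush that can be called on shutdown. The `Batcher` class and its `flush_fn` callback are hypothetical names for this example; in the program above, `flush_fn` would call get_url, upload_file, and load_file.

```python
class Batcher:
    """Accumulate messages and hand them to flush_fn as NDJSON batches."""

    def __init__(self, flush_fn, step=10000):
        self.flush_fn = flush_fn  # called with (file_name, payload_bytes)
        self.step = step
        self.count = 0            # total messages seen so far
        self.buffer = []

    def add(self, value):
        """Buffer one message and flush when a full batch is reached."""
        self.buffer.append(value)
        self.count += 1
        if self.count % self.step == 0:
            self.flush()

    def flush(self):
        """Send any buffered messages; does nothing if the buffer is empty."""
        if not self.buffer:
            return
        payload = b"\n".join(self.buffer) + b"\n"
        self.flush_fn("%d.json" % self.count, payload)
        self.buffer = []


# Usage with a stand-in callback that records what would be uploaded.
uploaded = []
batcher = Batcher(lambda name, payload: uploaded.append((name, payload)), step=2)
for value in [b'{"a":1}', b'{"a":2}', b'{"a":3}']:
    batcher.add(value)
batcher.flush()  # send the final partial batch
```

Collecting messages in a list and joining them once per batch also avoids rebuilding a growing bytes object on every message.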

To generate test data, you can use the ksql-datagen tool:

./bin/ksql-datagen quickstart=orders format=json topic=s3_topic maxInterval=10

With format=json, each order is written to the Kafka topic as a JSON message. For the supported data formats, see https://docs.databend.cn/doc/load-data/.

4.2 Querying Data

#!/usr/bin/env python3
from databend_py import Client

client = Client(
    host='tn3ftqihs--bl.ch.aws-us-east-2.default.databend.com',
    database="default",
    user="cloudapp",
    password="x")

# Fetch one row to confirm that the data was loaded.
sql = "select * from orders limit 1"
_, row = client.execute(sql)
print(row)
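
The scripts in this post build SQL with plain string formatting. If the table name or row limit ever comes from outside the script, it may be worth validating them first. The following is a minimal sketch; `build_select` is a hypothetical helper, and the identifier pattern is deliberately conservative rather than a full description of the names Databend accepts:

```python
import re

# Conservative pattern: letters, digits, and underscores, not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def build_select(table_name, limit=1):
    """Return a SELECT statement after checking the table name and limit."""
    if not _IDENTIFIER.fullmatch(table_name):
        raise ValueError("unexpected table name: %r" % table_name)
    return "select * from %s limit %d" % (table_name, int(limit))
```

The resulting string can be passed to client.execute in place of the hard-coded query.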