Overview
Apache Kafka is a distributed streaming platform. QuestDB integrates with Kafka through the Kafka Connect JDBC Sink Connector, enabling you to stream data from Kafka topics directly into QuestDB tables.
Architecture
Kafka → Kafka Connect → JDBC Sink Connector → QuestDB (PostgreSQL Wire Protocol)
Prerequisites
QuestDB running (port 8812 for PostgreSQL Wire Protocol)
Apache Kafka installed
Kafka Connect JDBC plugin
PostgreSQL JDBC driver
Installation
Step 1: Download Required Components
# Download Kafka Connect JDBC
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.0/kafka-connect-jdbc-10.7.0.jar
# Download PostgreSQL JDBC driver
wget https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
Place both JAR files in your Kafka Connect plugins directory:
mkdir -p /opt/kafka/plugins/jdbc
cp kafka-connect-jdbc-*.jar /opt/kafka/plugins/jdbc/
cp postgresql-*.jar /opt/kafka/plugins/jdbc/
Configuration
JDBC Sink Connector Configuration
Create connect-questdb.properties:
name=questdb-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# QuestDB connection (PostgreSQL Wire Protocol)
connection.url=jdbc:postgresql://localhost:8812/qdb?useSSL=false
connection.user=admin
connection.password=quest
# Kafka topic configuration
topics=sensor-data,trades,metrics
# Insert mode
insert.mode=insert
pk.mode=none
auto.create=true
auto.evolve=true
# PostgreSQL dialect
dialect.name=PostgreSqlDatabaseDialect
# Batch settings for performance
batch.size=3000
max.retries=10
retry.backoff.ms=3000
QuestDB uses port 8812 for PostgreSQL Wire Protocol connections. The connector communicates with QuestDB as if it were a PostgreSQL database.
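A quick pre-flight check can catch a malformed properties file before Kafka Connect starts. The following is a minimal sketch: `parse_properties` and `check_sink_config` are hypothetical helpers, and the checks they perform are illustrative assumptions based on the settings in this guide, not rules enforced by Kafka Connect.

```python
def parse_properties(text: str) -> dict:
    """Parse simple `key=value` lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values such as JDBC URLs
        # that contain '=' themselves stay intact.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_sink_config(props: dict) -> list:
    """Return a list of human-readable problems (empty if none found)."""
    problems = []
    for key in ("name", "connector.class", "connection.url", "topics"):
        if not props.get(key):
            problems.append(f"missing setting: {key}")
    url = props.get("connection.url", "")
    if url and not url.startswith("jdbc:postgresql://"):
        problems.append("connection.url should start with jdbc:postgresql://")
    return problems
```

Calling `check_sink_config(parse_properties(open("connect-questdb.properties").read()))` returns an empty list when the settings above are all present.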
Stand-alone Mode
Create connect-standalone.properties:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
Starting the Pipeline
Step 1: Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
Step 2: Start Kafka Server
bin/kafka-server-start.sh config/server.properties
Step 3: Start QuestDB
docker run -p 9000:9000 -p 9009:9009 -p 8812:8812 questdb/questdb
Step 4: Start Kafka Connect
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/connect-questdb.properties
Sending Data
Create a Kafka Topic
bin/kafka-topics.sh --create \
--topic sensor-data \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
Produce Messages
bin/kafka-console-producer.sh \
--topic sensor-data \
--bootstrap-server localhost:9092
Sample Message with Schema
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "sensor_id"
      },
      {
        "type": "double",
        "optional": false,
        "field": "temperature"
      },
      {
        "type": "double",
        "optional": false,
        "field": "humidity"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "timestamp"
      }
    ],
    "optional": false,
    "name": "sensor_reading"
  },
  "payload": {
    "sensor_id": "sensor_001",
    "temperature": 23.5,
    "humidity": 65.2,
    "timestamp": 1678901234000
  }
}
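The console producer treats every input line as a separate record, so a message like the one above has to be sent as a single line of JSON. The sketch below builds the same schema/payload envelope and serializes it compactly. The helper names `connect_envelope` and `to_producer_line` are hypothetical and only illustrate the shape of the message.

```python
import json

# Logical type name Kafka Connect uses for millisecond timestamps.
TIMESTAMP = "org.apache.kafka.connect.data.Timestamp"


def connect_envelope(name, fields, payload):
    """Wrap a payload in a schema/payload envelope.

    `fields` is a list of (field_name, connect_type, logical_name) tuples;
    logical_name is None for plain types.
    """
    schema_fields = []
    for field_name, connect_type, logical_name in fields:
        entry = {"type": connect_type, "optional": False, "field": field_name}
        if logical_name is not None:
            entry["name"] = logical_name
            entry["version"] = 1
        schema_fields.append(entry)
    return {
        "schema": {
            "type": "struct",
            "fields": schema_fields,
            "optional": False,
            "name": name,
        },
        "payload": payload,
    }


def to_producer_line(message):
    """Serialize to compact JSON with no embedded newlines."""
    return json.dumps(message, separators=(",", ":"))


message = connect_envelope(
    "sensor_reading",
    [
        ("sensor_id", "string", None),
        ("temperature", "double", None),
        ("humidity", "double", None),
        ("timestamp", "int64", TIMESTAMP),
    ],
    {
        "sensor_id": "sensor_001",
        "temperature": 23.5,
        "humidity": 65.2,
        "timestamp": 1678901234000,
    },
)
line = to_producer_line(message)

if __name__ == "__main__":
    print(line)
```

Assuming the script is saved as `make_message.py` (a placeholder name), its output can be piped straight into `bin/kafka-console-producer.sh --topic sensor-data --bootstrap-server localhost:9092`.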
Trading Data Example
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "string", "field": "symbol" },
      { "type": "double", "field": "price" },
      { "type": "int64", "field": "quantity" },
      {
        "type": "int64",
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "trade_time"
      }
    ]
  },
  "payload": {
    "symbol": "BTC-USD",
    "price": 45123.50,
    "quantity": 100,
    "trade_time": 1678901234000
  }
}
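Because the schema and payload travel together in every message, a mismatch between them is an easy mistake to make. The following sketch compares the two before a message is produced. `payload_problems` is a hypothetical helper, and the assumption that undeclared payload keys are not carried through by the converter should be verified against your Kafka Connect version.

```python
def payload_problems(message):
    """Return a list of mismatches between schema fields and payload keys."""
    schema_fields = message["schema"]["fields"]
    payload = message["payload"]
    declared = {f["field"] for f in schema_fields}
    problems = []
    # A field counts as required unless the schema marks it optional.
    for f in schema_fields:
        if f["field"] not in payload and not f.get("optional", False):
            problems.append(f"missing required field: {f['field']}")
    # Payload keys without a schema entry are flagged as well
    # (assumption: the converter only reads declared fields).
    for key in payload:
        if key not in declared:
            problems.append(f"field not declared in schema: {key}")
    return problems
```

For the trading message above the function returns an empty list; dropping `price` from the payload would yield `["missing required field: price"]`.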
Verifying Data in QuestDB
Query via Web Console
Open http://localhost:9000 and run:
SELECT * FROM 'sensor-data' LIMIT 10;
Query via PostgreSQL Wire Protocol
psql -h localhost -p 8812 -U admin -d qdb -c "SELECT count() FROM 'sensor-data'"
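The same check can be scripted. The sketch below assumes QuestDB's HTTP REST endpoint `/exec` is reachable on port 9000 (the port the Web Console uses) and that the response carries result rows under a `dataset` key. `exec_url` and `row_count` are hypothetical helper names.

```python
import json
import urllib.parse
import urllib.request


def exec_url(query, base="http://localhost:9000"):
    """Build a URL for the /exec endpoint with the SQL safely encoded."""
    return f"{base}/exec?{urllib.parse.urlencode({'query': query})}"


def row_count(table, base="http://localhost:9000"):
    """Run SELECT count() against a table and return the number of rows.

    Requires a running QuestDB instance; not exercised here.
    """
    url = exec_url(f"SELECT count() FROM '{table}'", base)
    with urllib.request.urlopen(url) as resp:
        body = json.load(resp)
    # Assumed response shape: {"dataset": [[<count>]], ...}
    return body["dataset"][0][0]
```

Calling `row_count("sensor-data")` a few seconds apart is a simple way to confirm that rows keep arriving.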
Best Practices for High-Throughput
Increase batch.size to 3000-5000 for higher throughput
Use multiple Kafka partitions for parallelism
Set auto.create=true to automatically create tables
Configure appropriate buffer.memory in Kafka producer
Monitor connector lag and adjust workers accordingly
# High-throughput settings
batch.size=5000
tasks.max=3
max.retries=10
retry.backoff.ms=1000
# Connection retry settings
connection.attempts=3
connection.backoff.ms=10000
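Two bits of arithmetic help when choosing these values. The sketch below assumes the connector waits `retry.backoff.ms` between each of `max.retries` attempts before failing a task, and relies on the fact that sink tasks beyond the total number of topic partitions stay idle. Both function names are hypothetical.

```python
def retry_window_seconds(max_retries: int, retry_backoff_ms: int) -> float:
    """Approximate time a task keeps retrying writes before it fails."""
    return max_retries * retry_backoff_ms / 1000


def active_tasks(tasks_max: int, total_partitions: int) -> int:
    """Sink tasks that can actually receive data from the topics."""
    return min(tasks_max, total_partitions)


# High-throughput settings from this section: 10 retries, 1000 ms apart.
print(retry_window_seconds(10, 1000))   # 10.0
# Base configuration earlier in the guide: 10 retries, 3000 ms apart.
print(retry_window_seconds(10, 3000))   # 30.0
# tasks.max=3 against a single topic with 3 partitions.
print(active_tasks(3, 3))               # 3
```

Shortening the backoff speeds up recovery from brief errors but also shrinks the outage the connector can ride out, so a QuestDB restart that takes longer than the window would fail the task.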
Handling Timestamps
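QuestDB automatically handles fields declared with Kafka Connect's Timestamp logical type:
{
  "type": "int64",
  "name": "org.apache.kafka.connect.data.Timestamp",
  "version": 1,
  "field": "timestamp"
}
This maps to QuestDB’s TIMESTAMP type, which has microsecond precision. The Kafka Connect value itself is an int64 count of milliseconds since the Unix epoch.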
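Kafka Connect's Timestamp logical type carries milliseconds since the Unix epoch, so producers need to emit values in that unit. The following is a minimal sketch of the conversion; the helper names are hypothetical, and integer arithmetic is used to avoid floating-point rounding.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_connect_timestamp(dt: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the epoch."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    # timedelta // timedelta yields an int, truncating sub-millisecond parts.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def millis_to_micros(ms: int) -> int:
    """The same instant expressed in microseconds."""
    return ms * 1000


sample = datetime(2023, 3, 15, 17, 27, 14, tzinfo=timezone.utc)
print(to_connect_timestamp(sample))                     # 1678901234000
print(millis_to_micros(to_connect_timestamp(sample)))   # 1678901234000000
```

The first printed value matches the `timestamp` used in the sample messages in this guide.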
Alternative: InfluxDB Line Protocol
For higher performance, consider using InfluxDB Line Protocol (ILP) on port 9009 instead of Kafka Connect. ILP provides lower latency and higher throughput for streaming ingestion.
# Example: Direct ILP ingestion
echo "sensor_data,sensor_id=001 temperature=23.5,humidity=65.2" | \
nc -q0 localhost 9009
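The same line can be built and sent from code. The sketch below formats ILP lines and writes them over a plain TCP socket. It is an illustration only: `ilp_line` and `send_lines` are hypothetical helpers, the table name is not escaped, and a dedicated client library is likely a better fit for production use.

```python
import socket


def _escape(text: str) -> str:
    """Escape characters that are special in ILP tag and field keys."""
    return text.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def _field_value(value) -> str:
    """Render a field value using ILP type markers."""
    if isinstance(value, bool):      # must precede int: bool is an int subclass
        return "t" if value else "f"
    if isinstance(value, int):
        return f"{value}i"           # integers carry an 'i' suffix
    if isinstance(value, float):
        return repr(value)
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'            # strings are double-quoted


def ilp_line(table, tags, fields, timestamp_ns=None) -> str:
    """Build one newline-terminated ILP line."""
    head = table + "".join(f",{_escape(k)}={_escape(v)}" for k, v in tags.items())
    body = ",".join(f"{_escape(k)}={_field_value(v)}" for k, v in fields.items())
    tail = "" if timestamp_ns is None else f" {timestamp_ns}"
    return f"{head} {body}{tail}\n"


def send_lines(lines, host="localhost", port=9009):
    """Send ILP lines over TCP; requires a running QuestDB instance."""
    with socket.create_connection((host, port)) as sock:
        sock.sendall("".join(lines).encode("utf-8"))
```

`ilp_line("sensor_data", {"sensor_id": "001"}, {"temperature": 23.5, "humidity": 65.2})` produces the same text as the `echo` command above, followed by a newline.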
Monitoring
Check Connector Status
curl http://localhost:8083/connectors/questdb-sink/status
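The status response is JSON with a `connector` object and a `tasks` array, each entry carrying a `state`. The sketch below fetches it and lists anything that is not `RUNNING`. `fetch_status` and `unhealthy` are hypothetical helper names, and the base URL assumes the Connect REST listener is on its default port 8083.

```python
import json
import urllib.request


def fetch_status(name, base="http://localhost:8083"):
    """Fetch connector status; requires a running Kafka Connect worker."""
    with urllib.request.urlopen(f"{base}/connectors/{name}/status") as resp:
        return json.load(resp)


def unhealthy(status):
    """Return (component, state) pairs for everything not RUNNING."""
    found = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        found.append(("connector", connector_state))
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            found.append((f"task-{task['id']}", task["state"]))
    return found
```

A cron job or health check could call `unhealthy(fetch_status("questdb-sink"))` and alert when the list is non-empty.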
Monitor QuestDB Ingestion
Query system tables:
SELECT * FROM tables();
SELECT * FROM table_columns('sensor-data');
Troubleshooting
Common Issues
Connection refused on port 8812
Verify QuestDB is running
Check PostgreSQL Wire Protocol is enabled
Ensure firewall allows port 8812
Schema evolution errors
Set auto.evolve=true in connector config
Verify schema compatibility
Check QuestDB logs for schema errors
High lag / slow ingestion
Increase batch.size
Add more connector tasks
Check network latency
Monitor QuestDB resource usage
Next Steps
InfluxDB Line Protocol: Higher-performance ingestion method
PostgreSQL Wire Protocol: Learn about the connection protocol