
Overview

Apache Kafka is a distributed event streaming platform. QuestDB integrates with Kafka through the Kafka Connect JDBC Sink Connector, which lets you stream data from Kafka topics directly into QuestDB tables.

Architecture

Kafka → Kafka Connect → JDBC Sink Connector → QuestDB (PostgreSQL Wire Protocol)

Prerequisites

  • QuestDB running (port 8812 for PostgreSQL Wire Protocol)
  • Apache Kafka installed
  • Kafka Connect JDBC plugin
  • PostgreSQL JDBC driver

Installation

Step 1: Download Required Components

# Download Kafka Connect JDBC
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.0/kafka-connect-jdbc-10.7.0.jar

# Download PostgreSQL JDBC driver
wget https://jdbc.postgresql.org/download/postgresql-42.6.0.jar

Step 2: Configure Kafka Connect

Place both JAR files in your Kafka Connect plugins directory:
mkdir -p /opt/kafka/plugins/jdbc
cp kafka-connect-jdbc-*.jar /opt/kafka/plugins/jdbc/
cp postgresql-*.jar /opt/kafka/plugins/jdbc/

Configuration

JDBC Sink Connector Configuration

Create connect-questdb.properties:
name=questdb-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

# QuestDB connection (PostgreSQL Wire Protocol)
connection.url=jdbc:postgresql://localhost:8812/qdb?useSSL=false
connection.user=admin
connection.password=quest

# Kafka topic configuration
topics=sensor-data,trades,metrics

# Insert mode
insert.mode=insert
pk.mode=none
auto.create=true
auto.evolve=true

# PostgreSQL dialect
dialect.name=PostgreSqlDatabaseDialect

# Batch settings for performance
batch.size=3000
max.retries=10
retry.backoff.ms=3000

QuestDB uses port 8812 for PostgreSQL Wire Protocol connections. The connector communicates with QuestDB as if it were a PostgreSQL database.
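
Before starting the connector, it is worth confirming that QuestDB accepts connections on this port. A minimal sketch in Python, assuming the psycopg2-binary package is installed and QuestDB is running with its default credentials:

# check_questdb.py -- verify QuestDB accepts PostgreSQL Wire Protocol connections
import psycopg2

conn = psycopg2.connect(
    host="localhost",
    port=8812,          # QuestDB's PostgreSQL Wire Protocol port
    user="admin",
    password="quest",
    dbname="qdb",
)
with conn.cursor() as cur:
    cur.execute("SELECT 1")   # trivial round-trip query
    print("QuestDB reachable:", cur.fetchone())
conn.close()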

Stand-alone Mode

Create connect-standalone.properties. Because value.converter.schemas.enable=true, every JSON message must carry a schema/payload envelope, as shown in the samples below:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/opt/kafka/plugins

Starting the Pipeline

Step 1: Start Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

Step 2: Start Kafka Server

bin/kafka-server-start.sh config/server.properties

Step 3: Start QuestDB

# 9000 = Web Console/HTTP, 9009 = InfluxDB Line Protocol, 8812 = PostgreSQL Wire Protocol
docker run -p 9000:9000 -p 9009:9009 -p 8812:8812 questdb/questdb

Step 4: Start Kafka Connect

bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/connect-questdb.properties

Sending Data

Create a Kafka Topic

bin/kafka-topics.sh --create \
  --topic sensor-data \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1
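
Alternatively, the topic can be created programmatically. A sketch using the kafka-python package (an assumed dependency; any Kafka admin client works):

# create_topic.py -- programmatic equivalent of kafka-topics.sh --create
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(name="sensor-data", num_partitions=3, replication_factor=1)
])
admin.close()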

Produce Messages

bin/kafka-console-producer.sh \
  --topic sensor-data \
  --bootstrap-server localhost:9092

Sample Message with Schema

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "sensor_id"
      },
      {
        "type": "double",
        "optional": false,
        "field": "temperature"
      },
      {
        "type": "double",
        "optional": false,
        "field": "humidity"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "timestamp"
      }
    ],
    "optional": false,
    "name": "sensor_reading"
  },
  "payload": {
    "sensor_id": "sensor_001",
    "temperature": 23.5,
    "humidity": 65.2,
    "timestamp": 1678901234000
  }
}
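
The same envelope can be produced from code instead of pasting it into the console producer. A minimal sketch using kafka-python (an assumed dependency); the schema block must accompany every message because value.converter.schemas.enable=true:

# produce_sensor.py -- send a schema/payload envelope to the sensor-data topic
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

message = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "string", "optional": False, "field": "sensor_id"},
            {"type": "double", "optional": False, "field": "temperature"},
            {"type": "double", "optional": False, "field": "humidity"},
            {"type": "int64", "optional": False,
             "name": "org.apache.kafka.connect.data.Timestamp",
             "version": 1, "field": "timestamp"},
        ],
        "optional": False,
        "name": "sensor_reading",
    },
    "payload": {
        "sensor_id": "sensor_001",
        "temperature": 23.5,
        "humidity": 65.2,
        "timestamp": int(time.time() * 1000),  # Connect Timestamp = epoch millis
    },
}

producer.send("sensor-data", value=message)
producer.flush()   # block until the broker acknowledges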

Trading Data Example

{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "string", "field": "symbol"},
      {"type": "double", "field": "price"},
      {"type": "int64", "field": "quantity"},
      {
        "type": "int64",
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "trade_time"
      }
    ]
  },
  "payload": {
    "symbol": "BTC-USD",
    "price": 45123.50,
    "quantity": 100,
    "trade_time": 1678901234000
  }
}

Verifying Data in QuestDB

Query via Web Console

Open http://localhost:9000 and run:
SELECT * FROM 'sensor-data' LIMIT 10;

Query via PostgreSQL Wire Protocol

psql -h localhost -p 8812 -U admin -d qdb -c "SELECT count() FROM 'sensor-data'"
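
The same check works from any PostgreSQL client library. A short sketch with psycopg2 (an assumed dependency, as in the connectivity check above):

# verify_rows.py -- count ingested rows over the PostgreSQL Wire Protocol
import psycopg2

with psycopg2.connect(host="localhost", port=8812,
                      user="admin", password="quest", dbname="qdb") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT count() FROM 'sensor-data'")
        print("rows ingested:", cur.fetchone()[0])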

Performance Optimization

Best Practices for High Throughput
  • Increase batch.size to 3000-5000 for higher throughput
  • Use multiple Kafka partitions for parallelism
  • Set auto.create=true to automatically create tables
  • Configure an appropriate buffer.memory in the Kafka producer
  • Monitor connector lag and adjust workers accordingly

Connector Performance Settings

# High-throughput settings
batch.size=5000
tasks.max=3
max.retries=10
retry.backoff.ms=1000

# Connection pooling
connection.attempts=3
connection.backoff.ms=10000

Handling Timestamps

The JDBC Sink Connector maps the Kafka Connect Timestamp logical type to a SQL timestamp, which QuestDB handles natively:
{
  "type": "int64",
  "name": "org.apache.kafka.connect.data.Timestamp",
  "version": 1,
  "field": "timestamp"
}
This maps to QuestDB’s TIMESTAMP type with microsecond precision.
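
The Connect payload carries epoch milliseconds, while QuestDB stores microseconds internally; the connector performs the conversion. A plain-Python illustration of the units (no external dependencies):

# Connect's Timestamp logical type carries epoch milliseconds
from datetime import datetime, timezone

epoch_millis = 1678901234000                       # value from the payload above
dt = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
print(dt.isoformat())                              # 2023-03-15T17:27:14+00:00
epoch_micros = epoch_millis * 1000                 # what QuestDB stores
print(epoch_micros)                                # 1678901234000000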

Alternative: InfluxDB Line Protocol

For higher performance, consider using InfluxDB Line Protocol (ILP) on port 9009 instead of Kafka Connect. ILP provides lower latency and higher throughput for streaming ingestion.
# Example: Direct ILP ingestion
echo "sensor_data,sensor_id=001 temperature=23.5,humidity=65.2" | \
  nc -q0 localhost 9009
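
The same line can be sent from code by writing a newline-terminated ILP string to a TCP socket. A minimal sketch in plain Python (QuestDB also publishes dedicated ILP client libraries, which are the recommended route for production):

# ilp_send.py -- send one ILP line to QuestDB over raw TCP (port 9009)
import socket

line = "sensor_data,sensor_id=001 temperature=23.5,humidity=65.2\n"
with socket.create_connection(("localhost", 9009)) as sock:
    sock.sendall(line.encode("utf-8"))   # ILP rows are newline-terminated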

Monitoring

Check Connector Status

curl http://localhost:8083/connectors/questdb-sink/status
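
A scripted version of the same check, using only the Python standard library; the response fields follow the standard Kafka Connect REST API:

# connector_status.py -- poll the Kafka Connect REST API for sink health
import json
import urllib.request

url = "http://localhost:8083/connectors/questdb-sink/status"
with urllib.request.urlopen(url) as resp:
    status = json.load(resp)

print("connector:", status["connector"]["state"])   # e.g. RUNNING or FAILED
for task in status["tasks"]:
    print(f"task {task['id']}:", task["state"])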

Monitor QuestDB Ingestion

Query system tables:
SELECT * FROM tables();
SELECT * FROM table_columns('sensor-data');

Troubleshooting

Common Issues

Connection refused on port 8812

  • Verify QuestDB is running
  • Check that the PostgreSQL Wire Protocol is enabled
  • Ensure the firewall allows port 8812

Schema evolution errors

  • Set auto.evolve=true in the connector config
  • Verify schema compatibility
  • Check the QuestDB logs for schema errors

High lag / slow ingestion

  • Increase batch.size
  • Add more connector tasks
  • Check network latency
  • Monitor QuestDB resource usage

Next Steps

  • InfluxDB Line Protocol: a higher-performance ingestion method
  • PostgreSQL Wire Protocol: learn about the connection protocol