Skip to main content

Overview

Apache Flink is a powerful distributed stream processing framework. QuestDB integrates with Flink as both a source and sink, enabling real-time data processing pipelines with low-latency ingestion and querying capabilities.

Architecture

Flink can interact with QuestDB in multiple ways:
  • QuestDB as Sink: Write Flink processing results to QuestDB
  • QuestDB as Source: Read data from QuestDB for stream processing
  • JDBC Connector: Use PostgreSQL Wire Protocol (port 8812)
  • Custom Sink: Direct InfluxDB Line Protocol ingestion (port 9009)

Prerequisites

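  • Apache Flink 1.15+ installed (the Maven coordinates below target Flink 1.18; use connector versions that match your Flink release)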
  • QuestDB running (ports 8812 and 9009)
  • Flink JDBC connector
  • PostgreSQL JDBC driver

Setup

Add Dependencies

Add to your pom.xml:
<dependencies>
    <!-- Flink dependencies (keep all core Flink artifacts on the same version) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <!-- Required to run jobs from a plain main() / the IDE: env.execute() needs an executor -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.18.0</version>
    </dependency>

    <!-- Externalized connectors are versioned <connector-version>-<flink-minor-version> -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.2-1.18</version>
    </dependency>
    <!-- Only needed for the Kafka examples -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.0.2-1.18</version>
    </dependency>
    <!-- The Table API example additionally needs the Flink table API and planner artifacts
         (same Flink version) -->

    <!-- PostgreSQL JDBC driver -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.6.0</version>
    </dependency>
</dependencies>

Start QuestDB

# Publish the web console/REST (9000), ILP (9009) and PostgreSQL wire (8812) ports
docker run \
  -p 9000:9000 \
  -p 9009:9009 \
  -p 8812:8812 \
  questdb/questdb
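QuestDB uses port 8812 for PostgreSQL Wire Protocol (JDBC) connections, port 9009 for high-throughput InfluxDB Line Protocol (ILP) ingestion, and port 9000 for the web console and REST API. The examples below use QuestDB's default PostgreSQL credentials (user `admin`, password `quest`).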

JDBC Sink Example

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;

/**
 * Minimal job that writes two hard-coded sensor readings into QuestDB over the PostgreSQL wire
 * protocol (port 8812) using Flink's {@code JdbcSink}.
 *
 * <p>The job assumes the {@code sensor_readings} table already exists. Create it first, e.g.:
 * <pre>
 * CREATE TABLE sensor_readings (
 *   sensor_id SYMBOL, temperature DOUBLE, humidity DOUBLE, timestamp TIMESTAMP
 * ) TIMESTAMP(timestamp) PARTITION BY DAY;
 * </pre>
 */
public class FlinkToQuestDB {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Sample data stream
        DataStream<SensorReading> sensorStream = env
            .fromElements(
                new SensorReading("sensor_001", 23.5, 65.2, System.currentTimeMillis()),
                new SensorReading("sensor_002", 24.1, 67.8, System.currentTimeMillis())
            );
        
        // Write to QuestDB using JDBC
        sensorStream.addSink(JdbcSink.sink(
            "INSERT INTO sensor_readings (sensor_id, temperature, humidity, timestamp) VALUES (?, ?, ?, ?)",
            (JdbcStatementBuilder<SensorReading>) (statement, reading) -> {
                statement.setString(1, reading.sensorId);
                statement.setDouble(2, reading.temperature);
                statement.setDouble(3, reading.humidity);
                // reading.timestamp is epoch milliseconds, which is what java.sql.Timestamp expects.
                // (QuestDB's to_timestamp() parses strings, so binding a raw long to it is wrong.)
                statement.setTimestamp(4, new java.sql.Timestamp(reading.timestamp));
            },
            JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(3)
                .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://localhost:8812/qdb")
                .withDriverName("org.postgresql.Driver")
                .withUsername("admin")
                .withPassword("quest")
                .build()
        ));
        
        env.execute("Flink to QuestDB Example");
    }
    
    /**
     * One sensor measurement. Public fields plus a public no-arg constructor make this a Flink
     * POJO, so Flink serializes it with its POJO serializer instead of falling back to Kryo.
     */
    public static class SensorReading {
        public String sensorId;
        public double temperature;
        public double humidity;
        /** Epoch milliseconds. */
        public long timestamp;

        /** Required by Flink's POJO rules; not meant to be called directly. */
        public SensorReading() {
        }
        
        public SensorReading(String sensorId, double temperature, double humidity, long timestamp) {
            this.sensorId = sensorId;
            this.temperature = temperature;
            this.humidity = humidity;
            this.timestamp = timestamp;
        }
    }
}

Kafka to QuestDB Pipeline

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Consumes messages from the Kafka topic {@code sensor-data} and inserts them into QuestDB's
 * {@code sensor_readings} table over the PostgreSQL wire protocol.
 *
 * <p>Not shown here: {@code SensorReading} (the same POJO as in the JDBC sink example) and
 * {@code parseJson}, which must turn one Kafka message into a {@code SensorReading} for your
 * message format (e.g. with a JSON library).
 */
public class KafkaToQuestDB {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka source configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-consumer");
        
        // Consume from Kafka (FlinkKafkaConsumer is deprecated in recent Flink releases in favour
        // of KafkaSource; it is kept here for brevity)
        DataStream<String> kafkaStream = env
            .addSource(new FlinkKafkaConsumer<>("sensor-data", new SimpleStringSchema(), props));
        
        // Parse and transform
        DataStream<SensorReading> sensorStream = kafkaStream
            .map(json -> parseJson(json));
        
        // Sink to QuestDB. Columns are named explicitly so the insert does not depend on the
        // table's column order.
        sensorStream.addSink(JdbcSink.sink(
            "INSERT INTO sensor_readings (sensor_id, temperature, humidity, timestamp) VALUES (?, ?, ?, ?)",
            (JdbcStatementBuilder<SensorReading>) (statement, reading) -> {
                statement.setString(1, reading.sensorId);
                statement.setDouble(2, reading.temperature);
                statement.setDouble(3, reading.humidity);
                statement.setTimestamp(4, new java.sql.Timestamp(reading.timestamp));
            },
            JdbcExecutionOptions.builder()
                .withBatchSize(5000)
                .withBatchIntervalMs(100)
                .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://localhost:8812/qdb")
                .withDriverName("org.postgresql.Driver")
                // QuestDB's PostgreSQL endpoint requires credentials (defaults: admin / quest)
                .withUsername("admin")
                .withPassword("quest")
                .build()
        ));
        
        env.execute("Kafka to QuestDB Pipeline");
    }
}

Reading from QuestDB

import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class QuestDBSource {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // Read from QuestDB
        JdbcInputFormat jdbcInput = JdbcInputFormat.buildJdbcInputFormat()
            .setDrivername("org.postgresql.Driver")
            .setDBUrl("jdbc:postgresql://localhost:8812/qdb")
            .setUsername("admin")
            .setPassword("quest")
            .setQuery("SELECT sensor_id, temperature, humidity, timestamp FROM sensor_readings WHERE timestamp > dateadd('h', -1, now())")
            .setRowTypeInfo(new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO,
                SqlTimeTypeInfo.TIMESTAMP
            ))
            .finish();
        
        DataSource<Row> source = env.createInput(jdbcInput);
        
        // Process data
        source.map(row -> {
            String sensorId = (String) row.getField(0);
            Double temperature = (Double) row.getField(1);
            return "Sensor: " + sensorId + ", Temp: " + temperature;
        }).print();
        
        env.execute("Read from QuestDB");
    }
}

Table API Example

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Registers QuestDB's {@code sensor_readings} table in Flink SQL through the JDBC connector and
 * prints the average temperature per sensor.
 */
public class FlinkSQLQuestDB {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        
        // Map the QuestDB table into Flink as a JDBC-backed table (a single table, not a catalog).
        // The JDBC connector matches columns by name, so they must be named exactly as in QuestDB;
        // TIMESTAMP is a reserved word in Flink SQL, hence the back-quotes.
        tableEnv.executeSql(
            "CREATE TABLE sensor_readings (" +
            "  sensor_id STRING," +
            "  temperature DOUBLE," +
            "  humidity DOUBLE," +
            "  `timestamp` TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:postgresql://localhost:8812/qdb'," +
            "  'table-name' = 'sensor_readings'," +
            "  'username' = 'admin'," +
            "  'password' = 'quest'" +
            ")"
        );
        
        // Query using Flink SQL. In streaming mode the aggregate is printed as a changelog
        // (+I / -U / +U rows) while the table is scanned.
        tableEnv.executeSql(
            "SELECT sensor_id, AVG(temperature) as avg_temp " +
            "FROM sensor_readings " +
            "GROUP BY sensor_id"
        ).print();
    }
}

Custom ILP Sink (High Performance)

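For maximum throughput, use InfluxDB Line Protocol on port 9009. The hand-written sink below illustrates the protocol; for production use, consider QuestDB's official Java ILP client, which takes care of buffering and escaping for you: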
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/**
 * Minimal Flink sink that writes {@code SensorReading}s to QuestDB as InfluxDB Line Protocol
 * over a plain TCP socket.
 *
 * <p>Each record becomes one line of the form
 * {@code sensor_readings,sensor_id=<id> temperature=<t>,humidity=<h> <epoch-nanos>} and is
 * flushed immediately. The sink never reads a response from the socket, so it cannot confirm
 * that QuestDB accepted a line.
 */
public class QuestDBILPSink extends RichSinkFunction<SensorReading> {
    private final String host;
    private final int port;

    private transient Socket socket;
    private transient OutputStream out;

    /** Creates a sink that connects to QuestDB's ILP port on {@code localhost:9009}. */
    public QuestDBILPSink() {
        this("localhost", 9009);
    }

    /**
     * Creates a sink that connects to the given ILP endpoint.
     *
     * @param host QuestDB host name or address
     * @param port QuestDB ILP TCP port
     */
    public QuestDBILPSink(String host, int port) {
        this.host = host;
        this.port = port;
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection per parallel sink instance.
        socket = new Socket(host, port);
        out = socket.getOutputStream();
    }
    
    @Override
    public void invoke(SensorReading reading, Context context) throws Exception {
        // Format: table_name,tag1=value1 field1=value1,field2=value2 timestamp
        // Locale.ROOT forces '.' as the decimal separator; under e.g. a German default locale
        // "%f" would produce "23,500000" and corrupt the line.
        String line = String.format(
            Locale.ROOT,
            "sensor_readings,sensor_id=%s temperature=%f,humidity=%f %d\n",
            escapeTagValue(reading.sensorId),
            reading.temperature,
            reading.humidity,
            reading.timestamp * 1_000_000L // epoch millis -> nanoseconds
        );
        out.write(line.getBytes(StandardCharsets.UTF_8));
        out.flush();
    }
    
    @Override
    public void close() throws Exception {
        // Close the socket even if closing the stream throws.
        try {
            if (out != null) {
                out.close();
            }
        } finally {
            if (socket != null) {
                socket.close();
            }
        }
    }

    /**
     * Backslash-escapes the characters that delimit ILP tag values (space, comma, equals sign),
     * so a sensor id containing them cannot break the line.
     */
    private static String escapeTagValue(String value) {
        String text = String.valueOf(value);
        StringBuilder escaped = new StringBuilder(text.length());
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == ' ' || c == ',' || c == '=') {
                escaped.append('\\');
            }
            escaped.append(c);
        }
        return escaped.toString();
    }
}
Usage:
sensorStream.addSink(new QuestDBILPSink());

Window Operations

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Tumbling window aggregation: per-sensor mean temperature and humidity over each 5-minute window.
// A pairwise reduce such as (r1 + r2) / 2 is NOT a mean once a window holds more than two readings
// (for a, b, c it yields a/4 + b/4 + c/2), so sums and a count are accumulated instead.
// Accumulator: f0 = sensorId, f1 = temperature sum, f2 = humidity sum, f3 = count,
//              f4 = latest reading timestamp in the window.
DataStream<SensorReading> aggregated = sensorStream
    .keyBy(reading -> reading.sensorId)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .aggregate(new AggregateFunction<SensorReading, Tuple5<String, Double, Double, Long, Long>, SensorReading>() {
        @Override
        public Tuple5<String, Double, Double, Long, Long> createAccumulator() {
            // Flink tuples cannot hold null fields, so start with an empty id.
            return Tuple5.of("", 0.0, 0.0, 0L, Long.MIN_VALUE);
        }

        @Override
        public Tuple5<String, Double, Double, Long, Long> add(
                SensorReading reading, Tuple5<String, Double, Double, Long, Long> acc) {
            return Tuple5.of(
                reading.sensorId,
                acc.f1 + reading.temperature,
                acc.f2 + reading.humidity,
                acc.f3 + 1,
                Math.max(acc.f4, reading.timestamp));
        }

        @Override
        public SensorReading getResult(Tuple5<String, Double, Double, Long, Long> acc) {
            return new SensorReading(acc.f0, acc.f1 / acc.f3, acc.f2 / acc.f3, acc.f4);
        }

        @Override
        public Tuple5<String, Double, Double, Long, Long> merge(
                Tuple5<String, Double, Double, Long, Long> a,
                Tuple5<String, Double, Double, Long, Long> b) {
            return Tuple5.of(
                a.f0.isEmpty() ? b.f0 : a.f0,
                a.f1 + b.f1,
                a.f2 + b.f2,
                a.f3 + b.f3,
                Math.max(a.f4, b.f4));
        }
    });

// Write aggregated results to QuestDB. createQuestDBSink() stands for one of the sinks shown
// earlier on this page (a JdbcSink.sink(...) or the ILP sink).
aggregated.addSink(createQuestDBSink());

Performance Optimization

Best Practices
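  • Use ILP (port 9009) for the highest throughput (millions of rows per second)
  • Use JDBC (port 8812) for transactional writes
  • Batch writes: 1000-5000 rows for JDBC
  • Enable checkpointing so jobs can recover after failures. Note that the sinks shown here give at-least-once delivery (rows may be written again when a job replays after a failure), not exactly-once
  • Use parallelism for distributed processing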

Optimized JDBC Configuration

// Flush after 5000 buffered rows or every 100 ms, whichever comes first;
// a failed flush is retried up to 5 times.
JdbcExecutionOptions.builder()
    .withMaxRetries(5)
    .withBatchIntervalMs(100)
    .withBatchSize(5000)
    .build()

Monitoring

Check Data in QuestDB

-- Total number of rows ingested so far
SELECT count() FROM sensor_readings;

-- Most recent row for each sensor
SELECT *
FROM sensor_readings
LATEST ON timestamp PARTITION BY sensor_id;

-- Per-sensor statistics for the last hour, in 5-minute buckets
SELECT
    sensor_id,
    avg(temperature) AS avg_temp,
    max(humidity)    AS max_humidity
FROM sensor_readings
WHERE timestamp > dateadd('h', -1, now())
SAMPLE BY 5m;

Error Handling

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink that retries each write to QuestDB a bounded number of times, with exponential backoff
 * between attempts.
 *
 * <p>If every attempt fails, the last exception is rethrown. Flink then fails the task and, with
 * checkpointing and a restart strategy, replays from the last checkpoint instead of silently
 * dropping the record. {@code writeToQuestDB} is the actual write (e.g. via JDBC or ILP) and is
 * not shown here.
 */
public class ResilientQuestDBSink implements SinkFunction<SensorReading> {
    private static final Logger LOG = LoggerFactory.getLogger(ResilientQuestDBSink.class);

    /** Total write attempts per record (first try plus retries). */
    private static final int MAX_ATTEMPTS = 3;

    /** Delay before the first retry; doubled after each further failure. */
    private static final long INITIAL_BACKOFF_MS = 100;
    
    @Override
    public void invoke(SensorReading value, Context context) throws Exception {
        long backoffMs = INITIAL_BACKOFF_MS;
        for (int attempt = 1; ; attempt++) {
            try {
                writeToQuestDB(value);
                return;
            } catch (Exception e) {
                if (attempt >= MAX_ATTEMPTS) {
                    LOG.error("Failed to write to QuestDB after {} attempts: {}", attempt, value, e);
                    // Propagate instead of dropping the record: the failure becomes visible and
                    // Flink's recovery can re-deliver it.
                    throw e;
                }
                LOG.warn("Failed to write to QuestDB (attempt {}/{}), retrying in {} ms",
                        attempt, MAX_ATTEMPTS, backoffMs, e);
                // An interrupt during the backoff propagates as InterruptedException.
                Thread.sleep(backoffMs);
                backoffMs *= 2;
            }
        }
    }
}

Complete Example: Real-time Analytics

/**
 * End-to-end job: reads trades from Kafka, computes per-symbol statistics over 1-minute
 * event-time tumbling windows and writes them to QuestDB's {@code trade_stats} table.
 *
 * <p>{@code Trade}, {@code TradeSchema}, {@code TradeStats} and {@code TradeStatsProcessor} are
 * application classes that are not shown here.
 */
public class RealtimeAnalytics {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 s so the job can recover after a failure.
        env.enableCheckpointing(5000);

        // Kafka consumer configuration
        java.util.Properties kafkaProps = new java.util.Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink-consumer");
        
        // Source: read from Kafka. Event time is taken from trade.timestamp (Flink expects epoch
        // milliseconds), tolerating up to 5 s of out-of-order arrival.
        DataStream<Trade> trades = env
            .addSource(new FlinkKafkaConsumer<>("trades", new TradeSchema(), kafkaProps))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Trade>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((trade, ts) -> trade.timestamp)
            );
        
        // Process: per-symbol statistics over non-overlapping 1-minute windows
        DataStream<TradeStats> stats = trades
            .keyBy(trade -> trade.symbol)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .process(new TradeStatsProcessor());
        
        // Sink: write to QuestDB. The INSERT has no column list, so the parameter order below must
        // match the column order of the trade_stats table.
        stats.addSink(JdbcSink.sink(
            "INSERT INTO trade_stats VALUES (?, ?, ?, ?, ?)",
            (statement, stat) -> {
                statement.setString(1, stat.symbol);
                statement.setDouble(2, stat.avgPrice);
                statement.setDouble(3, stat.volume);
                statement.setInt(4, stat.tradeCount);
                statement.setTimestamp(5, new java.sql.Timestamp(stat.windowEnd));
            },
            JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(100)
                .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://localhost:8812/qdb")
                .withDriverName("org.postgresql.Driver")
                // QuestDB's PostgreSQL endpoint requires credentials (defaults: admin / quest)
                .withUsername("admin")
                .withPassword("quest")
                .build()
        ));
        
        env.execute("Real-time Trade Analytics");
    }
}

Next Steps

Kafka Integration

Build end-to-end streaming pipelines

InfluxDB Line Protocol

Maximum ingestion performance