Overview
Apache Flink is a powerful distributed stream processing framework. QuestDB integrates with Flink as both a source and a sink, enabling real-time data processing pipelines with low-latency ingestion and querying capabilities.
Architecture
Flink can interact with QuestDB in multiple ways:
- QuestDB as Sink: Write Flink processing results to QuestDB
- QuestDB as Source: Read data from QuestDB for stream processing
- JDBC Connector: Use PostgreSQL Wire Protocol (port 8812)
- Custom Sink: Direct InfluxDB Line Protocol ingestion (port 9009)
Prerequisites
- Apache Flink 1.15+ installed
- QuestDB running (ports 8812 and 9009)
- Flink JDBC connector
- PostgreSQL JDBC driver
Setup
Add Dependencies
Add to your pom.xml:
<dependencies>
  <!-- Flink core APIs; keep both artifacts on the same Flink release. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.18.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.0</version>
  </dependency>
  <!--
    The JDBC connector is released separately from Flink. Its version string
    carries the targeted Flink release as a suffix, so it must be chosen to
    match the Flink version above (here: connector 3.1.2 built for Flink 1.18).
  -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.2-1.18</version>
  </dependency>
  <!-- PostgreSQL JDBC driver, used to reach QuestDB over the PostgreSQL wire protocol (port 8812). -->
  <dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.6.0</version>
  </dependency>
</dependencies>
Start QuestDB
docker run -p 9000:9000 -p 9009:9009 -p 8812:8812 questdb/questdb
QuestDB uses port 8812 for PostgreSQL Wire Protocol (JDBC) connections and port 9009 for InfluxDB Line Protocol (ILP) high-throughput ingestion.
QuestDB as Flink Sink
JDBC Sink Example
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
/**
 * Minimal Flink job that writes a small, fixed stream of sensor readings to
 * QuestDB through the Flink JDBC sink (PostgreSQL wire protocol, port 8812).
 */
public class FlinkToQuestDB {

    /**
     * Builds the pipeline (in-memory source, then JDBC sink) and runs it.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample data stream
        DataStream<SensorReading> sensorStream = env.fromElements(
                new SensorReading("sensor_001", 23.5, 65.2, System.currentTimeMillis()),
                new SensorReading("sensor_002", 24.1, 67.8, System.currentTimeMillis()));

        // Write to QuestDB using JDBC
        sensorStream.addSink(JdbcSink.sink(
                "INSERT INTO sensor_readings (sensor_id, temperature, humidity, timestamp) VALUES (?, ?, ?, ?)",
                (JdbcStatementBuilder<SensorReading>) (statement, reading) -> {
                    statement.setString(1, reading.sensorId);
                    statement.setDouble(2, reading.temperature);
                    statement.setDouble(3, reading.humidity);
                    // Bind a real SQL timestamp instead of pushing a raw epoch number
                    // through to_timestamp(), which is a string-parsing function.
                    // This is the same binding the Kafka pipeline example uses.
                    statement.setTimestamp(4, new java.sql.Timestamp(reading.timestamp));
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://localhost:8812/qdb")
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("admin")
                        .withPassword("quest")
                        .build()));

        env.execute("Flink to QuestDB Example");
    }

    /** One sensor measurement; {@code timestamp} is in epoch milliseconds. */
    public static class SensorReading {
        public String sensorId;
        public double temperature;
        public double humidity;
        public long timestamp;

        /**
         * No-argument constructor. Together with the public fields this lets Flink
         * treat the class as a POJO type instead of falling back to generic
         * serialization.
         */
        public SensorReading() {
        }

        public SensorReading(String sensorId, double temperature, double humidity, long timestamp) {
            this.sensorId = sensorId;
            this.temperature = temperature;
            this.humidity = humidity;
            this.timestamp = timestamp;
        }
    }
}
Kafka to QuestDB Pipeline
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcSink;
import java.util.Properties;
public class KafkaToQuestDB {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka source configuration
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "flink-consumer");
// Consume from Kafka
DataStream<String> kafkaStream = env
.addSource(new FlinkKafkaConsumer<>("sensor-data", new SimpleStringSchema(), props));
// Parse and transform
DataStream<SensorReading> sensorStream = kafkaStream
.map(json -> parseJson(json));
// Sink to QuestDB
sensorStream.addSink(JdbcSink.sink(
"INSERT INTO sensor_readings VALUES (?, ?, ?, ?)",
(statement, reading) -> {
statement.setString(1, reading.sensorId);
statement.setDouble(2, reading.temperature);
statement.setDouble(3, reading.humidity);
statement.setTimestamp(4, new java.sql.Timestamp(reading.timestamp));
},
JdbcExecutionOptions.builder()
.withBatchSize(5000)
.withBatchIntervalMs(100)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://localhost:8812/qdb")
.withDriverName("org.postgresql.Driver")
.build()
));
env.execute("Kafka to QuestDB Pipeline");
}
}
QuestDB as Flink Source
Reading from QuestDB
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;
/**
 * Batch job that reads the last hour of sensor readings from QuestDB over
 * JDBC and prints one summary line per row.
 */
public class QuestDBSource {

    /**
     * Reads rows through {@code JdbcInputFormat}, formats them and prints them.
     *
     * @param args unused
     * @throws Exception if reading from QuestDB or running the job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read from QuestDB; the row type lists one entry per selected column, in order.
        JdbcInputFormat jdbcInput = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://localhost:8812/qdb")
                .setUsername("admin")
                .setPassword("quest")
                .setQuery("SELECT sensor_id, temperature, humidity, timestamp FROM sensor_readings WHERE timestamp > dateadd('h', -1, now())")
                .setRowTypeInfo(new RowTypeInfo(
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.DOUBLE_TYPE_INFO,
                        BasicTypeInfo.DOUBLE_TYPE_INFO,
                        SqlTimeTypeInfo.TIMESTAMP))
                .finish();

        DataSource<Row> source = env.createInput(jdbcInput);

        // Process data.
        // In the DataSet API print() executes the job itself, so there must be no
        // env.execute() afterwards: with no new sink defined it would fail with
        // "No new data sinks have been defined since the last execution".
        source.map(row -> {
            String sensorId = (String) row.getField(0);
            Double temperature = (Double) row.getField(1);
            return "Sensor: " + sensorId + ", Temp: " + temperature;
        }).print();
    }
}
Flink SQL with QuestDB
Table API Example
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
/**
 * Flink SQL example: registers the QuestDB table {@code sensor_readings} as a
 * JDBC-backed Flink table and prints the average temperature per sensor.
 */
public class FlinkSQLQuestDB {

    /**
     * Creates the table definition and runs the aggregation query.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register the QuestDB table as a JDBC-backed Flink table (not a catalog).
        // The column names must match the QuestDB table. The other examples in this
        // guide name the time column "timestamp"; that is a reserved word in Flink
        // SQL and is therefore escaped with backticks.
        tableEnv.executeSql(
                "CREATE TABLE sensor_readings ("
                        + "  sensor_id STRING,"
                        + "  temperature DOUBLE,"
                        + "  humidity DOUBLE,"
                        + "  `timestamp` TIMESTAMP(3)"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:postgresql://localhost:8812/qdb',"
                        + "  'table-name' = 'sensor_readings',"
                        + "  'username' = 'admin',"
                        + "  'password' = 'quest'"
                        + ")");

        // Query using Flink SQL
        tableEnv.executeSql(
                "SELECT sensor_id, AVG(temperature) as avg_temp "
                        + "FROM sensor_readings "
                        + "GROUP BY sensor_id").print();
    }
}
Custom ILP Sink (High Performance)
For maximum throughput, use InfluxDB Line Protocol on port 9009:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
/**
 * Flink sink that writes {@code SensorReading} records to QuestDB as InfluxDB
 * Line Protocol (ILP) text over a plain TCP socket on port 9009.
 *
 * <p>Each record is written and flushed individually.
 */
public class QuestDBILPSink extends RichSinkFunction<SensorReading> {
    private transient Socket socket;
    private transient OutputStream out;

    /** Opens the TCP connection to QuestDB's ILP endpoint. */
    @Override
    public void open(Configuration parameters) throws Exception {
        socket = new Socket("localhost", 9009);
        out = socket.getOutputStream();
    }

    /**
     * Serializes one reading as an ILP line and sends it.
     *
     * @param reading the record to write; its timestamp is in epoch milliseconds
     * @param context sink context (unused)
     * @throws Exception if the socket write fails
     */
    @Override
    public void invoke(SensorReading reading, Context context) throws Exception {
        // Format: table_name,tag1=value1 field1=value1,field2=value2 timestamp
        String line = String.format(
                // Fixed locale: the default locale may print a decimal comma,
                // which is a separator in ILP and would corrupt the line.
                java.util.Locale.ROOT,
                "sensor_readings,sensor_id=%s temperature=%f,humidity=%f %d\n",
                escapeTagValue(reading.sensorId),
                reading.temperature,
                reading.humidity,
                reading.timestamp * 1_000_000); // Convert milliseconds to nanoseconds
        out.write(line.getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    /** Closes the stream and the socket; the socket is closed even if closing the stream fails. */
    @Override
    public void close() throws Exception {
        try {
            if (out != null) {
                out.close();
            }
        } finally {
            if (socket != null) {
                socket.close();
            }
        }
    }

    /** Backslash-escapes the characters that delimit ILP tags: comma, equals sign and space. */
    private static String escapeTagValue(String value) {
        return value
                .replace(",", "\\,")
                .replace("=", "\\=")
                .replace(" ", "\\ ");
    }
}
// Usage: attach the ILP sink to a stream of SensorReading records
// (for example the sensorStream built in the JDBC sink example).
sensorStream.addSink(new QuestDBILPSink());
Window Operations
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
// Tumbling window aggregation: one averaged reading per sensor per 5-minute window.
// The mean is computed from explicit sums and a count. Reducing pairwise with
// (a + b) / 2 is not an arithmetic mean once a window holds more than two
// readings, because later readings end up weighted more heavily.
// Trade-off: this buffers the window's readings until the window fires.
DataStream<SensorReading> aggregated = sensorStream
        .keyBy(reading -> reading.sensorId)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
        .process(new org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<
                SensorReading,
                SensorReading,
                String,
                org.apache.flink.streaming.api.windowing.windows.TimeWindow>() {
            @Override
            public void process(
                    String sensorId,
                    Context context,
                    Iterable<SensorReading> readings,
                    org.apache.flink.util.Collector<SensorReading> out) {
                double temperatureSum = 0;
                double humiditySum = 0;
                long count = 0;
                long lastTimestamp = 0;
                for (SensorReading reading : readings) {
                    temperatureSum += reading.temperature;
                    humiditySum += reading.humidity;
                    // Keep the timestamp of the last reading seen, as the reduce version did.
                    lastTimestamp = reading.timestamp;
                    count++;
                }
                // A window only fires when it holds at least one element, so count > 0.
                out.collect(new SensorReading(
                        sensorId,
                        temperatureSum / count,
                        humiditySum / count,
                        lastTimestamp));
            }
        });
// Write aggregated results to QuestDB
// NOTE(review): createQuestDBSink() is not defined in this snippet; it is presumably
// a helper returning one of the sinks shown earlier. Confirm before use.
aggregated.addSink(createQuestDBSink());
Performance Optimization
Best Practices
- Use ILP (port 9009) for highest throughput (millions/sec)
- Use JDBC (port 8812) for transactional writes
- Batch writes: 1000-5000 rows for JDBC
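- Enable checkpointing for fault tolerance (the JdbcSink.sink() used in these examples gives at-least-once delivery; exactly-once requires an idempotent write or a transactional sink)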
- Use parallelism for distributed processing
Optimized JDBC Configuration
// Execution options tuned for throughput; pass the built object to JdbcSink.sink(...).
JdbcExecutionOptions.builder()
// Batch size of 5000: the top of the 1000-5000 row range this guide recommends for JDBC.
.withBatchSize(5000)
// Batch interval of 100 ms (presumably the longest a partial batch waits before
// being flushed; confirm against the Flink JDBC connector documentation).
.withBatchIntervalMs(100)
// Up to 5 retries, versus 3 in the basic JDBC sink example.
.withMaxRetries(5)
.build()
Monitoring
Check Data in QuestDB
-- Query count: total number of rows currently in sensor_readings
SELECT count() FROM sensor_readings;
-- Latest data: the most recent row for each sensor_id, ordered by the timestamp column
SELECT * FROM sensor_readings LATEST ON timestamp PARTITION BY sensor_id;
-- Aggregations: average temperature and maximum humidity per sensor over the
-- last hour, grouped into 5-minute buckets by SAMPLE BY
-- NOTE(review): SAMPLE BY presumably relies on timestamp being the table's
-- designated timestamp; confirm against the table definition
SELECT
sensor_id,
avg(temperature) as avg_temp,
max(humidity) as max_humidity
FROM sensor_readings
WHERE timestamp > dateadd('h', -1, now())
SAMPLE BY 5m;
Error Handling
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Best-effort sink that retries a failed QuestDB write a fixed number of times
 * and, if every attempt fails, logs the record and drops it so the job keeps
 * running.
 */
public class ResilientQuestDBSink implements SinkFunction<SensorReading> {
    private static final Logger LOG = LoggerFactory.getLogger(ResilientQuestDBSink.class);

    /** Total number of write attempts per record. */
    private static final int MAX_ATTEMPTS = 3;

    /** Base pause between attempts; multiplied by the attempt number. */
    private static final long RETRY_BACKOFF_MS = 100L;

    /**
     * Writes one record, retrying on failure.
     *
     * @param value the record to write
     * @param context sink context (unused)
     */
    @Override
    public void invoke(SensorReading value, Context context) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                // Write to QuestDB
                // NOTE(review): writeToQuestDB is not defined in this class; the
                // application must supply it.
                writeToQuestDB(value);
                return;
            } catch (Exception e) {
                // Compute the remaining count first so the log states how many
                // retries are actually left after this failure.
                int retriesLeft = MAX_ATTEMPTS - attempt;
                LOG.error("Failed to write to QuestDB, retries left: {}", retriesLeft, e);
                if (retriesLeft == 0) {
                    LOG.error("Failed to write after all retries: {}", value);
                    return;
                }
                if (!pauseBeforeRetry(attempt)) {
                    LOG.error("Interrupted while waiting to retry, giving up on: {}", value);
                    return;
                }
            }
        }
    }

    /**
     * Sleeps briefly so a struggling server is not hit again immediately.
     *
     * @return {@code false} if the thread was interrupted while waiting
     */
    private static boolean pauseBeforeRetry(int attempt) {
        try {
            Thread.sleep(RETRY_BACKOFF_MS * attempt);
            return true;
        } catch (InterruptedException interrupted) {
            // Restore the flag so the task can observe the cancellation.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
Complete Example: Real-time Analytics
/**
 * End-to-end example: reads trades from Kafka, computes per-symbol statistics
 * over one-minute event-time windows and writes them to QuestDB over JDBC.
 *
 * <p>NOTE(review): {@code Trade}, {@code TradeStats}, {@code TradeSchema} and
 * {@code TradeStatsProcessor} are application types that are not shown here.
 */
public class RealtimeAnalytics {

    /**
     * Builds the Kafka-to-QuestDB analytics pipeline and runs it.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        // Kafka consumer settings, mirroring the Kafka-to-QuestDB pipeline example.
        java.util.Properties kafkaProps = new java.util.Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink-consumer");

        // Source: Read from Kafka; tolerate events arriving up to 5 seconds out of order.
        DataStream<Trade> trades = env
                .addSource(new FlinkKafkaConsumer<>("trades", new TradeSchema(), kafkaProps))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Trade>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((trade, ts) -> trade.timestamp));

        // Process: per-symbol statistics over one-minute tumbling event-time windows
        DataStream<TradeStats> stats = trades
                .keyBy(trade -> trade.symbol)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .process(new TradeStatsProcessor());

        // Sink: Write to QuestDB
        stats.addSink(JdbcSink.sink(
                "INSERT INTO trade_stats VALUES (?, ?, ?, ?, ?)",
                (statement, stat) -> {
                    statement.setString(1, stat.symbol);
                    statement.setDouble(2, stat.avgPrice);
                    statement.setDouble(3, stat.volume);
                    statement.setInt(4, stat.tradeCount);
                    statement.setTimestamp(5, new java.sql.Timestamp(stat.windowEnd));
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(100)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://localhost:8812/qdb")
                        // Driver and credentials as in the basic JDBC sink example.
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("admin")
                        .withPassword("quest")
                        .build()));

        env.execute("Real-time Trade Analytics");
    }
}
Next Steps
Kafka Integration
Build end-to-end streaming pipelines
InfluxDB Line Protocol
Maximum ingestion performance