Documentation Index
Fetch the complete documentation index at: https://mintlify.com/questdb/questdb/llms.txt
Use this file to discover all available pages before exploring further.
The QuestDB Rust client provides a safe, idiomatic API for ingesting data using the InfluxDB Line Protocol.
Installation
Add to your Cargo.toml:
[dependencies]
questdb-rs = "3.0"
tokio = { version = "1", features = ["full"] }
Quick start
use questdb::ingress::{Sender, Buffer, TimestampNanos};

/// Minimal example: stage one trade row in a local buffer, then flush it
/// to QuestDB over the HTTP transport.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the local QuestDB HTTP ingestion endpoint.
    let mut sender = Sender::from_conf("http::addr=localhost:9000;")?;

    // Stage a single "trades" row; nothing is sent until flush().
    let mut row = Buffer::new();
    row.table("trades")?
        .symbol("symbol", "ETH-USD")?
        .symbol("side", "sell")?
        .column_f64("price", 2615.54)?
        .column_f64("amount", 0.00044)?
        .at_now()?;

    // Ship the buffered row to the server.
    sender.flush(&mut row).await?;
    Ok(())
}
Configuration
// Connection-string examples for the two transports. Note: this fragment
// uses `?`, so it must live inside a function returning Result.
use questdb::ingress::Sender;
// HTTP transport with username/password authentication.
let sender = Sender::from_conf(
"http::addr=localhost:9000;username=admin;password=quest;"
)?;
// TCP transport with token authentication.
// NOTE(review): confirm the exact TCP auth keys against the questdb-rs
// docs — TCP auth commonly takes a key id plus signing token rather than
// a username/token pair.
let tcp_sender = Sender::from_conf(
"tcp::addr=localhost:9009;username=admin;token=your-token;"
)?;
Configuration options
addr — server address (host:port)
username / password — authentication credentials (HTTP transport)
token — authentication token (TCP transport)
Data types
// Column-type showcase: a single row exercising every column setter shown
// in these docs. Fragment uses `?`, so it belongs inside a Result-returning
// function.
use questdb::ingress::{Buffer, TimestampNanos};
// NOTE(review): SystemTime is imported but never used in this snippet.
use std::time::SystemTime;
let mut buffer = Buffer::new();
// Each builder call returns a Result, hence the `?` per call. Symbols are
// written before regular columns in every example here — presumably the
// ILP-required ordering; confirm against the client docs.
buffer
.table("sensors")?
.symbol("location", "warehouse-1")? // Symbol (indexed, repeated string)
.column_i64("sensor_id", 12345)? // i64
.column_f64("temperature", 23.5)? // f64
.column_str("status", "active")? // String
.column_bool("is_online", true)? // bool
.column_ts("measured_at", TimestampNanos::now())? // Timestamp (client clock)
.at_now()?;
Batching
use questdb::ingress::{Sender, Buffer};

/// Stages 1000 rows in one buffer and sends them in a single flush,
/// so the whole batch costs one network round-trip.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut sender = Sender::from_conf("http::addr=localhost:9000;")?;
    let mut batch = Buffer::new();

    // Accumulate 1000 rows locally; nothing is transmitted yet.
    for row in 0..1000 {
        let host = format!("server-{}", row % 10);
        batch
            .table("metrics")?
            .symbol("host", &host)?
            .column_f64("cpu", (row % 100) as f64)?
            .column_f64("memory", ((row * 2) % 100) as f64)?
            .at_now()?;
    }

    // One flush for the entire batch.
    sender.flush(&mut batch).await?;
    Ok(())
}
Auto-flush
// Buffer was used below but not imported in the original snippet.
use questdb::ingress::{Sender, Buffer};

// Configure the sender to flush automatically once 1000 rows accumulate.
// NOTE(review): confirm against the questdb-rs docs that the Rust client
// honors auto_flush settings — several QuestDB clients require explicit
// flushes only.
let mut sender = Sender::from_conf(
    "http::addr=localhost:9000;auto_flush=on;auto_flush_rows=1000;"
)?;
let mut buffer = Buffer::new();

// Rows are only staged here; flushing is left to the auto-flush machinery.
// (The original example called sender.flush() on every iteration, which
// contradicts its own "Auto-flush happens internally" comment and would
// issue 5000 network round-trips, defeating batching entirely.)
for i in 0..5000 {
    buffer
        .table("events")?
        .symbol("type", "page_view")?
        .column_str("url", &format!("/page-{}", i))?
        .at_now()?;
}

// Flush whatever remains below the auto-flush threshold.
sender.flush(&mut buffer).await?;
Error handling
use questdb::ingress::{Sender, Buffer, IngressError};
#[tokio::main]
async fn main() {
    // Report the outcome of the ingestion run: errors to stderr,
    // success to stdout.
    if let Err(e) = run().await {
        eprintln!("Error: {}", e);
    } else {
        println!("Success");
    }
}
/// Connects to QuestDB, stages one BTC-USD trade, and flushes it.
/// Any client failure is propagated to the caller via `?`.
// NOTE(review): verify that `IngressError` is actually exported from
// questdb::ingress — the crate's error type may be `questdb::Error` instead.
async fn run() -> Result<(), IngressError> {
    let mut sender = Sender::from_conf("http::addr=localhost:9000;")?;
    let mut trade = Buffer::new();
    trade
        .table("trades")?
        .symbol("symbol", "BTC-USD")?
        .column_f64("price", 50000.0)?
        .at_now()?;
    sender.flush(&mut trade).await?;
    Ok(())
}
Async/await
use questdb::ingress::{Sender, Buffer};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut sender = Sender::from_conf("http::addr=localhost:9000;")?;
    let mut sample = Buffer::new();

    // Sample-and-ship loop: one metrics row per second, forever
    // (this main never returns).
    loop {
        // Record the current readings as a single row.
        sample
            .table("system_metrics")?
            .symbol("host", "server-1")?
            .column_f64("cpu", get_cpu_usage())?
            .column_f64("memory", get_memory_usage())?
            .at_now()?;

        // Send this sample immediately.
        sender.flush(&mut sample).await?;

        // Non-blocking pause between samples.
        sleep(Duration::from_secs(1)).await;
    }
}
/// Stub metric source for the example loop.
/// A real implementation would query the operating system.
fn get_cpu_usage() -> f64 {
    const STUB_CPU_PERCENT: f64 = 45.2;
    STUB_CPU_PERCENT
}
/// Stub metric source for the example loop.
/// A real implementation would query the operating system.
fn get_memory_usage() -> f64 {
    const STUB_MEMORY_PERCENT: f64 = 62.8;
    STUB_MEMORY_PERCENT
}
Thread safety
use questdb::ingress::{Sender, Buffer};
use std::sync::Arc;
use tokio::task;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Share a single Sender across tasks behind an async-aware mutex so
    // that only one task flushes at a time.
    let sender = Arc::new(tokio::sync::Mutex::new(
        Sender::from_conf("http::addr=localhost:9000;")?,
    ));

    // Launch ten concurrent writer tasks.
    let tasks: Vec<_> = (0..10)
        .map(|worker| {
            let sender = Arc::clone(&sender);
            task::spawn(async move {
                // Each task stages its rows in its own private buffer.
                let mut rows = Buffer::new();
                for iteration in 0..100 {
                    rows.table("worker_metrics")
                        .unwrap()
                        .symbol("worker_id", &format!("worker-{}", worker))
                        .unwrap()
                        .column_i64("iteration", iteration)
                        .unwrap()
                        .at_now()
                        .unwrap();
                }
                // Take the lock only for the duration of the flush.
                let mut guard = sender.lock().await;
                guard.flush(&mut rows).await.unwrap();
            })
        })
        .collect();

    // Join every writer before exiting.
    for task in tasks {
        task.await?;
    }
    Ok(())
}
Performance tips — reuse Buffer instances across flushes to avoid repeated allocations:
- Reuse buffers: Clear and reuse instead of creating new ones
- Batch writes: Send multiple rows before flushing
- Use HTTP: Better throughput for most workloads
- Enable auto-flush: Optimal for continuous streams
Next steps
ILP Reference
Learn about the protocol
Crates.io
View on crates.io