Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/questdb/questdb/llms.txt

Use this file to discover all available pages before exploring further.

The QuestDB Rust client provides a safe, idiomatic API for ingesting data using the InfluxDB Line Protocol.

Installation

Add to your Cargo.toml:
[dependencies]
questdb-rs = "3.0"
tokio = { version = "1", features = ["full"] }

Quick start

use questdb::ingress::{Sender, Buffer, TimestampNanos};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut sender = Sender::from_conf("http::addr=localhost:9000;")?;
    
    let mut buffer = Buffer::new();
    buffer
        .table("trades")?
        .symbol("symbol", "ETH-USD")?
        .symbol("side", "sell")?
        .column_f64("price", 2615.54)?
        .column_f64("amount", 0.00044)?
        .at_now()?;
    
    sender.flush(&mut buffer).await?;
    Ok(())
}

Configuration

use questdb::ingress::Sender;

// HTTP with authentication
let sender = Sender::from_conf(
    "http::addr=localhost:9000;username=admin;password=quest;"
)?;

// TCP with authentication
let tcp_sender = Sender::from_conf(
    "tcp::addr=localhost:9009;username=admin;token=your-token;"
)?;

Configuration options

protocol
string
http or tcp
addr
string
required
Server address (host:port)
username
string
Authentication username
password
string
Authentication password (HTTP)
token
string
Authentication token (TCP)

Data types

use questdb::ingress::{Buffer, TimestampNanos};
use std::time::SystemTime;

let mut buffer = Buffer::new();
buffer
    .table("sensors")?
    .symbol("location", "warehouse-1")?        // Symbol
    .column_i64("sensor_id", 12345)?            // i64
    .column_f64("temperature", 23.5)?           // f64
    .column_str("status", "active")?            // String
    .column_bool("is_online", true)?            // bool
    .column_ts("measured_at", TimestampNanos::now())?  // Timestamp
    .at_now()?;

Batching

use questdb::ingress::{Sender, Buffer};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut sender = Sender::from_conf("http::addr=localhost:9000;")?;
    let mut buffer = Buffer::new();
    
    // Send 1000 rows
    for i in 0..1000 {
        buffer
            .table("metrics")?
            .symbol("host", &format!("server-{}", i % 10))?
            .column_f64("cpu", (i % 100) as f64)?
            .column_f64("memory", ((i * 2) % 100) as f64)?
            .at_now()?;
    }
    
    sender.flush(&mut buffer).await?;
    Ok(())
}

Auto-flush

use questdb::ingress::Sender;

let mut sender = Sender::from_conf(
    "http::addr=localhost:9000;auto_flush=on;auto_flush_rows=1000;"
)?;

let mut buffer = Buffer::new();

// Automatically flushes after 1000 rows
for i in 0..5000 {
    buffer
        .table("events")?
        .symbol("type", "page_view")?
        .column_str("url", &format!("/page-{}", i))?
        .at_now()?;
    
    // Auto-flush happens internally
    sender.flush(&mut buffer).await?;
}

// Flush remaining rows
sender.flush(&mut buffer).await?;

Error handling

use questdb::ingress::{Sender, Buffer, IngressError};

#[tokio::main]
async fn main() {
    match run().await {
        Ok(_) => println!("Success"),
        Err(e) => eprintln!("Error: {}", e),
    }
}

async fn run() -> Result<(), IngressError> {
    let mut sender = Sender::from_conf("http::addr=localhost:9000;")?;
    let mut buffer = Buffer::new();
    
    buffer
        .table("trades")?
        .symbol("symbol", "BTC-USD")?
        .column_f64("price", 50000.0)?
        .at_now()?;
    
    sender.flush(&mut buffer).await?;
    Ok(())
}

Async/await

use questdb::ingress::{Sender, Buffer};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut sender = Sender::from_conf("http::addr=localhost:9000;")?;
    let mut buffer = Buffer::new();
    
    loop {
        // Collect metrics
        buffer
            .table("system_metrics")?
            .symbol("host", "server-1")?
            .column_f64("cpu", get_cpu_usage())?
            .column_f64("memory", get_memory_usage())?
            .at_now()?;
        
        sender.flush(&mut buffer).await?;
        
        // Sleep for 1 second
        sleep(Duration::from_secs(1)).await;
    }
}

fn get_cpu_usage() -> f64 {
    // Implementation
    45.2
}

fn get_memory_usage() -> f64 {
    // Implementation
    62.8
}

Thread safety

use questdb::ingress::{Sender, Buffer};
use std::sync::Arc;
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let sender = Arc::new(tokio::sync::Mutex::new(
        Sender::from_conf("http::addr=localhost:9000;")?)
    );
    
    let mut handles = vec![];
    
    // Spawn 10 tasks
    for i in 0..10 {
        let sender = Arc::clone(&sender);
        let handle = task::spawn(async move {
            let mut buffer = Buffer::new();
            
            for j in 0..100 {
                buffer
                    .table("worker_metrics")
                    .unwrap()
                    .symbol("worker_id", &format!("worker-{}", i))
                    .unwrap()
                    .column_i64("iteration", j)
                    .unwrap()
                    .at_now()
                    .unwrap();
            }
            
            let mut sender = sender.lock().await;
            sender.flush(&mut buffer).await.unwrap();
        });
        
        handles.push(handle);
    }
    
    // Wait for all tasks to complete
    for handle in handles {
        handle.await?;
    }
    
    Ok(())
}

Performance tips

Reuse Buffer instances to avoid allocations.
  1. Reuse buffers: Clear and reuse instead of creating new ones
  2. Batch writes: Send multiple rows before flushing
  3. Use HTTP: Better throughput for most workloads
  4. Enable auto-flush: Optimal for continuous streams

Next steps

ILP Reference

Learn about the protocol

Crates.io

View on crates.io