kdb_codec
A Rust library focused on handling the kdb+ IPC (Inter-Process Communication) wire protocol. This library provides efficient encoding, decoding, and communication with q/kdb+ processes using idiomatic Rust patterns.
Inspired by the original kdbplus crate, this library addresses critical cancellation safety issues while maintaining full compatibility with the kdb+ IPC protocol.
Why This Library?
The original kdbplus crate had a fundamental cancellation safety issue in its receive_message() implementation. When used with tokio::select! or other cancellation-aware patterns, partial reads could cause message corruption:
// ⚠️ UNSAFE - could lose data on cancellation in original kdbplus
select! {
msg = socket.receive_message() => { /* ... */ }
_ = timeout => { /* partial read gets lost */ }
}Our Solution: This library uses tokio-util::codec::Framed with a custom KdbCodec, ensuring true cancellation safety:
// ✅ SAFE - Framed maintains buffer state across cancellations
let mut framed = Framed::new(stream, KdbCodec::new(true));
select! {
msg = framed.next() => { /* buffer state preserved */ }
_ = timeout => { /* can safely retry */ }
}The Framed pattern maintains internal buffer state, so cancelled reads never lose data. All partial reads are preserved in the codec's buffer and properly reassembled on the next attempt.
Features
- Cancellation Safe: Built on
tokio-util::codec::Framedfor true cancellation safety - Tokio Codec Pattern: Modern async/await interface with proper buffer management
- QStream Client: High-level async client for q/kdb+ communication
- Intuitive Data Access: Index trait for ergonomic K object access with
[]syntax - Full Compression Support: Compatible with kdb+
-18!(compress) and-19!(decompress) - Multiple Connection Methods: TCP, TLS, and Unix Domain Socket support
- Type-Safe: Strong typing for all kdb+ data types
- Minimal Dependencies: No
async-recursionor unnecessary proc-macros - Zero-Copy Operations: Efficient message handling with minimal allocations
Quick Example
use kdb_codec::*;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::{SinkExt, StreamExt};
#[tokio::main]
async fn main() -> Result<()> {
let stream = TcpStream::connect("127.0.0.1:5000").await?;
let mut framed = Framed::new(stream, KdbCodec::new(true));
// Send query - cancellation safe!
let query = K::new_string("1+1".to_string(), 0);
let msg = KdbMessage::new(qmsg_type::synchronous, query);
framed.send(msg).await?;
// Receive response - even if cancelled, buffer state is preserved
if let Some(Ok(response)) = framed.next().await {
println!("Result: {}", response.payload);
}
Ok(())
}Next Steps
- Installation - Add kdb_codec to your project
- Codec Pattern - Learn about the tokio codec architecture
- K Macro - Simplified data construction
- Index Trait - Ergonomic data access with [] syntax
- Type Mapping & Coverage - Supported IPC types and missing types