Fluxion
A reactive stream processing library for Rust with temporal ordering guarantees, efficient async execution and friendly fluent API.
π See why Fluxion sets new standards for quality β
Table of Contents
- Features
- Independent Code Reviews
- Quick Start
- Operator Documentation
- Core Concepts
- Documentation
- Development
Features
- π Rx-Style Operators: Familiar reactive programming patterns (
combine_latest,with_latest_from,ordered_merge, etc.) - β±οΈ Temporal Ordering: Guaranteed ordering semantics with
Sequenced<T>wrapper - β‘ Async Execution: Efβ¦
Fluxion
A reactive stream processing library for Rust with temporal ordering guarantees, efficient async execution and friendly fluent API.
π See why Fluxion sets new standards for quality β
Table of Contents
- Features
- Independent Code Reviews
- Quick Start
- Operator Documentation
- Core Concepts
- Documentation
- Development
Features
- π Rx-Style Operators: Familiar reactive programming patterns (
combine_latest,with_latest_from,ordered_merge, etc.) - β±οΈ Temporal Ordering: Guaranteed ordering semantics with
Sequenced<T>wrapper - β‘ Async Execution: Efficient async processing with
subscribe_asyncandsubscribe_latest_async - π‘οΈ Type-Safe Error Handling: Comprehensive error propagation through
StreamItem<T>- see the Error Handling Guide - π Excellent Documentation: Detailed guides, examples, and API docs
- β Well Tested: 1,500+ tests with comprehensive coverage
π Independent Code Reviews
π Other Assesssments
Quick Start
Add Fluxion to your Cargo.toml:
[dependencies]
fluxion-rx = "0.2.1"
fluxion-test-utils = "0.2.1"
tokio = { version = "1.48.0", features = ["full"] }
anyhow = "1.0.100"
futures = "0.3.31"
Basic Usage
use fluxion_rx::FluxionStream;
use fluxion_test_utils::sequenced::Sequenced;
use futures::StreamExt;
#[tokio::test]
async fn test_take_latest_when_int_bool() -> anyhow::Result<()> {
// Define enum to hold int and bool types
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Value {
Int(i32),
Bool(bool),
}
// Create int stream and bool trigger stream
let (tx_int, rx_int) = tokio::sync::mpsc::unbounded_channel::<Sequenced<Value>>();
let (tx_trigger, rx_trigger) = tokio::sync::mpsc::unbounded_channel::<Sequenced<Value>>();
let int_stream = FluxionStream::from_unbounded_receiver(rx_int);
let trigger_stream = FluxionStream::from_unbounded_receiver(rx_trigger);
let mut pipeline = int_stream.take_latest_when(trigger_stream, |_| true);
// Send int values first - they will be buffered
tx_int.send(Sequenced::with_sequence(Value::Int(10), 1))?;
tx_int.send(Sequenced::with_sequence(Value::Int(20), 2))?;
tx_int.send(Sequenced::with_sequence(Value::Int(30), 3))?;
// Trigger with bool - should emit latest int value (30) with trigger's sequence
tx_trigger.send(Sequenced::with_sequence(Value::Bool(true), 4))?;
let result1 = pipeline.next().await.unwrap().unwrap();
assert!(matches!(result1.get(), Value::Int(30)));
assert_eq!(result1.sequence(), 4);
// After first trigger, send more int values
tx_int.send(Sequenced::with_sequence(Value::Int(40), 5))?;
// Need another trigger to emit the buffered value
tx_trigger.send(Sequenced::with_sequence(Value::Bool(true), 6))?;
let result2 = pipeline.next().await.unwrap().unwrap();
assert!(matches!(result2.get(), Value::Int(40)));
assert_eq!(result2.sequence(), 6);
// Send another int and trigger
tx_int.send(Sequenced::with_sequence(Value::Int(50), 7))?;
tx_trigger.send(Sequenced::with_sequence(Value::Bool(true), 8))?;
let result3 = pipeline.next().await.unwrap().unwrap();
assert!(matches!(result3.get(), Value::Int(50)));
assert_eq!(result3.sequence(), 8);
Ok(())
}
Chaining Multiple Operators
Fluxion operators can be chained to create complex processing pipelines. Here a complete example:
Example: combine_latest -> filter_ordered - Sampling on Trigger Events
use fluxion_rx::{FluxionStream, Ordered};
use fluxion_test_utils::sequenced::Sequenced;
use futures::StreamExt;
#[tokio::test]
async fn test_combine_latest_int_string_filter_order() -> anyhow::Result<()> {
// Define enum to hold both int and string types
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Value {
Int(i32),
Str(String),
}
// Create two input streams
let (tx_int, rx_int) = tokio::sync::mpsc::unbounded_channel::<Sequenced<Value>>();
let (tx_str, rx_str) = tokio::sync::mpsc::unbounded_channel::<Sequenced<Value>>();
let int_stream = FluxionStream::from_unbounded_receiver(rx_int);
let str_stream = FluxionStream::from_unbounded_receiver(rx_str);
// Chain: combine_latest -> filter
let mut pipeline = int_stream
.combine_latest(vec![str_stream], |_| true)
.filter_ordered(|combined| {
// Keep only if first value (int) is > 50
matches!(combined.values()[0], Value::Int(x) if x > 50)
});
// Send initial values
tx_str.send(Sequenced::with_sequence(Value::Str("initial".into()), 1))?;
tx_int.send(Sequenced::with_sequence(Value::Int(30), 2))?;
tx_int.send(Sequenced::with_sequence(Value::Int(60), 3))?; // Passes filter (60 > 50)
tx_str.send(Sequenced::with_sequence(Value::Str("updated".into()), 4))?;
tx_int.send(Sequenced::with_sequence(Value::Int(75), 5))?; // Passes filter (75 > 50)
// Results: seq 3 (Int 60), seq 4 (Int 60 + Str updated), seq 5 (Int 75)
let result1 = pipeline.next().await.unwrap();
let combined1 = result1.get().values();
assert!(matches!(combined1[0], Value::Int(60)));
assert!(matches!(combined1[1], Value::Str(ref s) if s == "initial"));
let result2 = pipeline.next().await.unwrap();
let combined2 = result2.get().values();
assert!(matches!(combined2[0], Value::Int(60)));
assert!(matches!(combined2[1], Value::Str(ref s) if s == "updated"));
let result3 = pipeline.next().await.unwrap();
let combined3 = result3.get().values();
assert!(matches!(combined3[0], Value::Int(75)));
assert!(matches!(combined3[1], Value::Str(ref s) if s == "updated"));
Ok(())
}
π Operator Documentation
- All Operators - Complete operator reference
- Operators Roadmap - Planned future operators
Core Concepts
Stream Operators
Combining Streams:
combine_latest- Emit when any stream emits, with latest from allwith_latest_from- Sample secondary streams on primary emissionordered_merge- Merge multiple streams preserving temporal order
Filtering & Gating:
emit_when- Gate emissions based on a filter conditiontake_latest_when- Sample stream on trigger eventstake_while_with- Emit while condition holds
Transformation:
combine_with_previous- Pair consecutive valuesmap_ordered- Transform while preserving orderfilter_ordered- Filter while preserving order
Async Execution
Sequential Processing:
use fluxion_exec::SubscribeAsyncExt;
stream
.subscribe_async(
|item, _token| async move {
process(item).await?;
Ok::<(), MyError>(())
},
None,
Some(|err| eprintln!("Error: {}", err))
)
.await?;
Latest-Value Processing (with auto-cancellation):
use fluxion_exec::SubscribeLatestAsyncExt;
stream
.subscribe_latest_async(
|item, token| async move {
expensive_operation(item, token).await?;
Ok::<(), MyError>(())
},
Some(|err| eprintln!("Error: {}", err)),
None
)
.await?;
Documentation
π Guides
- Integration Guide - Learn the three patterns for integrating events (intrinsic, extrinsic, wrapper ordering)
- Error Handling Guide - Comprehensive guide to error propagation and recovery strategies
π¦ Crate Documentation
- fluxion-rx - Main convenience crate (re-exports all operators)
- fluxion-stream - Stream operators and composition patterns
- fluxion-exec - Async execution and subscription utilities
- fluxion-core - Core traits, types, and utilities
- fluxion-merge - Stream merging operators
- fluxion-ordered-merge - Generic ordered merging
- fluxion-test-utils - Testing helpers and fixtures
π‘ Complete Example
The stream-aggregation example demonstrates production-ready patterns:
- Real-world architecture: 3 producers, 1 aggregator, 1 consumer
- Ordered stream combining: Merges sensor readings, metrics, and system events
- Type-safe transformations: Uses
UnboundedReceiverExtfor elegant type erasure - Graceful shutdown: Proper cleanup with
CancellationToken - Error handling: Demonstrates best practices throughout
Why this example matters:
- Shows how all the pieces fit together in a realistic application
- Demonstrates the
into_fluxion_stream()pattern for combining heterogeneous streams - Illustrates proper resource management and cancellation
- Serves as a template for building your own event processing systems
Run it with: cargo run --package rabbitmq-aggregator-example
π§ API Documentation
Generate and browse full API documentation:
cargo doc --no-deps --open
Or for specific crates:
cargo doc --package fluxion-stream --open
cargo doc --package fluxion-exec --open
Development
Prerequisites
- Rust toolchain (version pinned in
rust-toolchain.toml) - Cargo
Building
# Run CI checks locally (PowerShell)
.\.ci\ci.ps1
Workspace Structure
This repository is organized as a Cargo workspace with the following crates:
- fluxion-rx - Main convenience crate (re-exports from other crates)
- fluxion-stream - Stream operators and combinators
- fluxion-exec - Execution utilities and subscriptions
- fluxion-core - Core traits, types, and utilities
- fluxion-merge - Stream merging operators
- fluxion-ordered-merge - Generic ordered merging implementation
- fluxion-test-utils - Test helpers and fixtures
See individual crate READMEs for detailed documentation.
Development Notes
- All clippy, formatting, and documentation warnings are treated as errors in CI
- Use
.ci/coverage.ps1to collect code coverage locally (requirescargo-llvm-cov) - See ROADMAP.md for planned features and release schedule
Project Status
Current Version: 0.2.1
- β Published to crates.io
- β Core functionality complete
- β Comprehensive test coverage
- β Error propagation and handling implemented
- π Documentation complete for current features
See ROADMAP.md for details on the path to 1.0.0.
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
Before submitting a PR:
- Run tests:
cargo test --workspace - Run clippy:
cargo clippy --workspace -- -D warnings - Format code:
cargo fmt --all - Update documentation if needed
License
Licensed under the Apache License, Version 2.0. See LICENSE for details.
Acknowledgments
Inspired by ReactiveX and other reactive programming libraries, with a focus on Rustβs safety and performance characteristics.
Security
All commits and releases are GPG signed.
Key ID: 5729DA194B0929542BF79074C2A11DED229A1E51 Fingerprint: 5729 DA19 4B09 2954 2BF7 9074 C2A1 1DED 229A 1E51
Author
Name: Umberto Gotti Email: umberto.gotti@umbertogotti.dev Twitter: https://x.com/GottiUmberto LinkedIn: www.linkedin.com/in/umberto-gotti-85346b48