Standard WebSocket compression uses framed compression, where every message is compressed independently of any other. This makes the compression more effective for larger messages, since the compressor has more ‘context’ to work with. To control our robots we’re sending about 10 messages per second: medium-sized, serde flexbuffer-encoded messages of about 100 KB each. These compress fairly well with per-frame compression.
However, I’ve figured out a more effective method: share a single encoder context across messages. Then, for every message, compress the data and flush the output. The other side mirrors this to decompress: create a decoder context, feed received messages into it, and whatever it yields for each frame is the decompressed message. In zstandard terminology, you start a frame but never finish it; every flush ends a block. Normally when doing streamed compression a block ends after the input or output crosses a certain threshold, but flushing per message makes it work for a framed transport like WebSockets.
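To make this concrete, here is a minimal sketch of the flush-per-message pattern, assuming the higher-level zstd crate’s write::Encoder and write::Decoder wrappers (the lower-level zstd_safe code we actually run is in the Example Code section below):

use std::io::Write;

// One long-lived encoder/decoder pair per connection. Each flush ends a
// zstd block, so every message is decodable on arrival while still sharing
// compression context with all earlier messages.
fn roundtrip_sketch() -> std::io::Result<()> {
    let mut enc = zstd::stream::write::Encoder::new(Vec::new(), 3)?;
    let mut dec = zstd::stream::write::Decoder::new(Vec::new())?;

    for msg in [&b"robot state update 1"[..], &b"robot state update 2"[..]] {
        enc.write_all(msg)?;
        enc.flush()?; // ends the block; the compressed bytes are now complete
        let frame = std::mem::take(enc.get_mut()); // one WebSocket binary message

        dec.write_all(&frame)?;
        dec.flush()?;
        assert_eq!(std::mem::take(dec.get_mut()), msg);
    }
    Ok(())
}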
I came up with this because we were optimizing the bandwidth for our construction robots, which we operate on-site over a Wi-Fi connection. The first attempt used zstd with dictionary compression, but distributing and maintaining the dictionary was tedious. So I realized: what if we could create the dictionary on the fly? That is essentially what happens here: the encoder learns and gets better at compressing the stream as more data is fed through it. In our case it decreased the bandwidth by another 80% compared to per-message zstandard compression.
This is why H.264 interframe video encoding beats MJPEG intraframe encoding: the codec has a lot more context to work with.
Other uses for streaming compression
This approach could also be adopted by the OpenTelemetry Collector: currently every batch export is compressed individually, so streaming compression would give a free bandwidth reduction there too. This doesn’t seem to be easy to do in gRPC (it only supports per-message compression, even in a streaming RPC), so it would require a custom protocol.
I’m such a big fan of streaming compression that I also made a Rust crate to compress HTTP responses, supporting streaming for gRPC-web and SSE. I had to create this crate because the response compression built into tower-http (the most widely used implementation) doesn’t work with streaming responses.
Example Code
Here’s basically what I do to compress/decompress WebSocket frames:
use std::io::Write;
use zstd_safe::{CCtx, DCtx, InBuffer, OutBuffer};
pub struct Decoder {
    dctx: DCtx<'static>,
}

impl Decoder {
    pub fn new() -> Self {
        Decoder {
            dctx: DCtx::create(),
        }
    }

    pub fn decode(&mut self, data: &[u8], dst: &mut impl Write) -> std::io::Result<()> {
        let mut buf = [0u8; 16_384];
        let mut in_buffer = InBuffer::around(data);
        loop {
            let mut out_buffer = OutBuffer::around(&mut buf[..]);
            // The long-lived decoder context keeps its window across calls,
            // mirroring the encoder on the other side of the connection.
            self.dctx
                .decompress_stream(&mut out_buffer, &mut in_buffer)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            let pos = out_buffer.pos();
            // Done once the whole frame is consumed and no output remains.
            if in_buffer.pos() >= data.len() && pos == 0 {
                break;
            }
            dst.write_all(&buf[..pos])?;
        }
        Ok(())
    }
}
pub struct Encoder {
    cctx: CCtx<'static>,
}

impl Encoder {
    pub fn new() -> Self {
        Encoder {
            cctx: CCtx::create(),
        }
    }

    pub fn encode(&mut self, data: &[u8], dst: &mut impl Write) -> std::io::Result<()> {
        let mut buf = [0u8; 16_384];
        let mut in_buffer = InBuffer::around(data);
        // Feed the whole message into the (never-ending) frame.
        while in_buffer.pos() < data.len() {
            let mut out_buffer = OutBuffer::around(&mut buf[..]);
            self.cctx
                .compress_stream2(
                    &mut out_buffer,
                    &mut in_buffer,
                    zstd_safe::zstd_sys::ZSTD_EndDirective::ZSTD_e_continue,
                )
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            dst.write_all(&buf[..out_buffer.pos()])?;
        }
        // Flush: ends the current block (but not the frame) so the receiver
        // can fully decode this message without waiting for more data.
        loop {
            let mut out_buffer = OutBuffer::around(&mut buf[..]);
            let remaining = self
                .cctx
                .compress_stream2(
                    &mut out_buffer,
                    &mut in_buffer,
                    zstd_safe::zstd_sys::ZSTD_EndDirective::ZSTD_e_flush,
                )
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            dst.write_all(&buf[..out_buffer.pos()])?;
            if remaining == 0 {
                break;
            }
        }
        Ok(())
    }
}
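And a hypothetical round trip with the types above, showing that a single Encoder/Decoder pair outlives many messages (the message bytes are made up for illustration):

fn example() -> std::io::Result<()> {
    let mut enc = Encoder::new();
    let mut dec = Decoder::new();

    for msg in [&b"state: arm at 1.00m"[..], &b"state: arm at 1.01m"[..]] {
        // Each encoded buffer goes out as one WebSocket binary message;
        // thanks to the shared contexts, later messages compress better.
        let mut compressed = Vec::new();
        enc.encode(msg, &mut compressed)?;

        let mut decoded = Vec::new();
        dec.decode(&compressed, &mut decoded)?;
        assert_eq!(decoded, msg);
    }
    Ok(())
}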
Dec 2025