Welcome to part 3 of our series on parsing ROS bags in the MCAP format. In Part 1, we went over the basics of ROS and MCAP, and wrote some simple parsers. In Part 2, we filled out our parser enough to see the complete record structure of our bag. In this part, we’re going to filter messages so that we only get those messages that are associated with particular topics. This will leave us with the actual data inside those messages, which we’ll parse in the next part!
To learn more about parsing, you should sign up for our course Solve.hs. The 4th module will teach you important Haskell parsing techniques, ranging from regular expressions to the Megaparsec library we’re using in this series.
Message Structure
Since we’ll start looking in depth at Message records in this part, let’s look at their structure:
Bytes | Name | Type
2 | channel_id | uint16
4 | sequence | uint32
8 | log_time | Timestamp
8 | publish_time | Timestamp
N | data | Bytes
The data field is, of course, the raw payload for this message. The two timestamps tell us when the message was published and when it was logged to the bag, since filtering messages by time is a common operation. We will ignore the sequence number in this example.
So far, we don’t actually have that much information about this message. We don’t know what topic it was published on, or how to decode its data. This information is shared across all messages for a particular topic, so it would be redundant to store it on each Message record. But for this article, we would like to filter messages based on topic, so how do we do this?
The channel_id field links us to all the missing information. So let’s now take a look at the structure of Channels and Schemas.
Schemas and Channels
Channels and Schemas provide the additional context that helps us parse the raw data in messages. As you’ll recall, Schema and Channel are record types in their own right, alongside Message.
We’ll start with channels, since our message type links us directly to a channel with the channel_id field. A channel describes a single stream of messages on a particular topic. So every message on a channel comes from the same topic. As an important note, a Channel record will appear in the MCAP bag before any Message records on that channel. Here are the fields on a channel:
Bytes | Name | Type
2 | id | uint16
2 | schema_id | uint16
4 + N | topic | String
4 + N | message_encoding | String
4 + N | metadata | Map<string, string>
So each channel has a unique ID, a topic, a message encoding, and some metadata. The message encoding field is valuable for decoding our message data: even if we know which fields to expect in that data, there are different ways to encode them. In our case, we’ll be using “CDR”, the “Common Data Representation” format. And since the channel carries the topic, we can filter messages by topic using only the Message records and the Channel record each of them links to.
Now depending on how much you’re willing to assume about your program, this might be enough for you to decode the data. You would just need to map the topic name to some kind of data decoding function. But if you want to write a more general program, this isn’t enough. Thus a Channel also has a schema_id field to link it to a Schema.
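As a rough sketch (my own illustration, not code from this series), that hardcoded approach might look like the following, where the “decoder” is left as a trivial placeholder:
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS8
import qualified Data.HashMap.Strict as HM

-- Hypothetical shortcut: map each topic name directly to a decoding
-- function. Here the decoder just renders the raw bytes as a String;
-- a real one would decode the CDR payload into a proper message type.
type Decoder = ByteString -> String

topicDecoders :: HM.HashMap ByteString Decoder
topicDecoders = HM.fromList
  [ (BS8.pack "/simple_topic", BS8.unpack) ]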
A Schema can give a “definition” for a message. It has 4 fields, but in our example only 2 of them get populated. The fields are:
Bytes | Name | Type
2 | id | uint16
4 + N | name | String
4 + N | encoding | String
4 + N | data | uint32 length-prefixed Bytes
Just as a channel record will appear before any messages that refer to it, a Schema record will appear in the bag before any Channel record that references it.
The ID of the schema helps channel elements link to this schema element, of course. The “name” in our case will give us the full package name of the type we’re parsing: my_package/msg/Simple. In theory, this would help us locate the original message definition.
The other two schema fields will be blank in our example. ROS2 doesn’t rely on actually storing the schema data in the bag; it relies on you knowing the message definition from the package name and applying that to decode the data. So in that sense, our program will be less general. But we’ll still go through the motions of tracking our schema and channel elements.
Updating our Monad
Since the Message record itself doesn’t contain the topic, we need to keep track of schema and channel elements that we’ve already parsed. The simplest way to do this is to use the State monad. We’re already operating within a ParsecT IO monad transformer stack. So what we need to do is define a stateful type for this information, and add StateT to this stack.
Our stateful type should track 5 things:
- The set of topics we’re interested in
- The set of channels we’re interested in
- A map from channel IDs to channel data
- A map from schema IDs to schema data
- A map of the message data we’ve received for our topics
We’ll start by making types that reflect the information we parse from these records, one type for schema, one for channel, and one for message. These are self-explanatory, following the field definitions from the specification.
data MsgSchema = MsgSchema
{ schemaId :: Word16
, schemaName :: ByteString
, schemaEncoding :: ByteString
, schemaData :: ByteString
} deriving (Show, Eq)
data MsgChannel = MsgChannel
{ channelId :: Word16
, channelSchemaId :: Word16
, channelTopic :: ByteString
, channelEncoding :: ByteString
, channelMetadata :: HM.HashMap ByteString ByteString
} deriving (Show, Eq)
data MessageData = MessageData
{ messageLogTime :: Word64
, messagePublishTime :: Word64
, messageData :: ByteString
} deriving (Show, Eq)
Now we can define a type TopicState that wraps the five elements we listed:
data TopicState = TopicState
{ tsChannels :: HM.HashMap Word16 MsgChannel
, tsSchemas :: HM.HashMap Word16 MsgSchema
, tsMessages :: HM.HashMap (ByteString, ByteString) [MessageData]
, tsDesiredTopics :: HS.HashSet ByteString
, tsDesiredChannels :: HS.HashSet Word16
} deriving (Show, Eq)
The first item (desired topics) will be static: we’ll define it at the start. But the other fields will all get modified as we parse the relevant records. In the case of the message data, we’ll use the topic name and schema name as the key in our map.
Now we’ll update our monad to include this as a stateful object:
type Parser a = ParsecT Void ByteString (StateT TopicState IO) a
We also need to update our monadic entry point. Running runParserT now gives us a StateT action, so we wrap it in runStateT to get an IO action producing both the parse result and the final state.
printRecordTypesFromFile :: FilePath -> IO ()
printRecordTypesFromFile fp = do
  input <- BS.readFile fp
  let initialTopicState = TopicState HM.empty HM.empty HM.empty (HS.singleton "/simple_topic") HS.empty
  (result, st) <- runStateT (runParserT parseMcapFile' fp input) initialTopicState
  case result of
    Left e -> print e
    Right recs -> do
      forM_ recs $ \(_, rec) -> printRec rec
      print st
Otherwise, this shouldn’t impact most of our Parser functions. If you put in any print statements using lift, these would just need an additional lift now since IO is one level further down our monad stack (or you could use liftIO).
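For example, a throwaway debug helper (my own illustration, not from the series code) would now look like this:
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)

-- IO now sits two layers down (ParsecT over StateT over IO), so a raw
-- putStrLn needs two lifts...
debugLog :: String -> Parser ()
debugLog msg = lift (lift (putStrLn msg))

-- ...or just liftIO, which reaches the base IO monad in one call.
debugLog' :: String -> Parser ()
debugLog' = liftIO . putStrLn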
Parsing a Schema
Now that we’ve defined the new types for this part, let’s start using them. We already have a function parseChunk that parses relevant information from a Chunk record. Let’s write similar functions for the three record types above, starting with schemas. We’ll not only parse the data, but also save it in our state under the tsSchemas map.
Let’s start by parsing all the core fields of the Schema element and turning these into a MsgSchema object. This uses our existing primitives and patterns:
parseSchema :: Parser (Word64, Record)
parseSchema = do
(n1, sid) <- parseUint16LE
(n2, sname) <- parseString
(n3, sencoding) <- parseString
(n4, bytesLen) <- parseUint32LE
schema <- BS.pack <$> count (fromIntegral bytesLen) anySingle
let totalLen = n1 + n2 + n3 + n4 + fromIntegral bytesLen
let schema' = MsgSchema sid sname sencoding schema
...
Now we need a StateT action that will “save” this schema element. We pull out the topic state, insert our new schema element into the tsSchemas field by its ID, and then write the modified state back.
parseSchema :: Parser (Word64, Record)
parseSchema = do
(n1, sid) <- parseUint16LE
(n2, sname) <- parseString
(n3, sencoding) <- parseString
(n4, bytesLen) <- parseUint32LE
schema <- BS.pack <$> count (fromIntegral bytesLen) anySingle
let totalLen = n1 + n2 + n3 + n4 + fromIntegral bytesLen
let schema' = MsgSchema sid sname sencoding schema
...
where
addSchema :: MsgSchema -> StateT TopicState IO ()
addSchema s = do
tops <- get
let newS = HM.insert (schemaId s) s (tsSchemas tops)
put $ tops { tsSchemas = newS }
To complete the function, we just lift our stateful action into the Parser monad and return the same kind of length-and-record pair we have been returning so far.
parseSchema :: Parser (Word64, Record)
parseSchema = do
(n1, sid) <- parseUint16LE
(n2, sname) <- parseString
(n3, sencoding) <- parseString
(n4, bytesLen) <- parseUint32LE
schema <- BS.pack <$> count (fromIntegral bytesLen) anySingle
let totalLen = n1 + n2 + n3 + n4 + fromIntegral bytesLen
let schema' = MsgSchema sid sname sencoding schema
lift (addSchema schema')
return $ (totalLen, Record Schema)
where
addSchema :: MsgSchema -> StateT TopicState IO ()
addSchema s = do
tops <- get
let newS = HM.insert (schemaId s) s (tsSchemas tops)
put $ tops { tsSchemas = newS }
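As a side note, the get/put pair could be collapsed into a single strict update with modify' (just a stylistic alternative; we’ll stick with get and put going forward):
import Control.Monad.State (StateT, modify')

-- Same effect as addSchema above, as one strict state update.
addSchema' :: MsgSchema -> StateT TopicState IO ()
addSchema' s = modify' $ \ts ->
  ts { tsSchemas = HM.insert (schemaId s) s (tsSchemas ts) }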
Parsing a Channel
Parsing a channel is mostly similar. We parse the fields using our primitives, and we start to define a function to add this channel into our topic state:
parseChannel :: Parser (Word64, Record)
parseChannel = do
(n1, cid) <- parseUint16LE
(n2, sid) <- parseUint16LE
(n3, topic) <- parseString
(n4, encoding) <- parseString
(n5, mp) <- parseStrMap
let totalLen = n1 + n2 + n3 + n4 + n5
let channel = MsgChannel cid sid topic encoding mp
lift (addChannel channel)
return $ (totalLen, Record Channel)
where
addChannel :: MsgChannel -> StateT TopicState IO ()
addChannel = ...
There are two structural differences between addChannel and addSchema. First, we only care about adding the channel if the topic is one of our desired topics. Second, we also want to update the desired channels set in addition to saving the channel by its ID. Otherwise things are the same:
parseChannel :: Parser (Word64, Record)
parseChannel = do
(n1, cid) <- parseUint16LE
(n2, sid) <- parseUint16LE
(n3, topic) <- parseString
(n4, encoding) <- parseString
(n5, mp) <- parseStrMap
let totalLen = n1 + n2 + n3 + n4 + n5
let channel = MsgChannel cid sid topic encoding mp
lift (addChannel channel)
return $ (totalLen, Record Channel)
where
addChannel c = do
let topic = channelTopic c
tops <- get
when (HS.member topic (tsDesiredTopics tops)) $ do
let newC = HM.insert (channelId c) c (tsChannels tops)
let newDC = HS.insert (channelId c) (tsDesiredChannels tops)
put $ tops { tsChannels = newC, tsDesiredChannels = newDC }
Parsing a Message
Finally, let’s parse a Message record. Unlike the data field on a Schema record, the data field for a message does not prepend its own size. So we need to pass the recordContentLength as an additional argument to the function. This lets us parse all the message data:
parseMessage :: Word64 -> Parser (Word64, Record)
parseMessage len = do
(a, channelId) <- parseUint16LE
(b, sequence) <- parseUint32LE
(c, logTime) <- parseTimestamp
(d, publishTime) <- parseTimestamp
let overheadLen = a + b + c + d
guard' ("Message overhead exceeds content length: " <> show overheadLen <> " " <> show len) (overheadLen <= len)
dataBytes <- BS.pack <$> count (fromIntegral (len - overheadLen)) anySingle
...
Now we have two jobs. First, we want to see if this message comes from a desired topic, and if we can find a schema for that topic. Then we want to save the message if so. For the first step, we’ll write a function that will take the TopicState and return the schema and topic name if they are desired.
parseMessage :: Word64 -> Parser (Word64, Record)
parseMessage len = do
(a, channelId) <- parseUint16LE
(b, sequence) <- parseUint32LE
(c, logTime) <- parseTimestamp
(d, publishTime) <- parseTimestamp
let overheadLen = a + b + c + d
guard' ("Message overhead exceeds content length: " <> show overheadLen <> " " <> show len) (overheadLen <= len)
dataBytes <- BS.pack <$> count (fromIntegral (len - overheadLen)) anySingle
tops <- lift get
let schemaToDecode = findSchema tops channelId
case schemaToDecode of
Nothing -> return (len, Record Message)
...
where
findSchema :: TopicState -> Word16 -> Maybe (MsgSchema, ByteString)
findSchema (TopicState chans schems _ _ desChans) channelId = do
guard (HS.member channelId desChans)
(MsgChannel _ sid topicName _ _) <- HM.lookup channelId chans
schem <- HM.lookup sid schems
return (schem, topicName)
We get the TopicState and pass it to a function in the Maybe monad that checks several conditions: whether the channel ID is one we desire, whether we can find the channel for that ID, and whether we can find the schema for that channel. If any of these checks fails, we simply skip processing the message.
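If that short-circuiting is unfamiliar, here’s a standalone toy example (not part of our parser) showing the pattern:
import Control.Monad (guard)

-- Any failing guard or lookup makes the whole do-block evaluate to
-- Nothing, with no nested case expressions needed.
lookupChain :: Maybe Int
lookupChain = do
  guard True      -- passes, so we continue
  x <- Just 5     -- binds x = 5
  guard (x > 10)  -- fails, so lookupChain == Nothing
  return x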
Now we need an addMessage function similar to addSchema and addChannel above. We associate the message data (timestamps and bytes) with the topic name and the schema name in that map.
parseMessage :: Word64 -> Parser (Word64, Record)
parseMessage len = do
(a, channelId) <- parseUint16LE
(b, sequence) <- parseUint32LE
(c, logTime) <- parseTimestamp
(d, publishTime) <- parseTimestamp
let overheadLen = a + b + c + d
guard' ("Message overhead exceeds content length: " <> show overheadLen <> " " <> show len) (overheadLen <= len)
dataBytes <- BS.pack <$> count (fromIntegral (len - overheadLen)) anySingle
tops <- lift get
let schemaToDecode = findSchema tops channelId
case schemaToDecode of
Nothing -> return (len, Record Message)
Just ((MsgSchema _ sname _ _), topicName) -> do
lift $ addMessage topicName sname logTime publishTime dataBytes
return (len, Record Message)
where
findSchema :: TopicState -> Word16 -> Maybe (MsgSchema, ByteString)
findSchema (TopicState chans schems _ _ desChans) channelId = do
guard (HS.member channelId desChans)
(MsgChannel _ sid topicName _ _) <- HM.lookup channelId chans
schem <- HM.lookup sid schems
return (schem, topicName)
addMessage topicName schemaName logTime publishTime dataBytes = do
tops <- get
let prevMsgs = fromMaybe [] $ HM.lookup (topicName, schemaName) (tsMessages tops)
let newTsm = HM.insert (topicName, schemaName) (MessageData logTime publishTime dataBytes : prevMsgs) (tsMessages tops)
put $ tops { tsMessages = newTsm }
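As an aside, the lookup-and-reinsert in addMessage could be written more compactly with HM.insertWith (a stylistic alternative, not what we use here):
-- insertWith prepends the new message when the key already exists,
-- and inserts a singleton list otherwise.
addMessage' topicName schemaName logTime publishTime dataBytes =
  modify' $ \ts -> ts
    { tsMessages = HM.insertWith (++) (topicName, schemaName)
        [MessageData logTime publishTime dataBytes]
        (tsMessages ts)
    }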
Bringing it Together
Now we just need to update parseSingleRecord so that it handles these 3 cases in addition to the Chunk case!
parseSingleRecord :: Parser (Word64, Record)
parseSingleRecord = do
(typLen, typ) <- parseRecordType
(rclLen, recordContentLength) <- parseUint64LE
let totalLen = typLen + rclLen + recordContentLength
record <- case typ of
Chunk -> do
(parsedChunkLength, rec) <- parseChunk
guard' ("Parsed chunk length does not match: " <> show parsedChunkLength <> " " <> show recordContentLength) (parsedChunkLength == recordContentLength)
return rec
Channel -> do
(parsedChannelLength, rec) <- parseChannel
guard' ("Parsed channel length does not match: " <> show parsedChannelLength <> " " <> show recordContentLength) (parsedChannelLength == recordContentLength)
return rec
Schema -> do
(parsedSchemaLength, rec) <- parseSchema
guard' ("Parsed schema length does not match: " <> show parsedSchemaLength <> " " <> show recordContentLength) (parsedSchemaLength == recordContentLength)
return rec
Message -> do
(parsedMessageLength, rec) <- parseMessage recordContentLength
guard' ("Parsed message length does not match: " <> show parsedMessageLength <> " " <> show recordContentLength) (parsedMessageLength == recordContentLength)
return rec
_ -> do
count (fromIntegral recordContentLength) anySingle
return $ Record typ
return (totalLen, record)
Our program will now parse the full bag and print out the record structure as well as the final TopicState at the end! Here’s what we see for our “message data”:
Message 1:
MessageData {messageLogTime = 1757733836313810115, messagePublishTime = 1757733836313810115, messageData = "\NUL\SOH\NUL\NUL\ACK\NUL\NUL\NUL\NUL\NUL\NUL\NUL333333\EM@\b\NUL\NUL\NULGoodbye\NUL"}
Message 2:
MessageData {messageLogTime = 1757733836289706147, messagePublishTime = 1757733836289706147, messageData = "\NUL\SOH\NUL\NUL\ENQ\NUL\NUL\NUL\NUL\NUL\NUL\NUL\205\204\204\204\204\204\DLE@\ACK\NUL\NUL\NULHello\NUL"}
Conclusion
The data we have still isn’t structured though! We need to figure out how to parse these bytes out of the CDR encoding. We’ll do that next time!
In the meantime, you can learn more about parsing (and other problem solving tactics) by signing up for Solve.hs, our problem solving course! The first 3 modules will teach you about Data Structures and Algorithms in Haskell, and then the final module will teach you advanced parsing techniques like Megaparsec!