Reading and writing on inbound/outbound streams in libp2p-kad is implemented via hand-written procedural (sequential) state machines. This logic can be simplified by using Rust's async-await.
Manual state machines in libp2p-kad:

    /// State of an active outbound substream.
    enum OutboundSubstreamState<TUserData> {
        /// We haven't started opening the outgoing substream yet.
        /// Contains the request we want to send, and the user data if we expect an answer.
        PendingOpen(SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>),
        /// Waiting to send a message to the remote.
        PendingSend(
            KadOutStreamSink<NegotiatedSubstream>,
            KadRequestMsg,
            Option<TUserData>,
        ),
        /// Waiting to flush the substream so that the data arrives to the remote.
        PendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
        /// Waiting for an answer back from the remote.
        // TODO: add timeout
        WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
        /// An error happened on the substream and we should report the error to the user.
        ReportError(KademliaHandlerQueryErr, TUserData),
        /// The substream is being closed.
        Closing(KadOutStreamSink<NegotiatedSubstream>),
        /// The substream is complete and will not perform any more work.
        Done,
        Poisoned,
    }

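To illustrate why an enum like this is tedious to drive by hand, here is a minimal sketch of one way such a state machine might be advanced step by step. It is not the actual libp2p-kad code: `State`, `MockSubstream` and `step` are hypothetical, heavily simplified stand-ins, and the use of `Poisoned` as a temporary placeholder while the current state is moved out is an assumption about how that variant is meant to be used.

```rust
use std::collections::VecDeque;
use std::mem;

/// Hypothetical, simplified subset of the outbound states.
#[derive(Debug, PartialEq)]
enum State {
    PendingSend(String),
    PendingFlush,
    WaitingAnswer,
    Done,
    /// Placeholder left behind while the real state is moved out.
    Poisoned,
}

/// Hypothetical in-memory stand-in for a negotiated substream.
#[derive(Default)]
struct MockSubstream {
    write_buffer: Vec<String>,
    sent: Vec<String>,
    incoming: VecDeque<String>,
}

/// Advances the state machine by exactly one step.
/// Returns `Some(answer)` once an answer has been read from the stream.
fn step(state: &mut State, stream: &mut MockSubstream) -> Option<String> {
    // Take ownership of the current state; every arm must store the next one.
    match mem::replace(state, State::Poisoned) {
        State::PendingSend(msg) => {
            stream.write_buffer.push(msg);
            *state = State::PendingFlush;
            None
        }
        State::PendingFlush => {
            stream.sent.append(&mut stream.write_buffer);
            *state = State::WaitingAnswer;
            None
        }
        State::WaitingAnswer => {
            let answer = stream.incoming.pop_front();
            *state = State::Done;
            answer
        }
        State::Done => {
            *state = State::Done;
            None
        }
        State::Poisoned => panic!("state machine used after a failed transition"),
    }
}
```

Every transition has to be spelled out and the next state stored explicitly, which is the bookkeeping that async-await would generate automatically.
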
Example of using async-await in libp2p-relay:

    pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> {
        let msg = StopMessage {
            r#type: stop_message::Type::Status.into(),
            peer: None,
            limit: None,
            status: Some(Status::Ok.into()),
        };

        self.send(msg).await?;

        let FramedParts {
            io,
            read_buffer,
            write_buffer,
            ..
        } = self.substream.into_parts();
        assert!(
            write_buffer.is_empty(),
            "Expect a flushed Framed to have an empty write buffer."
        );

        Ok((io, read_buffer.freeze()))
    }

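
As a rough sketch of what the same idea could look like for an outbound request, the send, flush, wait-for-answer and close steps might collapse into a single `async fn`. This uses only the standard library and is not a proposed implementation: `MockSubstream`, `RequestError`, `request`, `block_on` and `run_example` are all hypothetical names, the mock's futures complete immediately, and the tiny executor exists only so the sketch can run without external crates.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical in-memory stand-in for a negotiated substream.
struct MockSubstream {
    write_buffer: Vec<String>,
    sent: Vec<String>,
    incoming: VecDeque<String>,
    closed: bool,
}

impl MockSubstream {
    async fn send(&mut self, msg: String) { self.write_buffer.push(msg); }
    async fn flush(&mut self) { self.sent.append(&mut self.write_buffer); }
    async fn next(&mut self) -> Option<String> { self.incoming.pop_front() }
    async fn close(&mut self) { self.closed = true; }
}

#[derive(Debug, PartialEq)]
enum RequestError {
    UnexpectedEof,
}

/// One outbound request; comments name the manual state each line replaces.
async fn request(stream: &mut MockSubstream, msg: String) -> Result<String, RequestError> {
    stream.send(msg).await;            // PendingSend
    stream.flush().await;              // PendingFlush
    let answer = stream.next().await;  // WaitingAnswer
    stream.close().await;              // Closing
    answer.ok_or(RequestError::UnexpectedEof) // ReportError or Done
}

/// Waker that does nothing; sufficient because the mock never returns Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor that polls a future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Sends "ping" over a mock stream that has "pong" queued as the answer.
fn run_example() -> (Result<String, RequestError>, MockSubstream) {
    let mut stream = MockSubstream {
        write_buffer: Vec::new(),
        sent: Vec::new(),
        incoming: VecDeque::from(["pong".to_string()]),
        closed: false,
    };
    let result = block_on(request(&mut stream, "ping".to_string()));
    (result, stream)
}
```

Calling `run_example()` from a `main` function drives the whole exchange. In a real handler the future would be polled by the surrounding runtime rather than by a busy loop, and the compiler-generated state machine would take the place of the hand-written enum.
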
Source of the manual state machines snippet: rust-libp2p/protocols/kad/src/handler.rs, lines 140 to 163 in 43fdfe2

Source of the async-await snippet: rust-libp2p/protocols/relay/src/v2/protocol/inbound_stop.rs, lines 124 to 146 in 43fdfe2