Skip to content

Commit b13b597

Browse files
authored
Merge 55769a7 into d815cae
2 parents d815cae + 55769a7 commit b13b597

1 file changed

Lines changed: 150 additions & 0 deletions

File tree

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
//! Very basic example to showcase how to write a protocol that rejects new
2+
//! connections based on internal state. Useful when you want an endpoint to
3+
//! stop accepting new connections for some reason only known to the node. Maybe
4+
//! it's doing a migration, starting up, in a "maintenance mode", or serving
5+
//! too many connections.
6+
//!
7+
//! ## Usage
8+
//!
9+
//! cargo run --example screening-connection --features=examples
10+
use std::sync::{
11+
Arc,
12+
atomic::{AtomicU64, Ordering},
13+
};
14+
15+
use iroh::{
16+
Endpoint, NodeAddr,
17+
endpoint::{Connecting, Connection},
18+
protocol::{AcceptError, ProtocolHandler, Router},
19+
};
20+
use n0_snafu::{Result, ResultExt};
21+
22+
/// Each protocol is identified by its ALPN string.
23+
///
24+
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
25+
/// and the connection is aborted unless both nodes pass the same bytestring.
26+
const ALPN: &[u8] = b"iroh-example/screening-connection/0";
27+
28+
#[tokio::main]
29+
async fn main() -> Result<()> {
30+
let router = start_accept_side().await?;
31+
// Wait for the endpoint to be reachable
32+
router.endpoint().online().await;
33+
let node_addr = router.endpoint().node_addr();
34+
35+
// call connect three times. connection index 1 will be an odd number, and rejected.
36+
connect_side(&node_addr).await?;
37+
if let Err(err) = connect_side(&node_addr).await {
38+
println!("Error connecting: {}", err);
39+
}
40+
connect_side(&node_addr).await?;
41+
42+
// This makes sure the endpoint in the router is closed properly and connections close gracefully
43+
router.shutdown().await.e()?;
44+
45+
Ok(())
46+
}
47+
48+
async fn connect_side(addr: &NodeAddr) -> Result<()> {
49+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
50+
51+
// Open a connection to the accepting node
52+
let conn = endpoint.connect(addr.clone(), ALPN).await?;
53+
54+
// Open a bidirectional QUIC stream
55+
let (mut send, mut recv) = conn.open_bi().await.e()?;
56+
57+
// Send some data to be echoed
58+
send.write_all(b"Hello, world!").await.e()?;
59+
60+
// Signal the end of data for this particular stream
61+
send.finish().e()?;
62+
63+
// Receive the echo, but limit reading up to maximum 1000 bytes
64+
let response = recv.read_to_end(1000).await.e()?;
65+
assert_eq!(&response, b"Hello, world!");
66+
67+
// Explicitly close the whole connection.
68+
conn.close(0u32.into(), b"bye!");
69+
70+
// The above call only queues a close message to be sent (see how it's not async!).
71+
// We need to actually call this to make sure this message is sent out.
72+
endpoint.close().await;
73+
// If we don't call this, but continue using the endpoint, we then the queued
74+
// close call will eventually be picked up and sent.
75+
// But always try to wait for endpoint.close().await to go through before dropping
76+
// the endpoint to ensure any queued messages are sent through and connections are
77+
// closed gracefully.
78+
Ok(())
79+
}
80+
81+
async fn start_accept_side() -> Result<Router> {
82+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
83+
84+
let echo = ScreenedEcho {
85+
conn_attempt_count: Arc::new(AtomicU64::new(0)),
86+
};
87+
88+
// Build our protocol handler and add our protocol, identified by its ALPN, and spawn the node.
89+
let router = Router::builder(endpoint).accept(ALPN, echo).spawn();
90+
91+
Ok(router)
92+
}
93+
94+
/// This is the same as the echo example, but keeps an internal count of the
95+
/// number of connections that have been attempted. This is to demonstrate how
96+
/// to plumb state into the protocol handler
97+
#[derive(Debug, Clone)]
98+
struct ScreenedEcho {
99+
conn_attempt_count: Arc<AtomicU64>,
100+
}
101+
102+
impl ProtocolHandler for ScreenedEcho {
103+
/// `on_connecting` allows us to intercept a connection as it's being formed,
104+
/// which is the right place to cut off a connection as early as possible.
105+
/// This is an optional method on the ProtocolHandler trait.
106+
async fn on_connecting(&self, connecting: Connecting) -> Result<Connection, AcceptError> {
107+
self.conn_attempt_count.fetch_add(1, Ordering::Relaxed);
108+
let count = self.conn_attempt_count.load(Ordering::Relaxed);
109+
110+
// reject every other connection
111+
if count % 2 == 0 {
112+
println!("rejecting connection");
113+
return Err(AcceptError::NotAllowed {});
114+
}
115+
116+
// To allow normal connection construction, await the connecting future & return
117+
let conn = connecting.await?;
118+
Ok(conn)
119+
}
120+
121+
/// The `accept` method is called for each incoming connection for our ALPN.
122+
/// This is the primary place to kick off work in response to a new connection.
123+
///
124+
/// The returned future runs on a newly spawned tokio task, so it can run as long as
125+
/// the connection lasts.
126+
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
127+
// We can get the remote's node id from the connection.
128+
let node_id = connection.remote_node_id()?;
129+
println!("accepted connection from {node_id}");
130+
131+
// Our protocol is a simple request-response protocol, so we expect the
132+
// connecting peer to open a single bi-directional stream.
133+
let (mut send, mut recv) = connection.accept_bi().await?;
134+
135+
// Echo any bytes received back directly.
136+
// This will keep copying until the sender signals the end of data on the stream.
137+
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
138+
println!("Copied over {bytes_sent} byte(s)");
139+
140+
// By calling `finish` on the send stream we signal that we will not send anything
141+
// further, which makes the receive stream on the other end terminate.
142+
send.finish()?;
143+
144+
// Wait until the remote closes the connection, which it does once it
145+
// received the response.
146+
connection.closed().await;
147+
148+
Ok(())
149+
}
150+
}

0 commit comments

Comments
 (0)