
async: Close listener fd after server shutdown#200

Merged
lifupan merged 1 commit into containerd:master from abel-von:close-async-listener on Jul 6, 2023

Conversation

@abel-von
Contributor

@abel-von abel-von commented Jul 3, 2023

The listener fd is not closed after the server shuts down; this leaves a residual socket fd in the process if it starts and stops different servers again and again.

Here I changed the example code to reproduce it:

// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//

#[macro_use]
extern crate log;

use std::sync::Arc;

#[cfg(unix)]
use async_trait::async_trait;
use log::LevelFilter;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc, types};
#[cfg(unix)]
use ttrpc::asynchronous::Server;
use ttrpc::error::{Error, Result};
use ttrpc::proto::{Code, Status};

mod protocols;
mod utils;

struct HealthService;

#[cfg(unix)]
#[async_trait]
impl health_ttrpc::Health for HealthService {
    async fn check(
        &self,
        _ctx: &::ttrpc::r#async::TtrpcContext,
        _req: health::CheckRequest,
    ) -> Result<health::HealthCheckResponse> {
        let mut status = Status::new();

        status.set_code(Code::NOT_FOUND);
        status.set_message("Just for fun".to_string());

        sleep(std::time::Duration::from_secs(10)).await;

        Err(Error::RpcStatus(status))
    }

    async fn version(
        &self,
        ctx: &::ttrpc::r#async::TtrpcContext,
        req: health::CheckRequest,
    ) -> Result<health::VersionCheckResponse> {
        info!("version {:?}", req);
        info!("ctx {:?}", ctx);
        let mut rep = health::VersionCheckResponse::new();
        rep.agent_version = "mock.0.1".to_string();
        rep.grpc_version = "0.0.1".to_string();
        let mut status = Status::new();
        status.set_code(Code::NOT_FOUND);
        Ok(rep)
    }
}

struct AgentService;

#[cfg(unix)]
#[async_trait]
impl agent_ttrpc::AgentService for AgentService {
    async fn list_interfaces(
        &self,
        _ctx: &::ttrpc::r#async::TtrpcContext,
        _req: agent::ListInterfacesRequest,
    ) -> ::ttrpc::Result<agent::Interfaces> {
        let mut rp = Vec::new();

        let mut i = types::Interface::new();
        i.name = "first".to_string();
        rp.push(i);
        let mut i = types::Interface::new();
        i.name = "second".to_string();
        rp.push(i);

        let mut i = agent::Interfaces::new();
        i.Interfaces = rp;

        Ok(i)
    }
}

#[cfg(windows)]
fn main() {
    println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
    simple_logging::log_to_stderr(LevelFilter::Trace);

    let h = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
    let h = Arc::new(h);
    let hservice = health_ttrpc::create_health(h);

    let a = Box::new(AgentService {}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
    let a = Arc::new(a);
    let aservice = agent_ttrpc::create_agent_service(a);

    utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();

    let mut server = Server::new()
        .bind(utils::SOCK_ADDR)
        .unwrap()
        .register_service(hservice)
        .register_service(aservice);

    let mut hangup = signal(SignalKind::hangup()).unwrap();
    let mut interrupt = signal(SignalKind::interrupt()).unwrap();
    server.start().await.unwrap();
    let mut times = 0;
    loop {
        tokio::select! {
            _ = hangup.recv() => {
                // test stop_listen -> start
                println!("shutdown");
                server.shutdown().await;

                let h = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
                let h = Arc::new(h);
                let hservice = health_ttrpc::create_health(h);

                let a = Box::new(AgentService {}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
                let a = Arc::new(a);
                let aservice = agent_ttrpc::create_agent_service(a);
                let address = format!("{}-{}", utils::SOCK_ADDR, times);
                times += 1;
                utils::remove_if_sock_exist(&address).unwrap();
                server = Server::new()
                    .bind(&address)
                    .unwrap()
                    .register_service(hservice)
                    .register_service(aservice);
                println!("start listen");
                server.start().await.unwrap();

                // // hold some time for the new test connection.
                // sleep(std::time::Duration::from_secs(100)).await;
            }
            _ = interrupt.recv() => {
                // test graceful shutdown
                println!("graceful shutdown");
                server.shutdown().await.unwrap();
            }
        };
    }
}

If I send kill -SIGHUP to this process repeatedly, we can see that the fd count keeps increasing:
[screenshot: the process's open fd count growing after each SIGHUP]

This PR fixes the issue by closing the listener fd. That is fine for callers that pass a socket address to server.bind(), but if the listener is opened externally and its lifecycle is managed outside the server, closing it here could cause a double-close error. However, we can see that the listener is already closed in the sync code, so I think we can assume the server takes ownership of the listener fd once it is added to the server.

The listener fd is not closed on server.shutdown(); this leaves a
residual fd if a process starts and closes different servers many
times.

Signed-off-by: Abel Feng <fshb1988@gmail.com>
@abel-von abel-von force-pushed the close-async-listener branch from a9a1cca to cc6b187 on July 3, 2023 12:19
@Tim-Zhang
Member

@abel-von Good catch, this bug is caused by

let dup_fd = unistd::dup(incoming.as_raw_fd()).unwrap();
and I forgot to close the fd.
Would you mind backporting this to 0.5.0, 0.6.0 and 0.7.0? If you are too busy to do it, never mind, I will do it. Thanks a million.

@abel-von
Contributor Author

abel-von commented Jul 4, 2023

@Tim-Zhang The CI seems to have some weird error, could you help solve it, please? I can cherry-pick this to other branches; shall I submit the PRs too?

@codecov

codecov bot commented Jul 5, 2023

Codecov Report

Patch coverage has no change; project coverage changes by -0.05% ⚠️

Comparison is base (e94bb9f) 24.47% compared to head (cc6b187) 24.42%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #200      +/-   ##
==========================================
- Coverage   24.47%   24.42%   -0.05%     
==========================================
  Files          17       17              
  Lines        2521     2526       +5     
==========================================
  Hits          617      617              
- Misses       1904     1909       +5     
Impacted Files Coverage Δ
src/asynchronous/server.rs 0.00% <0.00%> (ø)


@Tim-Zhang
Member

> @Tim-Zhang The CI seems to have some weird error, could you help solve it, please? I can cherry-pick this to other branches; shall I submit the PRs too?

It works now.

@lifupan lifupan merged commit 2691b72 into containerd:master Jul 6, 2023
Tim-Zhang added a commit to Tim-Zhang/ttrpc-rust that referenced this pull request Aug 22, 2023
Cut the release for containerd#196, containerd#197, containerd#200, containerd#203, containerd#208

Signed-off-by: Tim Zhang <tim@hyper.sh>
@Tim-Zhang Tim-Zhang mentioned this pull request Aug 22, 2023
KarstenB pushed a commit to KarstenB/ttrpc-rust that referenced this pull request May 1, 2025
async: Close listener fd after server shutdown
KarstenB pushed a commit to KarstenB/ttrpc-rust that referenced this pull request May 1, 2025
Cut the release for containerd#196, containerd#197, containerd#200, containerd#203, containerd#208

Signed-off-by: Tim Zhang <tim@hyper.sh>