1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
use std::convert::TryInto;
use tonic::body::BoxBody;
use tower::Service;
use futures::Future;
use http::request::Request;
use http::response::Response;
use hyper::body::Body;
use crate::StdError;
use crate::endpoint::*;
use crate::transport::Guard;
use crate::tonic;

/// Serves `router` on the transport selected by `endpoint` and resolves once
/// serving has finished.
///
/// * `Endpoint::Http` — the endpoint is converted into a
///   [`std::net::SocketAddr`] and passed to `Router::serve`. The result is
///   always `Ok(None)` on success.
/// * `Endpoint::Ipc` — the path's directories are created through
///   `Ipc::create_dir_all`, a `Listener` is bound to the path and wrapped with
///   `guarded(should_close_on_drop)`, and the router serves its incoming
///   stream. On success the value obtained from `guard()` is returned.
///
/// # Errors
///
/// Any failure from the address conversion, directory creation, listener
/// binding, or the serve call itself is propagated as a [`StdError`].
pub async fn serve_with_drop<A, B>(
    router: tonic::transport::server::Router<A, B>,
    endpoint: Endpoint,
    should_close_on_drop: bool,
) -> Result<Option<Guard>, StdError>
where
    A: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
    A::Future: Send + 'static,
    A::Error: Into<StdError> + Send,
    B: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
    B::Future: Send + 'static,
    B::Error: Into<StdError> + Send,
{
    match endpoint {
        Endpoint::Http(http) => {
            let socket_addr: std::net::SocketAddr = http.try_into()?;
            info!("server listening on TCP {}", &socket_addr);

            router.serve(socket_addr).await?;
            // The TCP transport has no guard to hand back.
            Ok(None)
        }

        Endpoint::Ipc(ipc) => {
            use crate::transport::*;

            Ipc::create_dir_all(&ipc.0).await?;

            // TODO: should we really close on drop here?
            let mut ipc_listener = Listener::bind(&ipc.0)?.guarded(should_close_on_drop);
            // Take the guard before serving so it can be returned afterwards.
            let guard = ipc_listener.guard();

            info!("server listening on IPC {}", ipc.0.display());
            router
                .serve_with_incoming(ipc_listener.incoming())
                .await?;
            Ok(guard)
        }
    }
}

/// Serves `router` on the transport selected by `endpoint` until serving
/// finishes, passing `signal` to the router's shutdown-aware serve methods.
///
/// * `Endpoint::Http` — the endpoint is converted into a
///   [`std::net::SocketAddr`] and passed to `Router::serve_with_shutdown`
///   together with `signal`. The result is always `Ok(None)` on success.
/// * `Endpoint::Ipc` — the path's directories are created through
///   `Ipc::create_dir_all`, a `Listener` is bound to the path and wrapped with
///   `guarded(true)`, and the router serves its incoming stream via
///   `serve_with_incoming_shutdown`. On success the value obtained from
///   `guard()` is returned.
///
/// # Errors
///
/// Any failure from the address conversion, directory creation, listener
/// binding, or the serve call itself is propagated as a [`StdError`].
pub async fn serve_with_shutdown<A, B, F>(
    router: tonic::transport::server::Router<A, B>,
    endpoint: Endpoint,
    signal: F,
) -> Result<Option<Guard>, StdError>
where
    A: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
    A::Future: Send + 'static,
    A::Error: Into<StdError> + Send,
    B: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
    B::Future: Send + 'static,
    B::Error: Into<StdError> + Send,
    F: Future<Output = ()>,
{
    match endpoint {
        Endpoint::Http(http) => {
            let socket_addr: std::net::SocketAddr = http.try_into()?;
            info!("server listening on TCP {}", &socket_addr);

            router.serve_with_shutdown(socket_addr, signal).await?;
            // The TCP transport has no guard to hand back.
            Ok(None)
        }

        Endpoint::Ipc(ipc) => {
            use crate::transport::*;

            Ipc::create_dir_all(&ipc.0).await?;

            // TODO: should we really close on drop here?
            // Unlike `serve_with_drop`, the flag is fixed to `true`.
            let mut ipc_listener = Listener::bind(&ipc.0)?.guarded(true);
            // Take the guard before serving so it can be returned afterwards.
            let guard = ipc_listener.guard();

            info!("server listening on IPC {}", ipc.0.display());
            router
                .serve_with_incoming_shutdown(ipc_listener.incoming(), signal)
                .await?;
            Ok(guard)
        }
    }
}

pub use router_impl::*;

mod router_impl {
    use super::*;

    /// Extension trait that lets a value be served directly on an
    /// [`Endpoint`], with or without a shutdown signal.
    #[tonic::async_trait]
    pub trait ServeWith {
        /// Serves `self` on `endpoint` and resolves when serving has finished.
        async fn serve_ext(self, endpoint: Endpoint) -> Result<Option<Guard>, StdError>;

        /// Serves `self` on `endpoint`, forwarding `signal` as the shutdown
        /// future.
        async fn serve_ext_with_shutdown<F>(
            self,
            endpoint: Endpoint,
            signal: F,
        ) -> Result<Option<Guard>, StdError>
        where
            F: Future<Output = ()> + Send;
    }

    /// Implements [`ServeWith`] for tonic routers by delegating to the
    /// free functions `serve_with_drop` and `serve_with_shutdown`.
    #[tonic::async_trait]
    impl<A, B> ServeWith for tonic::transport::server::Router<A, B>
    where
        A: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
        A::Future: Send + 'static,
        A::Error: Into<StdError> + Send,
        B: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
        B::Future: Send + 'static,
        B::Error: Into<StdError> + Send,
    {
        /// Delegates to `serve_with_drop` with `should_close_on_drop` set to
        /// `true`.
        #[inline]
        async fn serve_ext(self, endpoint: Endpoint) -> Result<Option<Guard>, StdError> {
            let router = self;
            serve_with_drop(router, endpoint, true).await
        }

        /// Delegates to `serve_with_shutdown`, passing `signal` through
        /// unchanged.
        #[inline]
        async fn serve_ext_with_shutdown<F: Future<Output = ()> + Send>(
            self,
            endpoint: Endpoint,
            signal: F,
        ) -> Result<Option<Guard>, StdError> {
            let router = self;
            serve_with_shutdown(router, endpoint, signal).await
        }
    }
}