1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::task::{Context, Poll};
use futures_util::future;
use hyper::service::Service;
use crate::metrics::collector::MetricsCollector;
use std::time::Duration;
use std::net::SocketAddr;
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use crate::metrics::prometheus::encode_metrics;
use crate::heartbeat::HeartRateMonitor;
use crate::metrics::execution::get_system_metrics;
/// Per-connection HTTP service that answers `GET /metrics` and `GET /health`
/// (see the `Service<Request<Body>>` implementation for the routing).
#[derive(Debug)]
pub struct InfoService {
/// Source of the collected metrics that `load_metric` encodes.
metric_collector: MetricsCollector,
/// Heartbeat monitor queried by `check_health` via `is_alive()`.
hrm: HeartRateMonitor,
}
impl InfoService {
    /// Builds the response for the metrics endpoint.
    ///
    /// Reads the current metrics from the collector, encodes them together
    /// with the system metrics through `encode_metrics`, and returns the
    /// encoded output as a `200 OK` response with `Content-Type: text/plain`.
    fn load_metric(&mut self) -> Response<Body> {
        // Collector metrics are read before the system metrics are sampled.
        let collected = self.metric_collector.get_metrics();
        let system = Some(get_system_metrics());

        // The string list is handed to `encode_metrics` as-is; presumably these
        // are the metric names to export -- confirm against the encoder.
        let encoded = encode_metrics(
            system,
            collected,
            &[
                "ds_access",
                "compile",
                "multiple_compile",
                "script_metadata",
                "publish_module",
                "execute_script",
            ],
        );

        let builder = Response::builder()
            .status(StatusCode::OK)
            .header("Content-Type", "text/plain");
        builder.body(Body::from(encoded)).unwrap()
    }

    /// Builds the response for the health endpoint.
    ///
    /// Returns an empty-bodied `200 OK` when the heartbeat monitor reports it
    /// is alive, and an empty-bodied `500 Internal Server Error` otherwise.
    fn check_health(&mut self) -> Response<Body> {
        let status = if self.hrm.is_alive() {
            StatusCode::OK
        } else {
            StatusCode::INTERNAL_SERVER_ERROR
        };

        let builder = Response::builder().status(status);
        builder.body(Body::empty()).unwrap()
    }
}
/// Routes incoming HTTP requests to the metrics and health handlers.
impl Service<Request<Body>> for InfoService {
    type Response = Response<Body>;
    type Error = hyper::Error;
    type Future = future::Ready<Result<Self::Response, Self::Error>>;

    /// Always reports the service as ready.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Dispatches on method and path:
    ///
    /// * `GET /metrics` -> `load_metric`
    /// * `GET /health`  -> `check_health`
    /// * anything else  -> `404 Not Found` with the body `Not found.`
    ///
    /// The response is computed synchronously and returned as a ready future.
    fn call(&mut self, req: Request<Body>) -> Self::Future {
        let is_get = *req.method() == Method::GET;

        let response = match req.uri().path() {
            "/metrics" if is_get => self.load_metric(),
            "/health" if is_get => self.check_health(),
            _ => {
                let not_found = b"Not found.".to_vec();
                Response::builder()
                    .status(StatusCode::NOT_FOUND)
                    .body(Body::from(not_found))
                    .unwrap()
            }
        };

        future::ok(response)
    }
}
/// Factory that produces an `InfoService` for each call of its `Service`
/// implementation, cloning the shared collector and heartbeat monitor.
pub struct ServiceMaker {
/// Metrics collector cloned into every `InfoService` that is created.
metric_collector: MetricsCollector,
/// Heartbeat monitor cloned into every `InfoService` that is created.
hrm: HeartRateMonitor,
}
/// Lets `ServiceMaker` act as the "make service" handed to the server: every
/// call yields a fresh `InfoService`, whatever the target type `T` is.
impl<T> Service<T> for ServiceMaker {
    type Response = InfoService;
    type Error = std::io::Error;
    type Future = future::Ready<Result<Self::Response, Self::Error>>;

    /// Always reports the factory as ready.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Creates a new `InfoService` from clones of the stored collector and
    /// heartbeat monitor; the target argument is ignored.
    fn call(&mut self, _: T) -> Self::Future {
        let metric_collector = self.metric_collector.clone();
        let hrm = self.hrm.clone();

        future::ok(InfoService {
            metric_collector,
            hrm,
        })
    }
}
/// Starts the info HTTP server on `addr` and runs it until the server future
/// completes.
///
/// * `addr` - socket address the server binds to.
/// * `hrm` - heartbeat monitor handed to the services for the health endpoint.
/// * `metrics_update_rate` - duration passed to `MetricsCollector::new`.
///
/// # Errors
///
/// Returns the server's error, boxed, if awaiting the server fails.
pub async fn start_info_service(
    addr: SocketAddr,
    hrm: HeartRateMonitor,
    metrics_update_rate: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let metric_collector = MetricsCollector::new(metrics_update_rate);
    let srv_maker = ServiceMaker {
        metric_collector,
        hrm,
    };

    // Bind first, so the log line is only emitted once binding has succeeded.
    let server = Server::bind(&addr).serve(srv_maker);
    info!("Listening on http://{}", addr);

    server.await.map_err(Into::into)
}