risingwave_common_metrics/monitor/connection.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::type_name;
use std::cmp::Ordering;
use std::future::Future;
use std::io::{Error, IoSlice};
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::LazyLock;
use std::task::{Context, Poll};
use std::time::Duration;

use cfg_or_panic::cfg_or_panic;
use futures::FutureExt;
use http::Uri;
use hyper_util::client::legacy::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name};
use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector};
use hyper_util::rt::TokioIo;
use itertools::Itertools;
use pin_project_lite::pin_project;
use prometheus::{
    IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
};
use thiserror_ext::AsReport;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::{Channel, Endpoint};
use tower_service::Service;
use tracing::{debug, info, warn};

use crate::monitor::GLOBAL_METRICS_REGISTRY;
use crate::{LabelGuardedIntCounterVec, register_guarded_int_counter_vec_with_registry};

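/// Callback hooks invoked by [`MonitoredConnection`] on I/O events. Every hook defaults to a
/// no-op, so implementors only override the events they care about. `auto_impl(&mut)` also
/// makes `&mut M` usable wherever a monitor is expected.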
#[auto_impl::auto_impl(&mut)]
pub trait MonitorAsyncReadWrite {
    fn on_read(&mut self, _size: usize) {}
    fn on_eof(&mut self) {}
    fn on_read_err(&mut self, _err: &std::io::Error) {}

    fn on_write(&mut self, _size: usize) {}
    fn on_flush(&mut self) {}
    fn on_shutdown(&mut self) {}
    fn on_write_err(&mut self, _err: &std::io::Error) {}
}

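// `MonitoredConnection` pairs an inner connector/connection/stream with a monitor and forwards
// every operation to the inner value, reporting the outcome to the monitor. The same wrapper
// type is reused at three layers: the connector (`Service<Uri>`), the established connection
// (`AsyncRead`/`AsyncWrite` and their hyper counterparts), and the server-side accept stream.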
pin_project! {
    #[derive(Clone)]
    pub struct MonitoredConnection<C, M> {
        #[pin]
        inner: C,
        monitor: M,
    }
}

impl<C, M> MonitoredConnection<C, M> {
    pub fn new(connector: C, monitor: M) -> Self {
        Self {
            inner: connector,
            monitor,
        }
    }

    fn project_into(this: Pin<&mut Self>) -> (Pin<&mut C>, &mut M) {
        let this = this.project();
        (this.inner, this.monitor)
    }

    /// Delegate async read/write traits between tokio and hyper.
    fn hyper_tokio_delegate(
        self: Pin<&mut Self>,
    ) -> TokioIo<MonitoredConnection<TokioIo<Pin<&mut C>>, &mut M>> {
        let (inner, monitor) = MonitoredConnection::project_into(self);
        TokioIo::new(MonitoredConnection::new(TokioIo::new(inner), monitor))
    }
}

impl<C: AsyncRead, M: MonitorAsyncReadWrite> AsyncRead for MonitoredConnection<C, M> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let before_buf_size = buf.filled().len();
        let (inner, monitor) = MonitoredConnection::project_into(self);
        let ret = inner.poll_read(cx, buf);
        match &ret {
            Poll::Ready(Ok(())) => {
                let after_buf_size = buf.filled().len();
                match after_buf_size.cmp(&before_buf_size) {
                    Ordering::Less => {
                        unreachable!(
                            "buf size decreased after poll_read. Bad AsyncRead implementation on {}",
                            type_name::<C>()
                        );
                    }
                    Ordering::Equal => {
                        monitor.on_eof();
                    }
                    Ordering::Greater => {
                        monitor.on_read(after_buf_size - before_buf_size);
                    }
                }
            }
            Poll::Ready(Err(e)) => {
                monitor.on_read_err(e);
            }
            Poll::Pending => {}
        }
        ret
    }
}

impl<C: hyper::rt::Read, M: MonitorAsyncReadWrite> hyper::rt::Read for MonitoredConnection<C, M> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: hyper::rt::ReadBufCursor<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        hyper::rt::Read::poll_read(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
    }
}

impl<C: AsyncWrite, M: MonitorAsyncReadWrite> AsyncWrite for MonitoredConnection<C, M> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        let (inner, monitor) = MonitoredConnection::project_into(self);
        let ret = inner.poll_write(cx, buf);
        match &ret {
            Poll::Ready(Ok(size)) => {
                monitor.on_write(*size);
            }
            Poll::Ready(Err(e)) => {
                monitor.on_write_err(e);
            }
            Poll::Pending => {}
        }
        ret
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        let (inner, monitor) = MonitoredConnection::project_into(self);
        let ret = inner.poll_flush(cx);
        match &ret {
            Poll::Ready(Ok(())) => {
                monitor.on_flush();
            }
            Poll::Ready(Err(e)) => {
                monitor.on_write_err(e);
            }
            Poll::Pending => {}
        }
        ret
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        let (inner, monitor) = MonitoredConnection::project_into(self);
        let ret = inner.poll_shutdown(cx);
        match &ret {
            Poll::Ready(result) => {
                monitor.on_shutdown();
                if let Err(e) = result {
                    monitor.on_write_err(e);
                }
            }
            Poll::Pending => {}
        }
        ret
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<Result<usize, Error>> {
        let (inner, monitor) = MonitoredConnection::project_into(self);
        let ret = inner.poll_write_vectored(cx, bufs);
        match &ret {
            Poll::Ready(Ok(size)) => {
                monitor.on_write(*size);
            }
            Poll::Ready(Err(e)) => {
                monitor.on_write_err(e);
            }
            Poll::Pending => {}
        }
        ret
    }

    fn is_write_vectored(&self) -> bool {
        self.inner.is_write_vectored()
    }
}

impl<C: hyper::rt::Write, M: MonitorAsyncReadWrite> hyper::rt::Write for MonitoredConnection<C, M> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        hyper::rt::Write::poll_write(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        hyper::rt::Write::poll_flush(std::pin::pin!(self.hyper_tokio_delegate()), cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        hyper::rt::Write::poll_shutdown(std::pin::pin!(self.hyper_tokio_delegate()), cx)
    }

    fn is_write_vectored(&self) -> bool {
        self.inner.is_write_vectored()
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<Result<usize, std::io::Error>> {
        hyper::rt::Write::poll_write_vectored(std::pin::pin!(self.hyper_tokio_delegate()), cx, bufs)
    }
}

impl<C: Connection, M> Connection for MonitoredConnection<C, M> {
    fn connected(&self) -> Connected {
        self.inner.connected()
    }
}

#[cfg(not(madsim))]
impl<C: tonic::transport::server::Connected, M> tonic::transport::server::Connected
    for MonitoredConnection<C, M>
{
    type ConnectInfo = C::ConnectInfo;

    fn connect_info(&self) -> Self::ConnectInfo {
        self.inner.connect_info()
    }
}

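/// Factory for per-connection monitors: invoked once per established (or failed) connection
/// with the peer endpoint, so that I/O statistics can be labeled per endpoint.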
pub trait MonitorNewConnection {
    type ConnectionMonitor: MonitorAsyncReadWrite;

    fn new_connection_monitor(&self, endpoint: String) -> Self::ConnectionMonitor;
    fn on_err(&self, endpoint: String);
}

impl<C: Service<Uri>, M: MonitorNewConnection + Clone + 'static> Service<Uri>
    for MonitoredConnection<C, M>
where
    C::Future: 'static,
{
    type Error = C::Error;
    type Response = MonitoredConnection<C::Response, M::ConnectionMonitor>;

    type Future = impl Future<Output = Result<Self::Response, Self::Error>> + 'static;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let ret = self.inner.poll_ready(cx);
        if let Poll::Ready(Err(_)) = &ret {
            self.monitor.on_err("<poll_ready>".to_owned());
        }
        ret
    }

    fn call(&mut self, uri: Uri) -> Self::Future {
        let endpoint = format!("{:?}", uri.host());
        let monitor = self.monitor.clone();
        self.inner
            .call(uri)
            .map(move |result: Result<_, _>| match result {
                Ok(resp) => Ok(MonitoredConnection::new(
                    resp,
                    monitor.new_connection_monitor(endpoint),
                )),
                Err(e) => {
                    monitor.on_err(endpoint);
                    Err(e)
                }
            })
    }
}

#[cfg(not(madsim))]
impl<Con, E, C: futures::Stream<Item = Result<Con, E>>, M: MonitorNewConnection> futures::Stream
    for MonitoredConnection<C, M>
where
    Con:
        tonic::transport::server::Connected<ConnectInfo = tonic::transport::server::TcpConnectInfo>,
{
    type Item = Result<MonitoredConnection<Con, M::ConnectionMonitor>, E>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (inner, monitor) = MonitoredConnection::project_into(self);
        inner.poll_next(cx).map(|opt| {
            opt.map(|result| {
                result.map(|conn| {
                    let remote_addr = conn.connect_info().remote_addr();
                    let endpoint = remote_addr
                        .map(|remote_addr| format!("{}", remote_addr.ip()))
                        .unwrap_or("unknown".to_owned());
                    MonitoredConnection::new(conn, monitor.new_connection_monitor(endpoint))
                })
            })
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

// Compatibility implementation for hyper 0.14 ecosystem.
// Should be the same as those with imports from `http::Uri` and `hyper_util::client::legacy`.
// TODO(http-bump): remove this after there is no more dependency on hyper 0.14.
mod compat {
    use http_02::Uri;
    use hyper_014::client::connect::{Connected, Connection};

    use super::*;

    impl<C: Service<Uri>, M: MonitorNewConnection + Clone + 'static> Service<Uri>
        for MonitoredConnection<C, M>
    where
        C::Future: 'static,
    {
        type Error = C::Error;
        type Response = MonitoredConnection<C::Response, M::ConnectionMonitor>;

        type Future = impl Future<Output = Result<Self::Response, Self::Error>> + 'static;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let ret = self.inner.poll_ready(cx);
            if let Poll::Ready(Err(_)) = &ret {
                self.monitor.on_err("<poll_ready>".to_owned());
            }
            ret
        }

        fn call(&mut self, uri: Uri) -> Self::Future {
            let endpoint = format!("{:?}", uri.host());
            let monitor = self.monitor.clone();
            self.inner
                .call(uri)
                .map(move |result: Result<_, _>| match result {
                    Ok(resp) => Ok(MonitoredConnection::new(
                        resp,
                        monitor.new_connection_monitor(endpoint),
                    )),
                    Err(e) => {
                        monitor.on_err(endpoint);
                        Err(e)
                    }
                })
        }
    }

    impl<C: Connection, M> Connection for MonitoredConnection<C, M> {
        fn connected(&self) -> Connected {
            self.inner.connected()
        }
    }
}

#[derive(Clone)]
pub struct ConnectionMetrics {
    connection_count: IntGaugeVec,
    connection_create_rate: IntCounterVec,
    connection_err_rate: IntCounterVec,

    read_rate: IntCounterVec,
    reader_count: IntGaugeVec,

    write_rate: IntCounterVec,
    writer_count: IntGaugeVec,

    io_err_rate: LabelGuardedIntCounterVec<4>,
}

pub static GLOBAL_CONNECTION_METRICS: LazyLock<ConnectionMetrics> =
    LazyLock::new(|| ConnectionMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl ConnectionMetrics {
    pub fn new(registry: &Registry) -> Self {
        let labels = ["connection_type", "uri"];
        let connection_count = register_int_gauge_vec_with_registry!(
            "connection_count",
            "The number of currently existing connections",
            &labels,
            registry,
        )
        .unwrap();

        let connection_create_rate = register_int_counter_vec_with_registry!(
            "connection_create_rate",
            "Rate of creating new connections",
            &labels,
            registry,
        )
        .unwrap();

        let connection_err_rate = register_int_counter_vec_with_registry!(
            "connection_err_rate",
            "Error rate of creating new connections",
            &labels,
            registry,
        )
        .unwrap();

        let read_rate = register_int_counter_vec_with_registry!(
            "connection_read_rate",
            "Read rate of a connection",
            &labels,
            registry,
        )
        .unwrap();

        let reader_count = register_int_gauge_vec_with_registry!(
            "connection_reader_count",
            "The number of currently existing readers",
            &labels,
            registry,
        )
        .unwrap();

        let write_rate = register_int_counter_vec_with_registry!(
            "connection_write_rate",
            "Write rate of a connection",
            &labels,
            registry,
        )
        .unwrap();

        let writer_count = register_int_gauge_vec_with_registry!(
            "connection_writer_count",
            "The number of currently existing writers",
            &labels,
            registry,
        )
        .unwrap();

        let io_err_rate = register_guarded_int_counter_vec_with_registry!(
            "connection_io_err_rate",
            "IO err rate of a connection",
            &["connection_type", "uri", "op_type", "error_kind"],
            registry,
        )
        .unwrap();

        Self {
            connection_count,
            connection_create_rate,
            connection_err_rate,
            read_rate,
            reader_count,
            write_rate,
            writer_count,
            io_err_rate,
        }
    }
}

pub struct TcpConfig {
    pub tcp_nodelay: bool,
    pub keepalive_duration: Option<Duration>,
}

#[allow(clippy::derivable_impls)]
impl Default for TcpConfig {
    fn default() -> Self {
        Self {
            tcp_nodelay: false,
            keepalive_duration: None,
        }
    }
}

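/// Wrap `connector` so that every connection it establishes is monitored under the given
/// `connection_type` label, reporting to the global connection metrics.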
pub fn monitor_connector<C>(
    connector: C,
    connection_type: impl Into<String>,
) -> MonitoredConnection<C, MonitorNewConnectionImpl> {
    let connection_type = connection_type.into();
    info!(
        "monitoring connector {} with type {}",
        type_name::<C>(),
        connection_type
    );
    MonitoredConnection::new(connector, MonitorNewConnectionImpl { connection_type })
}

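// DNS resolution wrappers: `MonitoredGaiResolver` delegates to hyper's `GaiResolver` and, on
// completion, logs each resolved name together with the returned addresses at DEBUG level.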
pub struct MonitoredGaiAddrs {
    inner: Vec<SocketAddr>,
    pos: usize,
}

impl From<GaiAddrs> for MonitoredGaiAddrs {
    fn from(value: GaiAddrs) -> Self {
        Self {
            inner: value.collect_vec(),
            pos: 0,
        }
    }
}

impl Iterator for MonitoredGaiAddrs {
    type Item = SocketAddr;

    fn next(&mut self) -> Option<Self::Item> {
        let res = self.inner.get(self.pos).cloned();
        self.pos += 1;
        res
    }
}

pub struct MonitoredGaiFuture {
    name: Name,
    inner: GaiFuture,
}

impl std::fmt::Debug for MonitoredGaiFuture {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad("MonitoredGaiFuture")
    }
}

impl Future for MonitoredGaiFuture {
    type Output = Result<MonitoredGaiAddrs, Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.inner).poll(cx).map(|res| match res {
            Ok(addrs) => {
                let addrs: MonitoredGaiAddrs = addrs.into();
                debug!("resolve {} => {:?}", self.name, addrs.inner);
                Ok(addrs)
            }
            Err(err) => Err(err),
        })
    }
}

#[derive(Clone)]
pub struct MonitoredGaiResolver {
    inner: GaiResolver,
}

impl std::fmt::Debug for MonitoredGaiResolver {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad("MonitoredGaiResolver")
    }
}

impl Default for MonitoredGaiResolver {
    fn default() -> Self {
        Self {
            inner: GaiResolver::new(),
        }
    }
}

impl Service<Name> for MonitoredGaiResolver {
    type Error = Error;
    type Future = MonitoredGaiFuture;
    type Response = MonitoredGaiAddrs;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Name) -> Self::Future {
        MonitoredGaiFuture {
            name: req.clone(),
            inner: self.inner.call(req),
        }
    }
}

#[cfg_or_panic(not(madsim))]
fn monitored_http_connector(
    connection_type: impl Into<String>,
    config: TcpConfig,
) -> MonitoredConnection<HttpConnector<MonitoredGaiResolver>, MonitorNewConnectionImpl> {
    let resolver = MonitoredGaiResolver::default();
    let mut http = HttpConnector::new_with_resolver(resolver);

    http.enforce_http(false);
    http.set_nodelay(config.tcp_nodelay);
    http.set_keepalive(config.keepalive_duration);

    monitor_connector(http, connection_type)
}

/// Attach general configurations to the endpoint.
#[cfg_or_panic(not(madsim))]
fn configure_endpoint(endpoint: Endpoint) -> Endpoint {
    // This is to mitigate https://github.com/risingwavelabs/risingwave/issues/18039.
    // TODO: remove this after https://github.com/hyperium/hyper/issues/3724 gets resolved.
    endpoint.http2_max_header_list_size(16 * 1024 * 1024)
}

#[easy_ext::ext(EndpointExt)]
impl Endpoint {
    pub async fn monitored_connect(
        mut self,
        connection_type: impl Into<String>,
        config: TcpConfig,
    ) -> Result<Channel, tonic::transport::Error> {
        #[cfg(not(madsim))]
        {
            self = configure_endpoint(self);
            let connector = monitored_http_connector(connection_type, config);
            self.connect_with_connector(connector).await
        }
        #[cfg(madsim)]
        {
            self.connect().await
        }
    }

    #[cfg(not(madsim))]
    pub fn monitored_connect_lazy(
        mut self,
        connection_type: impl Into<String>,
        config: TcpConfig,
    ) -> Channel {
        self = configure_endpoint(self);
        let connector = monitored_http_connector(connection_type, config);
        self.connect_with_connector_lazy(connector)
    }
}
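// For illustration only: a hypothetical caller building a monitored gRPC channel via
// `monitored_connect`. The address and connection-type label below are made up; real call
// sites pass their own.
//
//     let channel = Endpoint::from_static("http://127.0.0.1:5690")
//         .monitored_connect("grpc-example-client", TcpConfig::default())
//         .await?;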
#[cfg(not(madsim))]
#[easy_ext::ext(RouterExt)]
impl<L> tonic::transport::server::Router<L> {
    /// Serve the given service while monitoring the connection.
    ///
    /// Calling the function will first bind the given service to the given address. Awaiting the
    /// returned future will then start the server and keep it running until the given signal
    /// future resolves.
    pub fn monitored_serve_with_shutdown<ResBody>(
        self,
        listen_addr: std::net::SocketAddr,
        connection_type: impl Into<String>,
        config: TcpConfig,
        signal: impl Future<Output = ()>,
    ) -> impl Future<Output = ()>
    where
        L: tower_layer::Layer<tonic::service::Routes>,
        L::Service: Service<http::Request<tonic::body::BoxBody>, Response = http::Response<ResBody>>
            + Clone
            + Send
            + 'static,
        <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
            http::Request<tonic::body::BoxBody>,
        >>::Future: Send + 'static,
        <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
            http::Request<tonic::body::BoxBody>,
        >>::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
        ResBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
        ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    {
        let connection_type = connection_type.into();
        let incoming = tonic::transport::server::TcpIncoming::new(
            listen_addr,
            config.tcp_nodelay,
            config.keepalive_duration,
        )
        .unwrap_or_else(|err| {
            panic!(
                "failed to bind `{connection_type}` to `{listen_addr}`: {}",
                err.as_report()
            )
        });
        let incoming =
            MonitoredConnection::new(incoming, MonitorNewConnectionImpl { connection_type });

        async move {
            self.serve_with_incoming_shutdown(incoming, signal)
                .await
                .unwrap()
        }
    }
}
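// For illustration only: a hypothetical server wiring using the extension above. The service,
// address, and label are made up; the returned future runs until `shutdown` resolves.
//
//     tonic::transport::Server::builder()
//         .add_service(my_grpc_service)
//         .monitored_serve_with_shutdown(
//             "0.0.0.0:5688".parse().unwrap(),
//             "grpc-example-server",
//             TcpConfig { tcp_nodelay: true, keepalive_duration: None },
//             async move { let _ = shutdown.await; },
//         )
//         .await;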
#[cfg(madsim)]
#[easy_ext::ext(RouterExt)]
impl<L> tonic::transport::server::Router<L> {
    pub async fn monitored_serve_with_shutdown(
        self,
        listen_addr: std::net::SocketAddr,
        connection_type: impl Into<String>,
        config: TcpConfig,
        signal: impl Future<Output = ()>,
    ) {
        self.serve_with_shutdown(listen_addr, signal).await.unwrap()
    }
}

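/// Implementation of [`MonitorNewConnection`] backed by the global connection metrics. Every
/// series is labeled with `(connection_type, uri)`, where `uri` is the peer endpoint observed
/// when the connection was created.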
#[derive(Clone)]
pub struct MonitorNewConnectionImpl {
    connection_type: String,
}

impl MonitorNewConnection for MonitorNewConnectionImpl {
    type ConnectionMonitor = MonitorAsyncReadWriteImpl;

    fn new_connection_monitor(&self, endpoint: String) -> Self::ConnectionMonitor {
        let labels = [self.connection_type.as_str(), endpoint.as_str()];
        let read_rate = GLOBAL_CONNECTION_METRICS
            .read_rate
            .with_label_values(&labels);
        let reader_count = GLOBAL_CONNECTION_METRICS
            .reader_count
            .with_label_values(&labels);
        let write_rate = GLOBAL_CONNECTION_METRICS
            .write_rate
            .with_label_values(&labels);
        let writer_count = GLOBAL_CONNECTION_METRICS
            .writer_count
            .with_label_values(&labels);
        let connection_count = GLOBAL_CONNECTION_METRICS
            .connection_count
            .with_label_values(&labels);

        GLOBAL_CONNECTION_METRICS
            .connection_create_rate
            .with_label_values(&labels)
            .inc();

        MonitorAsyncReadWriteImpl::new(
            endpoint,
            self.connection_type.clone(),
            read_rate,
            reader_count,
            write_rate,
            writer_count,
            connection_count,
        )
    }

    fn on_err(&self, endpoint: String) {
        GLOBAL_CONNECTION_METRICS
            .connection_err_rate
            .with_label_values(&[self.connection_type.as_str(), endpoint.as_str()])
            .inc();
    }
}

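// Read/write byte counts are accumulated locally and only flushed to the shared Prometheus
// counters once at least this many bytes are pending, keeping the per-poll overhead low. Any
// remainder is flushed when the monitor is dropped.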
const READ_WRITE_RATE_REPORT_INTERVAL: u64 = 1024;

pub struct MonitorAsyncReadWriteImpl {
    endpoint: String,
    connection_type: String,

    unreported_read_rate: u64,
    read_rate: IntCounter,
    reader_count_guard: IntGauge,
    is_eof: bool,

    unreported_write_rate: u64,
    write_rate: IntCounter,
    writer_count_guard: IntGauge,
    is_shutdown: bool,

    connection_count_guard: IntGauge,
}

impl MonitorAsyncReadWriteImpl {
    pub fn new(
        endpoint: String,
        connection_type: String,
        read_rate: IntCounter,
        reader_count: IntGauge,
        write_rate: IntCounter,
        writer_count: IntGauge,
        connection_count: IntGauge,
    ) -> Self {
        reader_count.inc();
        writer_count.inc();
        connection_count.inc();
        Self {
            endpoint,
            connection_type,
            unreported_read_rate: 0,
            read_rate,
            reader_count_guard: reader_count,
            is_eof: false,
            unreported_write_rate: 0,
            write_rate,
            writer_count_guard: writer_count,
            is_shutdown: false,
            connection_count_guard: connection_count,
        }
    }
}

impl Drop for MonitorAsyncReadWriteImpl {
    fn drop(&mut self) {
        if self.unreported_read_rate > 0 {
            self.read_rate.inc_by(self.unreported_read_rate);
        }
        if self.unreported_write_rate > 0 {
            self.write_rate.inc_by(self.unreported_write_rate);
        }
        if !self.is_eof {
            self.reader_count_guard.dec();
        }
        if !self.is_shutdown {
            self.writer_count_guard.dec();
        }
        self.connection_count_guard.dec();
    }
}

impl MonitorAsyncReadWrite for MonitorAsyncReadWriteImpl {
    fn on_read(&mut self, size: usize) {
        self.unreported_read_rate += size as u64;
        if self.unreported_read_rate >= READ_WRITE_RATE_REPORT_INTERVAL {
            self.read_rate.inc_by(self.unreported_read_rate);
            self.unreported_read_rate = 0;
        }
    }

    fn on_eof(&mut self) {
        if self.is_eof {
            warn!("received EOF multiple times");
            return;
        }
        self.is_eof = true;
        self.reader_count_guard.dec();
    }

    fn on_read_err(&mut self, err: &Error) {
        // No need to store the value returned from `with_guarded_label_values`
        // because it is reporting a single error.
        GLOBAL_CONNECTION_METRICS
            .io_err_rate
            .with_guarded_label_values(&[
                self.connection_type.as_str(),
                self.endpoint.as_str(),
                "read",
                err.kind().to_string().as_str(),
            ])
            .inc();
    }

    fn on_write(&mut self, size: usize) {
        self.unreported_write_rate += size as u64;
        if self.unreported_write_rate >= READ_WRITE_RATE_REPORT_INTERVAL {
            self.write_rate.inc_by(self.unreported_write_rate);
            self.unreported_write_rate = 0;
        }
    }

    fn on_shutdown(&mut self) {
        if self.is_shutdown {
            warn!("received shutdown multiple times");
            return;
        }
        self.is_shutdown = true;
        self.writer_count_guard.dec();
    }

    fn on_write_err(&mut self, err: &Error) {
        // No need to store the value returned from `with_guarded_label_values`
        // because it is reporting a single error.
        GLOBAL_CONNECTION_METRICS
            .io_err_rate
            .with_guarded_label_values(&[
                self.connection_type.as_str(),
                self.endpoint.as_str(),
                "write",
                err.kind().to_string().as_str(),
            ])
            .inc();
    }
}