risingwave_common_metrics/monitor/
connection.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::type_name;
16use std::cmp::Ordering;
17use std::future::Future;
18use std::io::{Error, IoSlice};
19use std::net::SocketAddr;
20use std::pin::Pin;
21use std::sync::LazyLock;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25use cfg_or_panic::cfg_or_panic;
26use futures::FutureExt;
27use http::Uri;
28use hyper_util::client::legacy::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name};
29use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector};
30use hyper_util::rt::TokioIo;
31use itertools::Itertools;
32use pin_project_lite::pin_project;
33use prometheus::{
34    IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
35    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
36};
37use thiserror_ext::AsReport;
38use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
39use tonic::transport::{Channel, Endpoint};
40use tower_service::Service;
41use tracing::{trace, warn};
42
43use crate::monitor::GLOBAL_METRICS_REGISTRY;
44use crate::{LabelGuardedIntCounterVec, register_guarded_int_counter_vec_with_registry};
45
/// Callback hooks invoked by [`MonitoredConnection`] on each I/O event of the
/// wrapped connection. All methods default to no-ops, so implementors only
/// override the events they care about.
#[auto_impl::auto_impl(&mut)]
pub trait MonitorAsyncReadWrite {
    /// Called when a poll-read succeeded and grew the buffer by `_size` bytes.
    fn on_read(&mut self, _size: usize) {}
    /// Called when a poll-read succeeded without filling any bytes (EOF).
    fn on_eof(&mut self) {}
    /// Called when a poll-read returned an error.
    fn on_read_err(&mut self, _err: &std::io::Error) {}

    /// Called when a poll-write succeeded, writing `_size` bytes.
    fn on_write(&mut self, _size: usize) {}
    /// Called when a flush completed successfully.
    fn on_flush(&mut self) {}
    /// Called when shutdown of the write half completed (successfully or not).
    fn on_shutdown(&mut self) {}
    /// Called when a write, flush, or shutdown returned an error.
    fn on_write_err(&mut self, _err: &std::io::Error) {}
}
57
pin_project! {
    /// Wrapper that forwards every I/O event of the inner value to a monitor.
    /// Depending on what `C` is, this wraps a single connection, a connector
    /// (`Service<Uri>`), or a server-side accept stream.
    #[derive(Clone)]
    pub struct MonitoredConnection<C, M> {
        // The wrapped connection/connector; structurally pinned so `poll_*`
        // calls can be delegated to it.
        #[pin]
        inner: C,
        monitor: M,
    }
}
66
impl<C, M> MonitoredConnection<C, M> {
    /// Wrap `connector` so that its I/O activity is reported to `monitor`.
    pub fn new(connector: C, monitor: M) -> Self {
        Self {
            inner: connector,
            monitor,
        }
    }

    /// Pin-project into the pinned inner value and the (unpinned) monitor.
    fn project_into(this: Pin<&mut Self>) -> (Pin<&mut C>, &mut M) {
        let this = this.project();
        (this.inner, this.monitor)
    }

    /// Delegate async read/write traits between tokio and hyper.
    ///
    /// Wraps the projected inner value in [`TokioIo`] on both sides so that
    /// hyper's `Read`/`Write` calls are translated into the tokio
    /// `AsyncRead`/`AsyncWrite` implementations below, keeping the monitoring
    /// layer in the middle.
    fn hyper_tokio_delegate(
        self: Pin<&mut Self>,
    ) -> TokioIo<MonitoredConnection<TokioIo<Pin<&mut C>>, &mut M>> {
        let (inner, monitor) = MonitoredConnection::project_into(self);
        TokioIo::new(MonitoredConnection::new(TokioIo::new(inner), monitor))
    }
}
88
89impl<C: AsyncRead, M: MonitorAsyncReadWrite> AsyncRead for MonitoredConnection<C, M> {
90    fn poll_read(
91        self: Pin<&mut Self>,
92        cx: &mut Context<'_>,
93        buf: &mut ReadBuf<'_>,
94    ) -> Poll<std::io::Result<()>> {
95        let before_buf_size = buf.filled().len();
96        let (inner, monitor) = MonitoredConnection::project_into(self);
97        let ret = inner.poll_read(cx, buf);
98        match &ret {
99            Poll::Ready(Ok(())) => {
100                let after_buf_size = buf.filled().len();
101                match after_buf_size.cmp(&before_buf_size) {
102                    Ordering::Less => {
103                        unreachable!(
104                            "buf size decrease after poll read. Bad AsyncRead implementation on {}",
105                            type_name::<C>()
106                        );
107                    }
108                    Ordering::Equal => {
109                        monitor.on_eof();
110                    }
111                    Ordering::Greater => {
112                        monitor.on_read(after_buf_size - before_buf_size);
113                    }
114                }
115            }
116            Poll::Ready(Err(e)) => {
117                monitor.on_read_err(e);
118            }
119            Poll::Pending => {}
120        }
121        ret
122    }
123}
124
/// hyper reads are served by adapting to the tokio [`AsyncRead`] impl above
/// through [`MonitoredConnection::hyper_tokio_delegate`].
impl<C: hyper::rt::Read, M: MonitorAsyncReadWrite> hyper::rt::Read for MonitoredConnection<C, M> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: hyper::rt::ReadBufCursor<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        hyper::rt::Read::poll_read(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
    }
}
134
135impl<C: AsyncWrite, M: MonitorAsyncReadWrite> AsyncWrite for MonitoredConnection<C, M> {
136    fn poll_write(
137        self: Pin<&mut Self>,
138        cx: &mut Context<'_>,
139        buf: &[u8],
140    ) -> Poll<Result<usize, Error>> {
141        let (inner, monitor) = MonitoredConnection::project_into(self);
142        let ret = inner.poll_write(cx, buf);
143        match &ret {
144            Poll::Ready(Ok(size)) => {
145                monitor.on_write(*size);
146            }
147            Poll::Ready(Err(e)) => {
148                monitor.on_write_err(e);
149            }
150            Poll::Pending => {}
151        }
152        ret
153    }
154
155    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
156        let (inner, monitor) = MonitoredConnection::project_into(self);
157        let ret = inner.poll_flush(cx);
158        match &ret {
159            Poll::Ready(Ok(())) => {
160                monitor.on_flush();
161            }
162            Poll::Ready(Err(e)) => {
163                monitor.on_write_err(e);
164            }
165            Poll::Pending => {}
166        }
167        ret
168    }
169
170    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
171        let (inner, monitor) = MonitoredConnection::project_into(self);
172        let ret = inner.poll_shutdown(cx);
173        match &ret {
174            Poll::Ready(result) => {
175                monitor.on_shutdown();
176                if let Err(e) = result {
177                    monitor.on_write_err(e);
178                }
179            }
180            Poll::Pending => {}
181        }
182        ret
183    }
184
185    fn poll_write_vectored(
186        self: Pin<&mut Self>,
187        cx: &mut Context<'_>,
188        bufs: &[IoSlice<'_>],
189    ) -> Poll<Result<usize, Error>> {
190        let (inner, monitor) = MonitoredConnection::project_into(self);
191        let ret = inner.poll_write_vectored(cx, bufs);
192        match &ret {
193            Poll::Ready(Ok(size)) => {
194                monitor.on_write(*size);
195            }
196            Poll::Ready(Err(e)) => {
197                monitor.on_write_err(e);
198            }
199            Poll::Pending => {}
200        }
201        ret
202    }
203
204    fn is_write_vectored(&self) -> bool {
205        self.inner.is_write_vectored()
206    }
207}
208
/// hyper writes are served by adapting to the tokio [`AsyncWrite`] impl above
/// through [`MonitoredConnection::hyper_tokio_delegate`].
impl<C: hyper::rt::Write, M: MonitorAsyncReadWrite> hyper::rt::Write for MonitoredConnection<C, M> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        hyper::rt::Write::poll_write(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        hyper::rt::Write::poll_flush(std::pin::pin!(self.hyper_tokio_delegate()), cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        hyper::rt::Write::poll_shutdown(std::pin::pin!(self.hyper_tokio_delegate()), cx)
    }

    // Vectored-write support is answered by the inner connection directly,
    // without going through the delegate.
    fn is_write_vectored(&self) -> bool {
        self.inner.is_write_vectored()
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<Result<usize, std::io::Error>> {
        hyper::rt::Write::poll_write_vectored(std::pin::pin!(self.hyper_tokio_delegate()), cx, bufs)
    }
}
241
/// Client-side connection metadata is transparent: expose the inner value's.
impl<C: Connection, M> Connection for MonitoredConnection<C, M> {
    fn connected(&self) -> Connected {
        self.inner.connected()
    }
}
247
/// Server-side connection info is transparent as well: expose the inner
/// value's. Not available under madsim simulation.
#[cfg(not(madsim))]
impl<C: tonic::transport::server::Connected, M> tonic::transport::server::Connected
    for MonitoredConnection<C, M>
{
    type ConnectInfo = C::ConnectInfo;

    fn connect_info(&self) -> Self::ConnectInfo {
        self.inner.connect_info()
    }
}
258
/// Factory for per-connection monitors, used when [`MonitoredConnection`]
/// wraps a connector (client side) or an accept stream (server side).
pub trait MonitorNewConnection {
    type ConnectionMonitor: MonitorAsyncReadWrite;

    /// Create the monitor for a newly established connection to/from `endpoint`.
    fn new_connection_monitor(&self, endpoint: String) -> Self::ConnectionMonitor;
    /// Report a failed attempt to establish a connection to `endpoint`.
    fn on_err(&self, endpoint: String);
}
265
266impl<C: Service<Uri>, M: MonitorNewConnection + Clone + 'static> Service<Uri>
267    for MonitoredConnection<C, M>
268where
269    C::Future: 'static,
270{
271    type Error = C::Error;
272    type Response = MonitoredConnection<C::Response, M::ConnectionMonitor>;
273
274    type Future = impl Future<Output = Result<Self::Response, Self::Error>> + 'static;
275
276    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
277        let ret = self.inner.poll_ready(cx);
278        if let Poll::Ready(Err(_)) = &ret {
279            self.monitor.on_err("<poll_ready>".to_owned());
280        }
281        ret
282    }
283
284    fn call(&mut self, uri: Uri) -> Self::Future {
285        let endpoint = format!("{:?}", uri.host());
286        let monitor = self.monitor.clone();
287        self.inner
288            .call(uri)
289            .map(move |result: Result<_, _>| match result {
290                Ok(resp) => Ok(MonitoredConnection::new(
291                    resp,
292                    monitor.new_connection_monitor(endpoint),
293                )),
294                Err(e) => {
295                    monitor.on_err(endpoint);
296                    Err(e)
297                }
298            })
299    }
300}
301
302#[cfg(not(madsim))]
303impl<Con, E, C: futures::Stream<Item = Result<Con, E>>, M: MonitorNewConnection> futures::Stream
304    for MonitoredConnection<C, M>
305where
306    Con:
307        tonic::transport::server::Connected<ConnectInfo = tonic::transport::server::TcpConnectInfo>,
308{
309    type Item = Result<MonitoredConnection<Con, M::ConnectionMonitor>, E>;
310
311    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312        let (inner, monitor) = MonitoredConnection::project_into(self);
313        inner.poll_next(cx).map(|opt| {
314            opt.map(|result| {
315                result.map(|conn| {
316                    let remote_addr = conn.connect_info().remote_addr();
317                    let endpoint = remote_addr
318                        .map(|remote_addr| format!("{}", remote_addr.ip()))
319                        .unwrap_or("unknown".to_owned());
320                    MonitoredConnection::new(conn, monitor.new_connection_monitor(endpoint))
321                })
322            })
323        })
324    }
325
326    fn size_hint(&self) -> (usize, Option<usize>) {
327        self.inner.size_hint()
328    }
329}
330
/// Prometheus metrics for monitored connections, labeled by `connection_type`
/// and `uri` (errors carry additional `op_type`/`error_kind` labels).
#[derive(Clone)]
pub struct ConnectionMetrics {
    // Currently alive connections (gauge).
    connection_count: IntGaugeVec,
    // Total connections established (monotonic counter).
    connection_create_rate: IntCounterVec,
    // Total failed connection attempts (monotonic counter).
    connection_err_rate: IntCounterVec,

    // Total bytes read (monotonic counter).
    read_rate: IntCounterVec,
    // Currently alive read halves; decremented on EOF or drop.
    reader_count: IntGaugeVec,

    // Total bytes written (monotonic counter).
    write_rate: IntCounterVec,
    // Currently alive write halves; decremented on shutdown or drop.
    writer_count: IntGaugeVec,

    // I/O errors, labeled additionally with `op_type` and `error_kind`.
    io_err_rate: LabelGuardedIntCounterVec,
}
345
/// Process-wide connection metrics, registered lazily in the global registry.
pub static GLOBAL_CONNECTION_METRICS: LazyLock<ConnectionMetrics> =
    LazyLock::new(|| ConnectionMetrics::new(&GLOBAL_METRICS_REGISTRY));
348
349impl ConnectionMetrics {
350    pub fn new(registry: &Registry) -> Self {
351        let labels = ["connection_type", "uri"];
352        let connection_count = register_int_gauge_vec_with_registry!(
353            "connection_count",
354            "The number of current existing connection",
355            &labels,
356            registry,
357        )
358        .unwrap();
359
360        let connection_create_rate = register_int_counter_vec_with_registry!(
361            "connection_create_rate",
362            "Rate on creating new connection",
363            &labels,
364            registry,
365        )
366        .unwrap();
367
368        let connection_err_rate = register_int_counter_vec_with_registry!(
369            "connection_err_rate",
370            "Error rate on creating new connection",
371            &labels,
372            registry,
373        )
374        .unwrap();
375
376        let read_rate = register_int_counter_vec_with_registry!(
377            "connection_read_rate",
378            "Read rate of a connection",
379            &labels,
380            registry,
381        )
382        .unwrap();
383
384        let reader_count = register_int_gauge_vec_with_registry!(
385            "connection_reader_count",
386            "The number of current existing reader",
387            &labels,
388            registry,
389        )
390        .unwrap();
391
392        let write_rate = register_int_counter_vec_with_registry!(
393            "connection_write_rate",
394            "Write rate of a connection",
395            &labels,
396            registry,
397        )
398        .unwrap();
399
400        let writer_count = register_int_gauge_vec_with_registry!(
401            "connection_writer_count",
402            "The number of current existing writer",
403            &labels,
404            registry,
405        )
406        .unwrap();
407
408        let io_err_rate = register_guarded_int_counter_vec_with_registry!(
409            "connection_io_err_rate",
410            "IO err rate of a connection",
411            &["connection_type", "uri", "op_type", "error_kind"],
412            registry,
413        )
414        .unwrap();
415
416        Self {
417            connection_count,
418            connection_create_rate,
419            connection_err_rate,
420            read_rate,
421            reader_count,
422            write_rate,
423            writer_count,
424            io_err_rate,
425        }
426    }
427}
428
/// TCP-level options applied to monitored connections.
pub struct TcpConfig {
    // Whether to enable `TCP_NODELAY` on the socket.
    pub tcp_nodelay: bool,
    // Keepalive duration passed to the connector; `None` disables keepalive.
    pub keepalive_duration: Option<Duration>,
}
433
// Written out explicitly (instead of derived) so the default connection
// options are visible at a glance; hence the clippy allow.
#[allow(clippy::derivable_impls)]
impl Default for TcpConfig {
    fn default() -> Self {
        Self {
            tcp_nodelay: false,
            keepalive_duration: None,
        }
    }
}
443
444pub fn monitor_connector<C>(
445    connector: C,
446    connection_type: impl Into<String>,
447) -> MonitoredConnection<C, MonitorNewConnectionImpl> {
448    let connection_type = connection_type.into();
449    trace!(
450        "monitoring connector {} with type {}",
451        type_name::<C>(),
452        connection_type
453    );
454    MonitoredConnection::new(connector, MonitorNewConnectionImpl { connection_type })
455}
456
/// Addresses resolved by [`GaiResolver`], materialized into a `Vec` so the
/// full result can be logged after resolution and then iterated.
pub struct MonitoredGaiAddrs {
    inner: Vec<SocketAddr>,
    // Cursor into `inner` for the `Iterator` implementation.
    pos: usize,
}
461
impl From<GaiAddrs> for MonitoredGaiAddrs {
    /// Eagerly drain the resolver's address iterator so all addresses are
    /// available for logging.
    fn from(value: GaiAddrs) -> Self {
        Self {
            inner: value.collect_vec(),
            pos: 0,
        }
    }
}
470
471impl Iterator for MonitoredGaiAddrs {
472    type Item = SocketAddr;
473
474    fn next(&mut self) -> Option<Self::Item> {
475        let res = self.inner.get(self.pos).cloned();
476        self.pos += 1;
477        res
478    }
479}
480
/// A [`GaiFuture`] wrapper that logs the resolution result
/// (name → addresses) at `trace` level on completion.
pub struct MonitoredGaiFuture {
    name: Name,
    inner: GaiFuture,
}
485
impl std::fmt::Debug for MonitoredGaiFuture {
    // Opaque representation: only the type name is printed. `pad` honors the
    // formatter's width/alignment options.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad("MonitoredGaiFuture")
    }
}
491
492impl Future for MonitoredGaiFuture {
493    type Output = Result<MonitoredGaiAddrs, Error>;
494
495    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
496        Pin::new(&mut self.inner).poll(cx).map(|res| match res {
497            Ok(addrs) => {
498                let addrs: MonitoredGaiAddrs = addrs.into();
499                trace!("resolve {} => {:?}", self.name, addrs.inner);
500                Ok(addrs)
501            }
502            Err(err) => Err(err),
503        })
504    }
505}
506
/// A [`GaiResolver`] wrapper whose resolutions are traced via
/// [`MonitoredGaiFuture`].
#[derive(Clone)]
pub struct MonitoredGaiResolver {
    inner: GaiResolver,
}
511
impl std::fmt::Debug for MonitoredGaiResolver {
    // Opaque representation: only the type name is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad("MonitoredGaiResolver")
    }
}
517
impl Default for MonitoredGaiResolver {
    /// Wrap a freshly constructed [`GaiResolver`].
    fn default() -> Self {
        Self {
            inner: GaiResolver::new(),
        }
    }
}
525
526impl Service<Name> for MonitoredGaiResolver {
527    type Error = Error;
528    type Future = MonitoredGaiFuture;
529    type Response = MonitoredGaiAddrs;
530
531    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
532        self.inner.poll_ready(cx)
533    }
534
535    fn call(&mut self, req: Name) -> Self::Future {
536        MonitoredGaiFuture {
537            name: req.clone(),
538            inner: self.inner.call(req),
539        }
540    }
541}
542
543#[cfg_or_panic(not(madsim))]
544fn monitored_http_connector(
545    connection_type: impl Into<String>,
546    config: TcpConfig,
547) -> MonitoredConnection<HttpConnector<MonitoredGaiResolver>, MonitorNewConnectionImpl> {
548    let resolver = MonitoredGaiResolver::default();
549    let mut http = HttpConnector::new_with_resolver(resolver);
550
551    http.enforce_http(false);
552    http.set_nodelay(config.tcp_nodelay);
553    http.set_keepalive(config.keepalive_duration);
554
555    monitor_connector(http, connection_type)
556}
557
/// Attach general configurations to the endpoint.
///
/// Currently only raises the HTTP/2 max header list size to 16 MiB.
#[cfg_or_panic(not(madsim))]
fn configure_endpoint(endpoint: Endpoint) -> Endpoint {
    // This is to mitigate https://github.com/risingwavelabs/risingwave/issues/18039.
    // TODO: remove this after https://github.com/hyperium/hyper/issues/3724 gets resolved.
    endpoint.http2_max_header_list_size(16 * 1024 * 1024)
}
565
#[easy_ext::ext(EndpointExt)]
impl Endpoint {
    /// Connect to the endpoint with connection monitoring under the
    /// `connection_type` label and the given TCP options.
    ///
    /// Under madsim, monitoring and TCP options are unavailable, so a plain
    /// `connect()` is performed and both extra arguments are ignored.
    pub async fn monitored_connect(
        mut self,
        connection_type: impl Into<String>,
        config: TcpConfig,
    ) -> Result<Channel, tonic::transport::Error> {
        #[cfg(not(madsim))]
        {
            self = configure_endpoint(self);
            let connector = monitored_http_connector(connection_type, config);
            self.connect_with_connector(connector).await
        }
        #[cfg(madsim)]
        {
            self.connect().await
        }
    }

    /// Like [`Self::monitored_connect`], but lazy: the channel is returned
    /// immediately and the connection is established on first use.
    #[cfg(not(madsim))]
    pub fn monitored_connect_lazy(
        mut self,
        connection_type: impl Into<String>,
        config: TcpConfig,
    ) -> Channel {
        self = configure_endpoint(self);
        let connector = monitored_http_connector(connection_type, config);
        self.connect_with_connector_lazy(connector)
    }
}
596
#[cfg(not(madsim))]
#[easy_ext::ext(RouterExt)]
impl<L> tonic::transport::server::Router<L> {
    /// Serve the given service while monitoring the connection.
    ///
    /// Calling the function will first bind the given service to the given address. Awaiting the
    /// returned future will then start the server and keep it running until the given signal
    /// future resolves.
    ///
    /// # Panics
    ///
    /// Panics if binding to `listen_addr` fails, or if the server later exits
    /// with an error.
    pub fn monitored_serve_with_shutdown<ResBody>(
        self,
        listen_addr: std::net::SocketAddr,
        connection_type: impl Into<String>,
        config: TcpConfig,
        signal: impl Future<Output = ()>,
    ) -> impl Future<Output = ()>
    where
        L: tower_layer::Layer<tonic::service::Routes>,
        L::Service: Service<http::Request<tonic::body::Body>, Response = http::Response<ResBody>>
            + Clone
            + Send
            + 'static,
        <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
            http::Request<tonic::body::Body>,
        >>::Future: Send + 'static,
        <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
            http::Request<tonic::body::Body>,
        >>::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
        ResBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
        ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    {
        let connection_type = connection_type.into();
        // Bind eagerly (before the returned future is awaited) and apply the
        // TCP options; a bind failure is fatal.
        let incoming = tonic::transport::server::TcpIncoming::bind(listen_addr)
            .map(|incoming| {
                incoming
                    .with_nodelay(Some(config.tcp_nodelay))
                    .with_keepalive(config.keepalive_duration)
            })
            .unwrap_or_else(|err| {
                panic!(
                    "failed to bind `{connection_type}` to `{listen_addr}`: {}",
                    err.as_report()
                )
            });
        // Wrap the accept stream so every accepted connection is monitored
        // (see the `futures::Stream` impl for `MonitoredConnection`).
        let incoming =
            MonitoredConnection::new(incoming, MonitorNewConnectionImpl { connection_type });

        async move {
            self.serve_with_incoming_shutdown(incoming, signal)
                .await
                .unwrap()
        }
    }
}
650
#[cfg(madsim)]
#[easy_ext::ext(RouterExt)]
impl<L> tonic::transport::server::Router<L> {
    /// Madsim counterpart of `monitored_serve_with_shutdown`: serves without
    /// connection monitoring or TCP options, which are unavailable in the
    /// simulator. The extra parameters are kept (underscore-prefixed, to avoid
    /// unused-variable warnings) only for signature parity with the
    /// non-madsim version.
    pub async fn monitored_serve_with_shutdown(
        self,
        listen_addr: std::net::SocketAddr,
        _connection_type: impl Into<String>,
        _config: TcpConfig,
        signal: impl Future<Output = ()>,
    ) {
        self.serve_with_shutdown(listen_addr, signal).await.unwrap()
    }
}
664
/// Default [`MonitorNewConnection`] implementation that reports to
/// [`GLOBAL_CONNECTION_METRICS`], labeling metrics with this connection type.
#[derive(Clone)]
pub struct MonitorNewConnectionImpl {
    connection_type: String,
}
669
670impl MonitorNewConnection for MonitorNewConnectionImpl {
671    type ConnectionMonitor = MonitorAsyncReadWriteImpl;
672
673    fn new_connection_monitor(&self, endpoint: String) -> Self::ConnectionMonitor {
674        let labels = [self.connection_type.as_str(), endpoint.as_str()];
675        let read_rate = GLOBAL_CONNECTION_METRICS
676            .read_rate
677            .with_label_values(&labels);
678        let reader_count = GLOBAL_CONNECTION_METRICS
679            .reader_count
680            .with_label_values(&labels);
681        let write_rate = GLOBAL_CONNECTION_METRICS
682            .write_rate
683            .with_label_values(&labels);
684        let writer_count = GLOBAL_CONNECTION_METRICS
685            .writer_count
686            .with_label_values(&labels);
687        let connection_count = GLOBAL_CONNECTION_METRICS
688            .connection_count
689            .with_label_values(&labels);
690
691        GLOBAL_CONNECTION_METRICS
692            .connection_create_rate
693            .with_label_values(&labels)
694            .inc();
695
696        MonitorAsyncReadWriteImpl::new(
697            endpoint,
698            self.connection_type.clone(),
699            read_rate,
700            reader_count,
701            write_rate,
702            writer_count,
703            connection_count,
704        )
705    }
706
707    fn on_err(&self, endpoint: String) {
708        GLOBAL_CONNECTION_METRICS
709            .connection_err_rate
710            .with_label_values(&[self.connection_type.as_str(), endpoint.as_str()])
711            .inc();
712    }
713}
714
/// Buffered read/write byte counts are flushed to the shared counters once
/// they reach this many bytes, keeping the per-poll hot path cheap.
const READ_WRITE_RATE_REPORT_INTERVAL: u64 = 1024;
716
/// Per-connection monitor backing [`MonitorNewConnectionImpl`].
///
/// Byte counts are accumulated locally and flushed to the Prometheus counters
/// every [`READ_WRITE_RATE_REPORT_INTERVAL`] bytes, and once more on drop.
pub struct MonitorAsyncReadWriteImpl {
    // Labels used when reporting I/O errors.
    endpoint: String,
    connection_type: String,

    // Read bytes not yet flushed to `read_rate`.
    unreported_read_rate: u64,
    read_rate: IntCounter,
    // Gauge decremented exactly once: on EOF, or on drop if no EOF was seen.
    reader_count_guard: IntGauge,
    is_eof: bool,

    // Written bytes not yet flushed to `write_rate`.
    unreported_write_rate: u64,
    write_rate: IntCounter,
    // Gauge decremented exactly once: on shutdown, or on drop if none was seen.
    writer_count_guard: IntGauge,
    is_shutdown: bool,

    // Gauge decremented on drop.
    connection_count_guard: IntGauge,
}
733
734impl MonitorAsyncReadWriteImpl {
735    pub fn new(
736        endpoint: String,
737        connection_type: String,
738        read_rate: IntCounter,
739        reader_count: IntGauge,
740        write_rate: IntCounter,
741        writer_count: IntGauge,
742        connection_count: IntGauge,
743    ) -> Self {
744        reader_count.inc();
745        writer_count.inc();
746        connection_count.inc();
747        Self {
748            endpoint,
749            connection_type,
750            unreported_read_rate: 0,
751            read_rate,
752            reader_count_guard: reader_count,
753            is_eof: false,
754            unreported_write_rate: 0,
755            write_rate,
756            writer_count_guard: writer_count,
757            is_shutdown: false,
758            connection_count_guard: connection_count,
759        }
760    }
761}
762
impl Drop for MonitorAsyncReadWriteImpl {
    fn drop(&mut self) {
        // Flush any byte counts that stayed below the report threshold.
        if self.unreported_read_rate > 0 {
            self.read_rate.inc_by(self.unreported_read_rate);
        }
        if self.unreported_write_rate > 0 {
            self.write_rate.inc_by(self.unreported_write_rate);
        }
        // The reader/writer gauges were already decremented on EOF/shutdown;
        // only decrement here if the connection was dropped before that.
        if !self.is_eof {
            self.reader_count_guard.dec();
        }
        if !self.is_shutdown {
            self.writer_count_guard.dec();
        }
        self.connection_count_guard.dec();
    }
}
780
781impl MonitorAsyncReadWrite for MonitorAsyncReadWriteImpl {
782    fn on_read(&mut self, size: usize) {
783        self.unreported_read_rate += size as u64;
784        if self.unreported_read_rate >= READ_WRITE_RATE_REPORT_INTERVAL {
785            self.read_rate.inc_by(self.unreported_read_rate);
786            self.unreported_read_rate = 0;
787        }
788    }
789
790    fn on_eof(&mut self) {
791        if self.is_eof {
792            warn!("get eof for multiple time");
793            return;
794        }
795        self.is_eof = true;
796        self.reader_count_guard.dec();
797    }
798
799    fn on_read_err(&mut self, err: &Error) {
800        // No need to store the value returned from `with_guarded_label_values`
801        // because it is reporting a single error.
802        GLOBAL_CONNECTION_METRICS
803            .io_err_rate
804            .with_guarded_label_values(&[
805                self.connection_type.as_str(),
806                self.endpoint.as_str(),
807                "read",
808                err.kind().to_string().as_str(),
809            ])
810            .inc();
811    }
812
813    fn on_write(&mut self, size: usize) {
814        self.unreported_write_rate += size as u64;
815        if self.unreported_write_rate >= READ_WRITE_RATE_REPORT_INTERVAL {
816            self.write_rate.inc_by(self.unreported_write_rate);
817            self.unreported_write_rate = 0;
818        }
819    }
820
821    fn on_shutdown(&mut self) {
822        if self.is_shutdown {
823            warn!("get shutdown for multiple time");
824            return;
825        }
826        self.is_shutdown = true;
827        self.writer_count_guard.dec();
828    }
829
830    fn on_write_err(&mut self, err: &Error) {
831        // No need to store the value returned from `with_guarded_label_values`
832        // because it is reporting a single error.
833        GLOBAL_CONNECTION_METRICS
834            .io_err_rate
835            .with_guarded_label_values(&[
836                self.connection_type.as_str(),
837                self.endpoint.as_str(),
838                "write",
839                err.kind().to_string().as_str(),
840            ])
841            .inc();
842    }
843}