1use std::any::type_name;
16use std::cmp::Ordering;
17use std::future::Future;
18use std::io::{Error, IoSlice};
19use std::net::SocketAddr;
20use std::pin::Pin;
21use std::sync::LazyLock;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25use cfg_or_panic::cfg_or_panic;
26use futures::FutureExt;
27use http::Uri;
28use hyper_util::client::legacy::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name};
29use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector};
30use hyper_util::rt::TokioIo;
31use itertools::Itertools;
32use pin_project_lite::pin_project;
33use prometheus::{
34 IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
35 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
36};
37use thiserror_ext::AsReport;
38use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
39use tonic::transport::{Channel, Endpoint};
40use tower_service::Service;
41use tracing::{trace, warn};
42
43use crate::monitor::GLOBAL_METRICS_REGISTRY;
44use crate::{LabelGuardedIntCounterVec, register_guarded_int_counter_vec_with_registry};
45
/// Callbacks fired by [`MonitoredConnection`] as it drives reads and writes on the wrapped IO
/// object.
///
/// Every hook has an empty default body, so implementors override only the events they care
/// about. The `auto_impl(&mut)` attribute also makes `&mut T` implement the trait whenever `T`
/// does, which lets a monitor be lent out by mutable reference.
#[auto_impl::auto_impl(&mut)]
pub trait MonitorAsyncReadWrite {
    /// A read completed successfully and produced `_size` new bytes.
    fn on_read(&mut self, _size: usize) {}
    /// A read completed successfully without producing any bytes, which the wrapper treats as
    /// end-of-stream.
    fn on_eof(&mut self) {}
    /// A read failed with `_err`.
    fn on_read_err(&mut self, _err: &std::io::Error) {}

    /// A (plain or vectored) write completed successfully and accepted `_size` bytes.
    fn on_write(&mut self, _size: usize) {}
    /// A flush completed successfully.
    fn on_flush(&mut self) {}
    /// A shutdown attempt finished (the wrapper reports this for both success and failure).
    fn on_shutdown(&mut self) {}
    /// A write, flush or shutdown failed with `_err`.
    fn on_write_err(&mut self, _err: &std::io::Error) {}
}
57
pin_project! {
    /// Pairs a wrapped value `C` with a monitor `M`.
    ///
    /// Depending on what `C` is, the trait impls in this file make the pair act as a monitored
    /// IO object, a monitored connector service, or a monitored stream of incoming connections.
    #[derive(Clone)]
    pub struct MonitoredConnection<C, M> {
        // The wrapped value. Structurally pinned so its `Pin`-based poll methods can be called.
        #[pin]
        inner: C,
        // Observer notified about activity on `inner`; not pinned.
        monitor: M,
    }
}
66
67impl<C, M> MonitoredConnection<C, M> {
68 pub fn new(connector: C, monitor: M) -> Self {
69 Self {
70 inner: connector,
71 monitor,
72 }
73 }
74
75 fn project_into(this: Pin<&mut Self>) -> (Pin<&mut C>, &mut M) {
76 let this = this.project();
77 (this.inner, this.monitor)
78 }
79
80 fn hyper_tokio_delegate(
82 self: Pin<&mut Self>,
83 ) -> TokioIo<MonitoredConnection<TokioIo<Pin<&mut C>>, &mut M>> {
84 let (inner, monitor) = MonitoredConnection::project_into(self);
85 TokioIo::new(MonitoredConnection::new(TokioIo::new(inner), monitor))
86 }
87}
88
89impl<C: AsyncRead, M: MonitorAsyncReadWrite> AsyncRead for MonitoredConnection<C, M> {
90 fn poll_read(
91 self: Pin<&mut Self>,
92 cx: &mut Context<'_>,
93 buf: &mut ReadBuf<'_>,
94 ) -> Poll<std::io::Result<()>> {
95 let before_buf_size = buf.filled().len();
96 let (inner, monitor) = MonitoredConnection::project_into(self);
97 let ret = inner.poll_read(cx, buf);
98 match &ret {
99 Poll::Ready(Ok(())) => {
100 let after_buf_size = buf.filled().len();
101 match after_buf_size.cmp(&before_buf_size) {
102 Ordering::Less => {
103 unreachable!(
104 "buf size decrease after poll read. Bad AsyncRead implementation on {}",
105 type_name::<C>()
106 );
107 }
108 Ordering::Equal => {
109 monitor.on_eof();
110 }
111 Ordering::Greater => {
112 monitor.on_read(after_buf_size - before_buf_size);
113 }
114 }
115 }
116 Poll::Ready(Err(e)) => {
117 monitor.on_read_err(e);
118 }
119 Poll::Pending => {}
120 }
121 ret
122 }
123}
124
125impl<C: hyper::rt::Read, M: MonitorAsyncReadWrite> hyper::rt::Read for MonitoredConnection<C, M> {
126 fn poll_read(
127 self: Pin<&mut Self>,
128 cx: &mut Context<'_>,
129 buf: hyper::rt::ReadBufCursor<'_>,
130 ) -> Poll<Result<(), std::io::Error>> {
131 hyper::rt::Read::poll_read(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
132 }
133}
134
135impl<C: AsyncWrite, M: MonitorAsyncReadWrite> AsyncWrite for MonitoredConnection<C, M> {
136 fn poll_write(
137 self: Pin<&mut Self>,
138 cx: &mut Context<'_>,
139 buf: &[u8],
140 ) -> Poll<Result<usize, Error>> {
141 let (inner, monitor) = MonitoredConnection::project_into(self);
142 let ret = inner.poll_write(cx, buf);
143 match &ret {
144 Poll::Ready(Ok(size)) => {
145 monitor.on_write(*size);
146 }
147 Poll::Ready(Err(e)) => {
148 monitor.on_write_err(e);
149 }
150 Poll::Pending => {}
151 }
152 ret
153 }
154
155 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
156 let (inner, monitor) = MonitoredConnection::project_into(self);
157 let ret = inner.poll_flush(cx);
158 match &ret {
159 Poll::Ready(Ok(())) => {
160 monitor.on_flush();
161 }
162 Poll::Ready(Err(e)) => {
163 monitor.on_write_err(e);
164 }
165 Poll::Pending => {}
166 }
167 ret
168 }
169
170 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
171 let (inner, monitor) = MonitoredConnection::project_into(self);
172 let ret = inner.poll_shutdown(cx);
173 match &ret {
174 Poll::Ready(result) => {
175 monitor.on_shutdown();
176 if let Err(e) = result {
177 monitor.on_write_err(e);
178 }
179 }
180 Poll::Pending => {}
181 }
182 ret
183 }
184
185 fn poll_write_vectored(
186 self: Pin<&mut Self>,
187 cx: &mut Context<'_>,
188 bufs: &[IoSlice<'_>],
189 ) -> Poll<Result<usize, Error>> {
190 let (inner, monitor) = MonitoredConnection::project_into(self);
191 let ret = inner.poll_write_vectored(cx, bufs);
192 match &ret {
193 Poll::Ready(Ok(size)) => {
194 monitor.on_write(*size);
195 }
196 Poll::Ready(Err(e)) => {
197 monitor.on_write_err(e);
198 }
199 Poll::Pending => {}
200 }
201 ret
202 }
203
204 fn is_write_vectored(&self) -> bool {
205 self.inner.is_write_vectored()
206 }
207}
208
209impl<C: hyper::rt::Write, M: MonitorAsyncReadWrite> hyper::rt::Write for MonitoredConnection<C, M> {
210 fn poll_write(
211 self: Pin<&mut Self>,
212 cx: &mut Context<'_>,
213 buf: &[u8],
214 ) -> Poll<Result<usize, std::io::Error>> {
215 hyper::rt::Write::poll_write(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
216 }
217
218 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
219 hyper::rt::Write::poll_flush(std::pin::pin!(self.hyper_tokio_delegate()), cx)
220 }
221
222 fn poll_shutdown(
223 self: Pin<&mut Self>,
224 cx: &mut Context<'_>,
225 ) -> Poll<Result<(), std::io::Error>> {
226 hyper::rt::Write::poll_shutdown(std::pin::pin!(self.hyper_tokio_delegate()), cx)
227 }
228
229 fn is_write_vectored(&self) -> bool {
230 self.inner.is_write_vectored()
231 }
232
233 fn poll_write_vectored(
234 self: Pin<&mut Self>,
235 cx: &mut Context<'_>,
236 bufs: &[std::io::IoSlice<'_>],
237 ) -> Poll<Result<usize, std::io::Error>> {
238 hyper::rt::Write::poll_write_vectored(std::pin::pin!(self.hyper_tokio_delegate()), cx, bufs)
239 }
240}
241
242impl<C: Connection, M> Connection for MonitoredConnection<C, M> {
243 fn connected(&self) -> Connected {
244 self.inner.connected()
245 }
246}
247
248#[cfg(not(madsim))]
249impl<C: tonic::transport::server::Connected, M> tonic::transport::server::Connected
250 for MonitoredConnection<C, M>
251{
252 type ConnectInfo = C::ConnectInfo;
253
254 fn connect_info(&self) -> Self::ConnectInfo {
255 self.inner.connect_info()
256 }
257}
258
/// Factory side of connection monitoring: observes connection establishment and hands out a
/// per-connection IO monitor.
pub trait MonitorNewConnection {
    /// Monitor type attached to each successfully established connection.
    type ConnectionMonitor: MonitorAsyncReadWrite;

    /// Creates the monitor for a connection that was just established with `endpoint`.
    fn new_connection_monitor(&self, endpoint: String) -> Self::ConnectionMonitor;
    /// Reports that establishing a connection to `endpoint` failed.
    fn on_err(&self, endpoint: String);
}
265
266impl<C: Service<Uri>, M: MonitorNewConnection + Clone + 'static> Service<Uri>
267 for MonitoredConnection<C, M>
268where
269 C::Future: 'static,
270{
271 type Error = C::Error;
272 type Response = MonitoredConnection<C::Response, M::ConnectionMonitor>;
273
274 type Future = impl Future<Output = Result<Self::Response, Self::Error>> + 'static;
275
276 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
277 let ret = self.inner.poll_ready(cx);
278 if let Poll::Ready(Err(_)) = &ret {
279 self.monitor.on_err("<poll_ready>".to_owned());
280 }
281 ret
282 }
283
284 fn call(&mut self, uri: Uri) -> Self::Future {
285 let endpoint = format!("{:?}", uri.host());
286 let monitor = self.monitor.clone();
287 self.inner
288 .call(uri)
289 .map(move |result: Result<_, _>| match result {
290 Ok(resp) => Ok(MonitoredConnection::new(
291 resp,
292 monitor.new_connection_monitor(endpoint),
293 )),
294 Err(e) => {
295 monitor.on_err(endpoint);
296 Err(e)
297 }
298 })
299 }
300}
301
302#[cfg(not(madsim))]
303impl<Con, E, C: futures::Stream<Item = Result<Con, E>>, M: MonitorNewConnection> futures::Stream
304 for MonitoredConnection<C, M>
305where
306 Con:
307 tonic::transport::server::Connected<ConnectInfo = tonic::transport::server::TcpConnectInfo>,
308{
309 type Item = Result<MonitoredConnection<Con, M::ConnectionMonitor>, E>;
310
311 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312 let (inner, monitor) = MonitoredConnection::project_into(self);
313 inner.poll_next(cx).map(|opt| {
314 opt.map(|result| {
315 result.map(|conn| {
316 let remote_addr = conn.connect_info().remote_addr();
317 let endpoint = remote_addr
318 .map(|remote_addr| format!("{}", remote_addr.ip()))
319 .unwrap_or("unknown".to_owned());
320 MonitoredConnection::new(conn, monitor.new_connection_monitor(endpoint))
321 })
322 })
323 })
324 }
325
326 fn size_hint(&self) -> (usize, Option<usize>) {
327 self.inner.size_hint()
328 }
329}
330
/// Prometheus metric families describing monitored connections.
///
/// As registered by `ConnectionMetrics::new`, all families are labelled by `connection_type` and
/// `uri`; `io_err_rate` additionally carries `op_type` and `error_kind`.
#[derive(Clone)]
pub struct ConnectionMetrics {
    /// Number of currently existing connections.
    connection_count: IntGaugeVec,
    /// Counter of newly created connections.
    connection_create_rate: IntCounterVec,
    /// Counter of failures while creating a connection.
    connection_err_rate: IntCounterVec,

    /// Counter of bytes read from connections.
    read_rate: IntCounterVec,
    /// Number of currently existing readers.
    reader_count: IntGaugeVec,

    /// Counter of bytes written to connections.
    write_rate: IntCounterVec,
    /// Number of currently existing writers.
    writer_count: IntGaugeVec,

    /// Counter of IO errors seen on connections.
    io_err_rate: LabelGuardedIntCounterVec,
}
345
/// Process-wide [`ConnectionMetrics`], created on first access and registered in
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_CONNECTION_METRICS: LazyLock<ConnectionMetrics> =
    LazyLock::new(|| ConnectionMetrics::new(&GLOBAL_METRICS_REGISTRY));
348
349impl ConnectionMetrics {
350 pub fn new(registry: &Registry) -> Self {
351 let labels = ["connection_type", "uri"];
352 let connection_count = register_int_gauge_vec_with_registry!(
353 "connection_count",
354 "The number of current existing connection",
355 &labels,
356 registry,
357 )
358 .unwrap();
359
360 let connection_create_rate = register_int_counter_vec_with_registry!(
361 "connection_create_rate",
362 "Rate on creating new connection",
363 &labels,
364 registry,
365 )
366 .unwrap();
367
368 let connection_err_rate = register_int_counter_vec_with_registry!(
369 "connection_err_rate",
370 "Error rate on creating new connection",
371 &labels,
372 registry,
373 )
374 .unwrap();
375
376 let read_rate = register_int_counter_vec_with_registry!(
377 "connection_read_rate",
378 "Read rate of a connection",
379 &labels,
380 registry,
381 )
382 .unwrap();
383
384 let reader_count = register_int_gauge_vec_with_registry!(
385 "connection_reader_count",
386 "The number of current existing reader",
387 &labels,
388 registry,
389 )
390 .unwrap();
391
392 let write_rate = register_int_counter_vec_with_registry!(
393 "connection_write_rate",
394 "Write rate of a connection",
395 &labels,
396 registry,
397 )
398 .unwrap();
399
400 let writer_count = register_int_gauge_vec_with_registry!(
401 "connection_writer_count",
402 "The number of current existing writer",
403 &labels,
404 registry,
405 )
406 .unwrap();
407
408 let io_err_rate = register_guarded_int_counter_vec_with_registry!(
409 "connection_io_err_rate",
410 "IO err rate of a connection",
411 &["connection_type", "uri", "op_type", "error_kind"],
412 registry,
413 )
414 .unwrap();
415
416 Self {
417 connection_count,
418 connection_create_rate,
419 connection_err_rate,
420 read_rate,
421 reader_count,
422 write_rate,
423 writer_count,
424 io_err_rate,
425 }
426 }
427}
428
/// TCP socket options applied to monitored client connectors and server listeners.
pub struct TcpConfig {
    /// Value passed to the transport's "nodelay" setting.
    pub tcp_nodelay: bool,
    /// Value passed to the transport's keepalive setting; `None` is forwarded as-is.
    pub keepalive_duration: Option<Duration>,
}
433
434#[allow(clippy::derivable_impls)]
435impl Default for TcpConfig {
436 fn default() -> Self {
437 Self {
438 tcp_nodelay: false,
439 keepalive_duration: None,
440 }
441 }
442}
443
444pub fn monitor_connector<C>(
445 connector: C,
446 connection_type: impl Into<String>,
447) -> MonitoredConnection<C, MonitorNewConnectionImpl> {
448 let connection_type = connection_type.into();
449 trace!(
450 "monitoring connector {} with type {}",
451 type_name::<C>(),
452 connection_type
453 );
454 MonitoredConnection::new(connector, MonitorNewConnectionImpl { connection_type })
455}
456
/// Resolved socket addresses, buffered in a `Vec` so they can be logged before being iterated.
pub struct MonitoredGaiAddrs {
    /// All resolved addresses, in the order the resolver produced them.
    inner: Vec<SocketAddr>,
    /// Index of the next address to yield from `inner`.
    pos: usize,
}
461
462impl From<GaiAddrs> for MonitoredGaiAddrs {
463 fn from(value: GaiAddrs) -> Self {
464 Self {
465 inner: value.collect_vec(),
466 pos: 0,
467 }
468 }
469}
470
471impl Iterator for MonitoredGaiAddrs {
472 type Item = SocketAddr;
473
474 fn next(&mut self) -> Option<Self::Item> {
475 let res = self.inner.get(self.pos).cloned();
476 self.pos += 1;
477 res
478 }
479}
480
/// In-flight DNS resolution that remembers which name it is resolving so the result can be
/// logged.
pub struct MonitoredGaiFuture {
    /// Host name being resolved.
    name: Name,
    /// The underlying resolver future.
    inner: GaiFuture,
}
485
486impl std::fmt::Debug for MonitoredGaiFuture {
487 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
488 f.pad("MonitoredGaiFuture")
489 }
490}
491
492impl Future for MonitoredGaiFuture {
493 type Output = Result<MonitoredGaiAddrs, Error>;
494
495 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
496 Pin::new(&mut self.inner).poll(cx).map(|res| match res {
497 Ok(addrs) => {
498 let addrs: MonitoredGaiAddrs = addrs.into();
499 trace!("resolve {} => {:?}", self.name, addrs.inner);
500 Ok(addrs)
501 }
502 Err(err) => Err(err),
503 })
504 }
505}
506
/// DNS resolver that wraps `GaiResolver` and produces [`MonitoredGaiFuture`]s, which log what
/// each name resolved to.
#[derive(Clone)]
pub struct MonitoredGaiResolver {
    /// The resolver that performs the actual lookups.
    inner: GaiResolver,
}
511
512impl std::fmt::Debug for MonitoredGaiResolver {
513 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
514 f.pad("MonitoredGaiResolver")
515 }
516}
517
518impl Default for MonitoredGaiResolver {
519 fn default() -> Self {
520 Self {
521 inner: GaiResolver::new(),
522 }
523 }
524}
525
526impl Service<Name> for MonitoredGaiResolver {
527 type Error = Error;
528 type Future = MonitoredGaiFuture;
529 type Response = MonitoredGaiAddrs;
530
531 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
532 self.inner.poll_ready(cx)
533 }
534
535 fn call(&mut self, req: Name) -> Self::Future {
536 MonitoredGaiFuture {
537 name: req.clone(),
538 inner: self.inner.call(req),
539 }
540 }
541}
542
543#[cfg_or_panic(not(madsim))]
544fn monitored_http_connector(
545 connection_type: impl Into<String>,
546 config: TcpConfig,
547) -> MonitoredConnection<HttpConnector<MonitoredGaiResolver>, MonitorNewConnectionImpl> {
548 let resolver = MonitoredGaiResolver::default();
549 let mut http = HttpConnector::new_with_resolver(resolver);
550
551 http.enforce_http(false);
552 http.set_nodelay(config.tcp_nodelay);
553 http.set_keepalive(config.keepalive_duration);
554
555 monitor_connector(http, connection_type)
556}
557
558#[cfg_or_panic(not(madsim))]
560fn configure_endpoint(endpoint: Endpoint) -> Endpoint {
561 endpoint.http2_max_header_list_size(16 * 1024 * 1024)
564}
565
566#[easy_ext::ext(EndpointExt)]
567impl Endpoint {
568 pub async fn monitored_connect(
569 mut self,
570 connection_type: impl Into<String>,
571 config: TcpConfig,
572 ) -> Result<Channel, tonic::transport::Error> {
573 #[cfg(not(madsim))]
574 {
575 self = configure_endpoint(self);
576 let connector = monitored_http_connector(connection_type, config);
577 self.connect_with_connector(connector).await
578 }
579 #[cfg(madsim)]
580 {
581 self.connect().await
582 }
583 }
584
585 #[cfg(not(madsim))]
586 pub fn monitored_connect_lazy(
587 mut self,
588 connection_type: impl Into<String>,
589 config: TcpConfig,
590 ) -> Channel {
591 self = configure_endpoint(self);
592 let connector = monitored_http_connector(connection_type, config);
593 self.connect_with_connector_lazy(connector)
594 }
595}
596
597#[cfg(not(madsim))]
598#[easy_ext::ext(RouterExt)]
599impl<L> tonic::transport::server::Router<L> {
600 pub fn monitored_serve_with_shutdown<ResBody>(
606 self,
607 listen_addr: std::net::SocketAddr,
608 connection_type: impl Into<String>,
609 config: TcpConfig,
610 signal: impl Future<Output = ()>,
611 ) -> impl Future<Output = ()>
612 where
613 L: tower_layer::Layer<tonic::service::Routes>,
614 L::Service: Service<http::Request<tonic::body::Body>, Response = http::Response<ResBody>>
615 + Clone
616 + Send
617 + 'static,
618 <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
619 http::Request<tonic::body::Body>,
620 >>::Future: Send + 'static,
621 <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
622 http::Request<tonic::body::Body>,
623 >>::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
624 ResBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
625 ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
626 {
627 let connection_type = connection_type.into();
628 let incoming = tonic::transport::server::TcpIncoming::bind(listen_addr)
629 .map(|incoming| {
630 incoming
631 .with_nodelay(Some(config.tcp_nodelay))
632 .with_keepalive(config.keepalive_duration)
633 })
634 .unwrap_or_else(|err| {
635 panic!(
636 "failed to bind `{connection_type}` to `{listen_addr}`: {}",
637 err.as_report()
638 )
639 });
640 let incoming =
641 MonitoredConnection::new(incoming, MonitorNewConnectionImpl { connection_type });
642
643 async move {
644 self.serve_with_incoming_shutdown(incoming, signal)
645 .await
646 .unwrap()
647 }
648 }
649}
650
/// madsim counterpart of `RouterExt`: serves without connection monitoring.
#[cfg(madsim)]
#[easy_ext::ext(RouterExt)]
impl<L> tonic::transport::server::Router<L> {
    /// Serves on `listen_addr` until `signal` resolves. `connection_type` and `config` are
    /// accepted for signature parity and otherwise ignored.
    ///
    /// # Panics
    ///
    /// Panics if serving ends with an error.
    pub async fn monitored_serve_with_shutdown(
        self,
        listen_addr: std::net::SocketAddr,
        connection_type: impl Into<String>,
        config: TcpConfig,
        signal: impl Future<Output = ()>,
    ) {
        let served = self.serve_with_shutdown(listen_addr, signal).await;
        served.unwrap()
    }
}
664
/// [`MonitorNewConnection`] implementation that records into `GLOBAL_CONNECTION_METRICS`.
#[derive(Clone)]
pub struct MonitorNewConnectionImpl {
    /// Value used for the `connection_type` metric label.
    connection_type: String,
}
669
670impl MonitorNewConnection for MonitorNewConnectionImpl {
671 type ConnectionMonitor = MonitorAsyncReadWriteImpl;
672
673 fn new_connection_monitor(&self, endpoint: String) -> Self::ConnectionMonitor {
674 let labels = [self.connection_type.as_str(), endpoint.as_str()];
675 let read_rate = GLOBAL_CONNECTION_METRICS
676 .read_rate
677 .with_label_values(&labels);
678 let reader_count = GLOBAL_CONNECTION_METRICS
679 .reader_count
680 .with_label_values(&labels);
681 let write_rate = GLOBAL_CONNECTION_METRICS
682 .write_rate
683 .with_label_values(&labels);
684 let writer_count = GLOBAL_CONNECTION_METRICS
685 .writer_count
686 .with_label_values(&labels);
687 let connection_count = GLOBAL_CONNECTION_METRICS
688 .connection_count
689 .with_label_values(&labels);
690
691 GLOBAL_CONNECTION_METRICS
692 .connection_create_rate
693 .with_label_values(&labels)
694 .inc();
695
696 MonitorAsyncReadWriteImpl::new(
697 endpoint,
698 self.connection_type.clone(),
699 read_rate,
700 reader_count,
701 write_rate,
702 writer_count,
703 connection_count,
704 )
705 }
706
707 fn on_err(&self, endpoint: String) {
708 GLOBAL_CONNECTION_METRICS
709 .connection_err_rate
710 .with_label_values(&[self.connection_type.as_str(), endpoint.as_str()])
711 .inc();
712 }
713}
714
/// Number of bytes a connection accumulates locally before adding them to the shared read or
/// write counter.
const READ_WRITE_RATE_REPORT_INTERVAL: u64 = 1024;
716
/// Per-connection monitor that feeds `GLOBAL_CONNECTION_METRICS`.
///
/// Byte counts are batched locally and flushed to the counters in chunks. The gauges are
/// incremented on construction and decremented on EOF, shutdown, or drop.
pub struct MonitorAsyncReadWriteImpl {
    /// Value used for the `uri` label when reporting IO errors.
    endpoint: String,
    /// Value used for the `connection_type` label when reporting IO errors.
    connection_type: String,

    /// Bytes read but not yet added to `read_rate`.
    unreported_read_rate: u64,
    /// Counter of bytes read.
    read_rate: IntCounter,
    /// Reader gauge; decremented on EOF, or on drop if EOF was never seen.
    reader_count_guard: IntGauge,
    /// Whether EOF has already been observed.
    is_eof: bool,

    /// Bytes written but not yet added to `write_rate`.
    unreported_write_rate: u64,
    /// Counter of bytes written.
    write_rate: IntCounter,
    /// Writer gauge; decremented on shutdown, or on drop if shutdown was never seen.
    writer_count_guard: IntGauge,
    /// Whether shutdown has already been observed.
    is_shutdown: bool,

    /// Connection gauge; decremented on drop.
    connection_count_guard: IntGauge,
}
733
734impl MonitorAsyncReadWriteImpl {
735 pub fn new(
736 endpoint: String,
737 connection_type: String,
738 read_rate: IntCounter,
739 reader_count: IntGauge,
740 write_rate: IntCounter,
741 writer_count: IntGauge,
742 connection_count: IntGauge,
743 ) -> Self {
744 reader_count.inc();
745 writer_count.inc();
746 connection_count.inc();
747 Self {
748 endpoint,
749 connection_type,
750 unreported_read_rate: 0,
751 read_rate,
752 reader_count_guard: reader_count,
753 is_eof: false,
754 unreported_write_rate: 0,
755 write_rate,
756 writer_count_guard: writer_count,
757 is_shutdown: false,
758 connection_count_guard: connection_count,
759 }
760 }
761}
762
763impl Drop for MonitorAsyncReadWriteImpl {
764 fn drop(&mut self) {
765 if self.unreported_read_rate > 0 {
766 self.read_rate.inc_by(self.unreported_read_rate);
767 }
768 if self.unreported_write_rate > 0 {
769 self.write_rate.inc_by(self.unreported_write_rate);
770 }
771 if !self.is_eof {
772 self.reader_count_guard.dec();
773 }
774 if !self.is_shutdown {
775 self.writer_count_guard.dec();
776 }
777 self.connection_count_guard.dec();
778 }
779}
780
781impl MonitorAsyncReadWrite for MonitorAsyncReadWriteImpl {
782 fn on_read(&mut self, size: usize) {
783 self.unreported_read_rate += size as u64;
784 if self.unreported_read_rate >= READ_WRITE_RATE_REPORT_INTERVAL {
785 self.read_rate.inc_by(self.unreported_read_rate);
786 self.unreported_read_rate = 0;
787 }
788 }
789
790 fn on_eof(&mut self) {
791 if self.is_eof {
792 warn!("get eof for multiple time");
793 return;
794 }
795 self.is_eof = true;
796 self.reader_count_guard.dec();
797 }
798
799 fn on_read_err(&mut self, err: &Error) {
800 GLOBAL_CONNECTION_METRICS
803 .io_err_rate
804 .with_guarded_label_values(&[
805 self.connection_type.as_str(),
806 self.endpoint.as_str(),
807 "read",
808 err.kind().to_string().as_str(),
809 ])
810 .inc();
811 }
812
813 fn on_write(&mut self, size: usize) {
814 self.unreported_write_rate += size as u64;
815 if self.unreported_write_rate >= READ_WRITE_RATE_REPORT_INTERVAL {
816 self.write_rate.inc_by(self.unreported_write_rate);
817 self.unreported_write_rate = 0;
818 }
819 }
820
821 fn on_shutdown(&mut self) {
822 if self.is_shutdown {
823 warn!("get shutdown for multiple time");
824 return;
825 }
826 self.is_shutdown = true;
827 self.writer_count_guard.dec();
828 }
829
830 fn on_write_err(&mut self, err: &Error) {
831 GLOBAL_CONNECTION_METRICS
834 .io_err_rate
835 .with_guarded_label_values(&[
836 self.connection_type.as_str(),
837 self.endpoint.as_str(),
838 "write",
839 err.kind().to_string().as_str(),
840 ])
841 .inc();
842 }
843}