1use std::any::type_name;
16use std::cmp::Ordering;
17use std::future::Future;
18use std::io::{Error, IoSlice};
19use std::net::SocketAddr;
20use std::pin::Pin;
21use std::sync::LazyLock;
22use std::task::{Context, Poll};
23use std::time::Duration;
24
25use cfg_or_panic::cfg_or_panic;
26use futures::FutureExt;
27use http::Uri;
28use hyper_util::client::legacy::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name};
29use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector};
30use hyper_util::rt::TokioIo;
31use itertools::Itertools;
32use pin_project_lite::pin_project;
33use prometheus::{
34 IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
35 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
36};
37use thiserror_ext::AsReport;
38use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
39use tonic::transport::{Channel, Endpoint};
40use tower_service::Service;
41use tracing::{debug, info, warn};
42
43use crate::monitor::GLOBAL_METRICS_REGISTRY;
44use crate::{LabelGuardedIntCounterVec, register_guarded_int_counter_vec_with_registry};
45
/// Hooks invoked by [`MonitoredConnection`] as it observes I/O on the wrapped value.
///
/// Every method has an empty default body, so an implementor only overrides
/// the events it is interested in.
#[auto_impl::auto_impl(&mut)]
pub trait MonitorAsyncReadWrite {
    /// Called after a successful read that added `_size` bytes to the buffer.
    fn on_read(&mut self, _size: usize) {}
    /// Called when the wrapper observes end-of-stream, i.e. a successful read
    /// that yielded no bytes.
    fn on_eof(&mut self) {}
    /// Called when a read fails with `_err`.
    fn on_read_err(&mut self, _err: &std::io::Error) {}

    /// Called after a successful (plain or vectored) write of `_size` bytes.
    fn on_write(&mut self, _size: usize) {}
    /// Called after a successful flush.
    fn on_flush(&mut self) {}
    /// Called when a shutdown attempt completes, whether it succeeded or not.
    fn on_shutdown(&mut self) {}
    /// Called when a write, flush or shutdown fails with `_err`.
    fn on_write_err(&mut self, _err: &std::io::Error) {}
}
57
pin_project! {
    /// Pairs an inner value `C` with a monitor `M`.
    ///
    /// Depending on which traits `C` implements, the wrapper itself acts as an
    /// I/O stream, a connector `Service`, or a stream of incoming connections,
    /// forwarding every operation to `inner` and reporting what it observes
    /// to `monitor`.
    #[derive(Clone)]
    pub struct MonitoredConnection<C, M> {
        // Pinned field: pinned trait methods are forwarded to it.
        #[pin]
        inner: C,
        // Not pinned: handed out as a plain `&mut M` by the projection.
        monitor: M,
    }
}
66
67impl<C, M> MonitoredConnection<C, M> {
68 pub fn new(connector: C, monitor: M) -> Self {
69 Self {
70 inner: connector,
71 monitor,
72 }
73 }
74
75 fn project_into(this: Pin<&mut Self>) -> (Pin<&mut C>, &mut M) {
76 let this = this.project();
77 (this.inner, this.monitor)
78 }
79
80 fn hyper_tokio_delegate(
82 self: Pin<&mut Self>,
83 ) -> TokioIo<MonitoredConnection<TokioIo<Pin<&mut C>>, &mut M>> {
84 let (inner, monitor) = MonitoredConnection::project_into(self);
85 TokioIo::new(MonitoredConnection::new(TokioIo::new(inner), monitor))
86 }
87}
88
89impl<C: AsyncRead, M: MonitorAsyncReadWrite> AsyncRead for MonitoredConnection<C, M> {
90 fn poll_read(
91 self: Pin<&mut Self>,
92 cx: &mut Context<'_>,
93 buf: &mut ReadBuf<'_>,
94 ) -> Poll<std::io::Result<()>> {
95 let before_buf_size = buf.filled().len();
96 let (inner, monitor) = MonitoredConnection::project_into(self);
97 let ret = inner.poll_read(cx, buf);
98 match &ret {
99 Poll::Ready(Ok(())) => {
100 let after_buf_size = buf.filled().len();
101 match after_buf_size.cmp(&before_buf_size) {
102 Ordering::Less => {
103 unreachable!(
104 "buf size decrease after poll read. Bad AsyncRead implementation on {}",
105 type_name::<C>()
106 );
107 }
108 Ordering::Equal => {
109 monitor.on_eof();
110 }
111 Ordering::Greater => {
112 monitor.on_read(after_buf_size - before_buf_size);
113 }
114 }
115 }
116 Poll::Ready(Err(e)) => {
117 monitor.on_read_err(e);
118 }
119 Poll::Pending => {}
120 }
121 ret
122 }
123}
124
125impl<C: hyper::rt::Read, M: MonitorAsyncReadWrite> hyper::rt::Read for MonitoredConnection<C, M> {
126 fn poll_read(
127 self: Pin<&mut Self>,
128 cx: &mut Context<'_>,
129 buf: hyper::rt::ReadBufCursor<'_>,
130 ) -> Poll<Result<(), std::io::Error>> {
131 hyper::rt::Read::poll_read(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
132 }
133}
134
135impl<C: AsyncWrite, M: MonitorAsyncReadWrite> AsyncWrite for MonitoredConnection<C, M> {
136 fn poll_write(
137 self: Pin<&mut Self>,
138 cx: &mut Context<'_>,
139 buf: &[u8],
140 ) -> Poll<Result<usize, Error>> {
141 let (inner, monitor) = MonitoredConnection::project_into(self);
142 let ret = inner.poll_write(cx, buf);
143 match &ret {
144 Poll::Ready(Ok(size)) => {
145 monitor.on_write(*size);
146 }
147 Poll::Ready(Err(e)) => {
148 monitor.on_write_err(e);
149 }
150 Poll::Pending => {}
151 }
152 ret
153 }
154
155 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
156 let (inner, monitor) = MonitoredConnection::project_into(self);
157 let ret = inner.poll_flush(cx);
158 match &ret {
159 Poll::Ready(Ok(())) => {
160 monitor.on_flush();
161 }
162 Poll::Ready(Err(e)) => {
163 monitor.on_write_err(e);
164 }
165 Poll::Pending => {}
166 }
167 ret
168 }
169
170 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
171 let (inner, monitor) = MonitoredConnection::project_into(self);
172 let ret = inner.poll_shutdown(cx);
173 match &ret {
174 Poll::Ready(result) => {
175 monitor.on_shutdown();
176 if let Err(e) = result {
177 monitor.on_write_err(e);
178 }
179 }
180 Poll::Pending => {}
181 }
182 ret
183 }
184
185 fn poll_write_vectored(
186 self: Pin<&mut Self>,
187 cx: &mut Context<'_>,
188 bufs: &[IoSlice<'_>],
189 ) -> Poll<Result<usize, Error>> {
190 let (inner, monitor) = MonitoredConnection::project_into(self);
191 let ret = inner.poll_write_vectored(cx, bufs);
192 match &ret {
193 Poll::Ready(Ok(size)) => {
194 monitor.on_write(*size);
195 }
196 Poll::Ready(Err(e)) => {
197 monitor.on_write_err(e);
198 }
199 Poll::Pending => {}
200 }
201 ret
202 }
203
204 fn is_write_vectored(&self) -> bool {
205 self.inner.is_write_vectored()
206 }
207}
208
209impl<C: hyper::rt::Write, M: MonitorAsyncReadWrite> hyper::rt::Write for MonitoredConnection<C, M> {
210 fn poll_write(
211 self: Pin<&mut Self>,
212 cx: &mut Context<'_>,
213 buf: &[u8],
214 ) -> Poll<Result<usize, std::io::Error>> {
215 hyper::rt::Write::poll_write(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
216 }
217
218 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
219 hyper::rt::Write::poll_flush(std::pin::pin!(self.hyper_tokio_delegate()), cx)
220 }
221
222 fn poll_shutdown(
223 self: Pin<&mut Self>,
224 cx: &mut Context<'_>,
225 ) -> Poll<Result<(), std::io::Error>> {
226 hyper::rt::Write::poll_shutdown(std::pin::pin!(self.hyper_tokio_delegate()), cx)
227 }
228
229 fn is_write_vectored(&self) -> bool {
230 self.inner.is_write_vectored()
231 }
232
233 fn poll_write_vectored(
234 self: Pin<&mut Self>,
235 cx: &mut Context<'_>,
236 bufs: &[std::io::IoSlice<'_>],
237 ) -> Poll<Result<usize, std::io::Error>> {
238 hyper::rt::Write::poll_write_vectored(std::pin::pin!(self.hyper_tokio_delegate()), cx, bufs)
239 }
240}
241
242impl<C: Connection, M> Connection for MonitoredConnection<C, M> {
243 fn connected(&self) -> Connected {
244 self.inner.connected()
245 }
246}
247
248#[cfg(not(madsim))]
249impl<C: tonic::transport::server::Connected, M> tonic::transport::server::Connected
250 for MonitoredConnection<C, M>
251{
252 type ConnectInfo = C::ConnectInfo;
253
254 fn connect_info(&self) -> Self::ConnectInfo {
255 self.inner.connect_info()
256 }
257}
258
/// Factory for per-connection monitors, also notified about failures that
/// happen before a connection exists.
pub trait MonitorNewConnection {
    /// The monitor type attached to every connection that is established.
    type ConnectionMonitor: MonitorAsyncReadWrite;

    /// Creates the monitor for a newly established connection to `endpoint`.
    fn new_connection_monitor(&self, endpoint: String) -> Self::ConnectionMonitor;
    /// Reports that no connection could be obtained for `endpoint`.
    fn on_err(&self, endpoint: String);
}
265
266impl<C: Service<Uri>, M: MonitorNewConnection + Clone + 'static> Service<Uri>
267 for MonitoredConnection<C, M>
268where
269 C::Future: 'static,
270{
271 type Error = C::Error;
272 type Response = MonitoredConnection<C::Response, M::ConnectionMonitor>;
273
274 type Future = impl Future<Output = Result<Self::Response, Self::Error>> + 'static;
275
276 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
277 let ret = self.inner.poll_ready(cx);
278 if let Poll::Ready(Err(_)) = &ret {
279 self.monitor.on_err("<poll_ready>".to_owned());
280 }
281 ret
282 }
283
284 fn call(&mut self, uri: Uri) -> Self::Future {
285 let endpoint = format!("{:?}", uri.host());
286 let monitor = self.monitor.clone();
287 self.inner
288 .call(uri)
289 .map(move |result: Result<_, _>| match result {
290 Ok(resp) => Ok(MonitoredConnection::new(
291 resp,
292 monitor.new_connection_monitor(endpoint),
293 )),
294 Err(e) => {
295 monitor.on_err(endpoint);
296 Err(e)
297 }
298 })
299 }
300}
301
302#[cfg(not(madsim))]
303impl<Con, E, C: futures::Stream<Item = Result<Con, E>>, M: MonitorNewConnection> futures::Stream
304 for MonitoredConnection<C, M>
305where
306 Con:
307 tonic::transport::server::Connected<ConnectInfo = tonic::transport::server::TcpConnectInfo>,
308{
309 type Item = Result<MonitoredConnection<Con, M::ConnectionMonitor>, E>;
310
311 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312 let (inner, monitor) = MonitoredConnection::project_into(self);
313 inner.poll_next(cx).map(|opt| {
314 opt.map(|result| {
315 result.map(|conn| {
316 let remote_addr = conn.connect_info().remote_addr();
317 let endpoint = remote_addr
318 .map(|remote_addr| format!("{}", remote_addr.ip()))
319 .unwrap_or("unknown".to_owned());
320 MonitoredConnection::new(conn, monitor.new_connection_monitor(endpoint))
321 })
322 })
323 })
324 }
325
326 fn size_hint(&self) -> (usize, Option<usize>) {
327 self.inner.size_hint()
328 }
329}
330
331mod compat {
335 use http_02::Uri;
336 use hyper_014::client::connect::{Connected, Connection};
337
338 use super::*;
339
340 impl<C: Service<Uri>, M: MonitorNewConnection + Clone + 'static> Service<Uri>
341 for MonitoredConnection<C, M>
342 where
343 C::Future: 'static,
344 {
345 type Error = C::Error;
346 type Response = MonitoredConnection<C::Response, M::ConnectionMonitor>;
347
348 type Future = impl Future<Output = Result<Self::Response, Self::Error>> + 'static;
349
350 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
351 let ret = self.inner.poll_ready(cx);
352 if let Poll::Ready(Err(_)) = &ret {
353 self.monitor.on_err("<poll_ready>".to_owned());
354 }
355 ret
356 }
357
358 fn call(&mut self, uri: Uri) -> Self::Future {
359 let endpoint = format!("{:?}", uri.host());
360 let monitor = self.monitor.clone();
361 self.inner
362 .call(uri)
363 .map(move |result: Result<_, _>| match result {
364 Ok(resp) => Ok(MonitoredConnection::new(
365 resp,
366 monitor.new_connection_monitor(endpoint),
367 )),
368 Err(e) => {
369 monitor.on_err(endpoint);
370 Err(e)
371 }
372 })
373 }
374 }
375
376 impl<C: Connection, M> Connection for MonitoredConnection<C, M> {
377 fn connected(&self) -> Connected {
378 self.inner.connected()
379 }
380 }
381}
382
/// The set of metrics used to monitor connections.
///
/// All metrics are labelled by `connection_type` and `uri`; `io_err_rate`
/// additionally carries `op_type` and `error_kind`.
#[derive(Clone)]
pub struct ConnectionMetrics {
    /// Number of currently existing connections.
    connection_count: IntGaugeVec,
    /// Counter of newly created connections.
    connection_create_rate: IntCounterVec,
    /// Counter of errors while creating a connection.
    connection_err_rate: IntCounterVec,

    /// Counter for data read from connections.
    read_rate: IntCounterVec,
    /// Number of currently existing readers.
    reader_count: IntGaugeVec,

    /// Counter for data written to connections.
    write_rate: IntCounterVec,
    /// Number of currently existing writers.
    writer_count: IntGaugeVec,

    /// Counter of I/O errors on connections.
    io_err_rate: LabelGuardedIntCounterVec<4>,
}
397
/// Process-wide connection metrics, registered on `GLOBAL_METRICS_REGISTRY`
/// the first time this static is accessed.
pub static GLOBAL_CONNECTION_METRICS: LazyLock<ConnectionMetrics> =
    LazyLock::new(|| ConnectionMetrics::new(&GLOBAL_METRICS_REGISTRY));
400
401impl ConnectionMetrics {
402 pub fn new(registry: &Registry) -> Self {
403 let labels = ["connection_type", "uri"];
404 let connection_count = register_int_gauge_vec_with_registry!(
405 "connection_count",
406 "The number of current existing connection",
407 &labels,
408 registry,
409 )
410 .unwrap();
411
412 let connection_create_rate = register_int_counter_vec_with_registry!(
413 "connection_create_rate",
414 "Rate on creating new connection",
415 &labels,
416 registry,
417 )
418 .unwrap();
419
420 let connection_err_rate = register_int_counter_vec_with_registry!(
421 "connection_err_rate",
422 "Error rate on creating new connection",
423 &labels,
424 registry,
425 )
426 .unwrap();
427
428 let read_rate = register_int_counter_vec_with_registry!(
429 "connection_read_rate",
430 "Read rate of a connection",
431 &labels,
432 registry,
433 )
434 .unwrap();
435
436 let reader_count = register_int_gauge_vec_with_registry!(
437 "connection_reader_count",
438 "The number of current existing reader",
439 &labels,
440 registry,
441 )
442 .unwrap();
443
444 let write_rate = register_int_counter_vec_with_registry!(
445 "connection_write_rate",
446 "Write rate of a connection",
447 &labels,
448 registry,
449 )
450 .unwrap();
451
452 let writer_count = register_int_gauge_vec_with_registry!(
453 "connection_writer_count",
454 "The number of current existing writer",
455 &labels,
456 registry,
457 )
458 .unwrap();
459
460 let io_err_rate = register_guarded_int_counter_vec_with_registry!(
461 "connection_io_err_rate",
462 "IO err rate of a connection",
463 &["connection_type", "uri", "op_type", "error_kind"],
464 registry,
465 )
466 .unwrap();
467
468 Self {
469 connection_count,
470 connection_create_rate,
471 connection_err_rate,
472 read_rate,
473 reader_count,
474 write_rate,
475 writer_count,
476 io_err_rate,
477 }
478 }
479}
480
/// TCP-level options applied to monitored connections.
pub struct TcpConfig {
    /// Value passed on as the `TCP_NODELAY` setting.
    pub tcp_nodelay: bool,
    /// Keepalive duration passed on to the transport; `None` leaves it unset.
    pub keepalive_duration: Option<Duration>,
}

// The defaults are spelled out by hand rather than derived, hence the lint
// opt-out.
#[allow(clippy::derivable_impls)]
impl Default for TcpConfig {
    /// Returns a configuration with `tcp_nodelay` off and no keepalive.
    fn default() -> Self {
        let tcp_nodelay = false;
        let keepalive_duration = None;
        TcpConfig {
            tcp_nodelay,
            keepalive_duration,
        }
    }
}
495
496pub fn monitor_connector<C>(
497 connector: C,
498 connection_type: impl Into<String>,
499) -> MonitoredConnection<C, MonitorNewConnectionImpl> {
500 let connection_type = connection_type.into();
501 info!(
502 "monitoring connector {} with type {}",
503 type_name::<C>(),
504 connection_type
505 );
506 MonitoredConnection::new(connector, MonitorNewConnectionImpl { connection_type })
507}
508
/// Resolved socket addresses, collected up front and handed out one by one
/// through the `Iterator` implementation.
pub struct MonitoredGaiAddrs {
    /// All resolved addresses.
    inner: Vec<SocketAddr>,
    /// Index into `inner` of the next address to yield.
    pos: usize,
}
513
514impl From<GaiAddrs> for MonitoredGaiAddrs {
515 fn from(value: GaiAddrs) -> Self {
516 Self {
517 inner: value.collect_vec(),
518 pos: 0,
519 }
520 }
521}
522
523impl Iterator for MonitoredGaiAddrs {
524 type Item = SocketAddr;
525
526 fn next(&mut self) -> Option<Self::Item> {
527 let res = self.inner.get(self.pos).cloned();
528 self.pos += 1;
529 res
530 }
531}
532
/// Future returned by [`MonitoredGaiResolver`]; wraps the inner resolver's
/// future and remembers the name being resolved so it can be logged.
pub struct MonitoredGaiFuture {
    /// The name passed to the resolver.
    name: Name,
    /// The resolution future of the inner resolver.
    inner: GaiFuture,
}
537
538impl std::fmt::Debug for MonitoredGaiFuture {
539 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
540 f.pad("MonitoredGaiFuture")
541 }
542}
543
544impl Future for MonitoredGaiFuture {
545 type Output = Result<MonitoredGaiAddrs, Error>;
546
547 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
548 Pin::new(&mut self.inner).poll(cx).map(|res| match res {
549 Ok(addrs) => {
550 let addrs: MonitoredGaiAddrs = addrs.into();
551 debug!("resolve {} => {:?}", self.name, addrs.inner);
552 Ok(addrs)
553 }
554 Err(err) => Err(err),
555 })
556 }
557}
558
/// Name resolver that delegates to a `GaiResolver` and returns
/// [`MonitoredGaiFuture`]s, which log every successful resolution.
#[derive(Clone)]
pub struct MonitoredGaiResolver {
    /// The resolver doing the actual work.
    inner: GaiResolver,
}
563
564impl std::fmt::Debug for MonitoredGaiResolver {
565 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
566 f.pad("MonitoredGaiResolver")
567 }
568}
569
570impl Default for MonitoredGaiResolver {
571 fn default() -> Self {
572 Self {
573 inner: GaiResolver::new(),
574 }
575 }
576}
577
578impl Service<Name> for MonitoredGaiResolver {
579 type Error = Error;
580 type Future = MonitoredGaiFuture;
581 type Response = MonitoredGaiAddrs;
582
583 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
584 self.inner.poll_ready(cx)
585 }
586
587 fn call(&mut self, req: Name) -> Self::Future {
588 MonitoredGaiFuture {
589 name: req.clone(),
590 inner: self.inner.call(req),
591 }
592 }
593}
594
595#[cfg_or_panic(not(madsim))]
596fn monitored_http_connector(
597 connection_type: impl Into<String>,
598 config: TcpConfig,
599) -> MonitoredConnection<HttpConnector<MonitoredGaiResolver>, MonitorNewConnectionImpl> {
600 let resolver = MonitoredGaiResolver::default();
601 let mut http = HttpConnector::new_with_resolver(resolver);
602
603 http.enforce_http(false);
604 http.set_nodelay(config.tcp_nodelay);
605 http.set_keepalive(config.keepalive_duration);
606
607 monitor_connector(http, connection_type)
608}
609
610#[cfg_or_panic(not(madsim))]
612fn configure_endpoint(endpoint: Endpoint) -> Endpoint {
613 endpoint.http2_max_header_list_size(16 * 1024 * 1024)
616}
617
// Extension methods on `Endpoint` that connect through the monitored HTTP
// connector.
#[easy_ext::ext(EndpointExt)]
impl Endpoint {
    /// Connects to the endpoint and returns the resulting `Channel`.
    ///
    /// Outside of `madsim` builds, the endpoint is first passed through
    /// `configure_endpoint` and then connected via the monitored HTTP
    /// connector built from `connection_type` and `config`. Under `madsim`
    /// it falls back to a plain `connect()` and both extra arguments are
    /// unused.
    pub async fn monitored_connect(
        mut self,
        connection_type: impl Into<String>,
        config: TcpConfig,
    ) -> Result<Channel, tonic::transport::Error> {
        #[cfg(not(madsim))]
        {
            self = configure_endpoint(self);
            let connector = monitored_http_connector(connection_type, config);
            self.connect_with_connector(connector).await
        }
        #[cfg(madsim)]
        {
            self.connect().await
        }
    }

    /// Lazy variant of `monitored_connect`: returns the `Channel` from
    /// `connect_with_connector_lazy` without awaiting anything. Only
    /// available outside of `madsim` builds.
    #[cfg(not(madsim))]
    pub fn monitored_connect_lazy(
        mut self,
        connection_type: impl Into<String>,
        config: TcpConfig,
    ) -> Channel {
        self = configure_endpoint(self);
        let connector = monitored_http_connector(connection_type, config);
        self.connect_with_connector_lazy(connector)
    }
}
648
649#[cfg(not(madsim))]
650#[easy_ext::ext(RouterExt)]
651impl<L> tonic::transport::server::Router<L> {
652 pub fn monitored_serve_with_shutdown<ResBody>(
658 self,
659 listen_addr: std::net::SocketAddr,
660 connection_type: impl Into<String>,
661 config: TcpConfig,
662 signal: impl Future<Output = ()>,
663 ) -> impl Future<Output = ()>
664 where
665 L: tower_layer::Layer<tonic::service::Routes>,
666 L::Service: Service<http::Request<tonic::body::BoxBody>, Response = http::Response<ResBody>>
667 + Clone
668 + Send
669 + 'static,
670 <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
671 http::Request<tonic::body::BoxBody>,
672 >>::Future: Send + 'static,
673 <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
674 http::Request<tonic::body::BoxBody>,
675 >>::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
676 ResBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
677 ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
678 {
679 let connection_type = connection_type.into();
680 let incoming = tonic::transport::server::TcpIncoming::new(
681 listen_addr,
682 config.tcp_nodelay,
683 config.keepalive_duration,
684 )
685 .unwrap_or_else(|err| {
686 panic!(
687 "failed to bind `{connection_type}` to `{listen_addr}`: {}",
688 err.as_report()
689 )
690 });
691 let incoming =
692 MonitoredConnection::new(incoming, MonitorNewConnectionImpl { connection_type });
693
694 async move {
695 self.serve_with_incoming_shutdown(incoming, signal)
696 .await
697 .unwrap()
698 }
699 }
700}
701
#[cfg(madsim)]
#[easy_ext::ext(RouterExt)]
impl<L> tonic::transport::server::Router<L> {
    /// `madsim` counterpart of `monitored_serve_with_shutdown`: serves on
    /// `listen_addr` until `signal` completes, without any connection
    /// monitoring. `connection_type` and `config` are accepted but not used.
    ///
    /// # Panics
    ///
    /// Panics if serving ends with an error.
    pub async fn monitored_serve_with_shutdown(
        self,
        listen_addr: std::net::SocketAddr,
        connection_type: impl Into<String>,
        config: TcpConfig,
        signal: impl Future<Output = ()>,
    ) {
        self.serve_with_shutdown(listen_addr, signal).await.unwrap()
    }
}
715
/// [`MonitorNewConnection`] implementation that reports to
/// `GLOBAL_CONNECTION_METRICS`.
#[derive(Clone)]
pub struct MonitorNewConnectionImpl {
    /// Value of the `connection_type` label on every metric reported.
    connection_type: String,
}
720
721impl MonitorNewConnection for MonitorNewConnectionImpl {
722 type ConnectionMonitor = MonitorAsyncReadWriteImpl;
723
724 fn new_connection_monitor(&self, endpoint: String) -> Self::ConnectionMonitor {
725 let labels = [self.connection_type.as_str(), endpoint.as_str()];
726 let read_rate = GLOBAL_CONNECTION_METRICS
727 .read_rate
728 .with_label_values(&labels);
729 let reader_count = GLOBAL_CONNECTION_METRICS
730 .reader_count
731 .with_label_values(&labels);
732 let write_rate = GLOBAL_CONNECTION_METRICS
733 .write_rate
734 .with_label_values(&labels);
735 let writer_count = GLOBAL_CONNECTION_METRICS
736 .writer_count
737 .with_label_values(&labels);
738 let connection_count = GLOBAL_CONNECTION_METRICS
739 .connection_count
740 .with_label_values(&labels);
741
742 GLOBAL_CONNECTION_METRICS
743 .connection_create_rate
744 .with_label_values(&labels)
745 .inc();
746
747 MonitorAsyncReadWriteImpl::new(
748 endpoint,
749 self.connection_type.clone(),
750 read_rate,
751 reader_count,
752 write_rate,
753 writer_count,
754 connection_count,
755 )
756 }
757
758 fn on_err(&self, endpoint: String) {
759 GLOBAL_CONNECTION_METRICS
760 .connection_err_rate
761 .with_label_values(&[self.connection_type.as_str(), endpoint.as_str()])
762 .inc();
763 }
764}
765
/// Threshold used by `MonitorAsyncReadWriteImpl`: read and write sizes are
/// accumulated locally and only added to the counters once the accumulated
/// amount reaches this value (or when the monitor is dropped).
const READ_WRITE_RATE_REPORT_INTERVAL: u64 = 1024;
767
/// Per-connection monitor that reports read/write volume, reader/writer and
/// connection counts, and I/O errors.
pub struct MonitorAsyncReadWriteImpl {
    /// Value of the `uri` label on I/O error metrics.
    endpoint: String,
    /// Value of the `connection_type` label on I/O error metrics.
    connection_type: String,

    /// Read volume accumulated since the last report to `read_rate`.
    unreported_read_rate: u64,
    /// Counter receiving the accumulated read volume.
    read_rate: IntCounter,
    /// Gauge incremented on creation and decremented on EOF or drop.
    reader_count_guard: IntGauge,
    /// Set once EOF was reported, so the reader gauge is decremented once.
    is_eof: bool,

    /// Write volume accumulated since the last report to `write_rate`.
    unreported_write_rate: u64,
    /// Counter receiving the accumulated write volume.
    write_rate: IntCounter,
    /// Gauge incremented on creation and decremented on shutdown or drop.
    writer_count_guard: IntGauge,
    /// Set once shutdown was reported, so the writer gauge is decremented once.
    is_shutdown: bool,

    /// Gauge incremented on creation and decremented on drop.
    connection_count_guard: IntGauge,
}
784
785impl MonitorAsyncReadWriteImpl {
786 pub fn new(
787 endpoint: String,
788 connection_type: String,
789 read_rate: IntCounter,
790 reader_count: IntGauge,
791 write_rate: IntCounter,
792 writer_count: IntGauge,
793 connection_count: IntGauge,
794 ) -> Self {
795 reader_count.inc();
796 writer_count.inc();
797 connection_count.inc();
798 Self {
799 endpoint,
800 connection_type,
801 unreported_read_rate: 0,
802 read_rate,
803 reader_count_guard: reader_count,
804 is_eof: false,
805 unreported_write_rate: 0,
806 write_rate,
807 writer_count_guard: writer_count,
808 is_shutdown: false,
809 connection_count_guard: connection_count,
810 }
811 }
812}
813
814impl Drop for MonitorAsyncReadWriteImpl {
815 fn drop(&mut self) {
816 if self.unreported_read_rate > 0 {
817 self.read_rate.inc_by(self.unreported_read_rate);
818 }
819 if self.unreported_write_rate > 0 {
820 self.write_rate.inc_by(self.unreported_write_rate);
821 }
822 if !self.is_eof {
823 self.reader_count_guard.dec();
824 }
825 if !self.is_shutdown {
826 self.writer_count_guard.dec();
827 }
828 self.connection_count_guard.dec();
829 }
830}
831
832impl MonitorAsyncReadWrite for MonitorAsyncReadWriteImpl {
833 fn on_read(&mut self, size: usize) {
834 self.unreported_read_rate += size as u64;
835 if self.unreported_read_rate >= READ_WRITE_RATE_REPORT_INTERVAL {
836 self.read_rate.inc_by(self.unreported_read_rate);
837 self.unreported_read_rate = 0;
838 }
839 }
840
841 fn on_eof(&mut self) {
842 if self.is_eof {
843 warn!("get eof for multiple time");
844 return;
845 }
846 self.is_eof = true;
847 self.reader_count_guard.dec();
848 }
849
850 fn on_read_err(&mut self, err: &Error) {
851 GLOBAL_CONNECTION_METRICS
854 .io_err_rate
855 .with_guarded_label_values(&[
856 self.connection_type.as_str(),
857 self.endpoint.as_str(),
858 "read",
859 err.kind().to_string().as_str(),
860 ])
861 .inc();
862 }
863
864 fn on_write(&mut self, size: usize) {
865 self.unreported_write_rate += size as u64;
866 if self.unreported_write_rate >= READ_WRITE_RATE_REPORT_INTERVAL {
867 self.write_rate.inc_by(self.unreported_write_rate);
868 self.unreported_write_rate = 0;
869 }
870 }
871
872 fn on_shutdown(&mut self) {
873 if self.is_shutdown {
874 warn!("get shutdown for multiple time");
875 return;
876 }
877 self.is_shutdown = true;
878 self.writer_count_guard.dec();
879 }
880
881 fn on_write_err(&mut self, err: &Error) {
882 GLOBAL_CONNECTION_METRICS
885 .io_err_rate
886 .with_guarded_label_values(&[
887 self.connection_type.as_str(),
888 self.endpoint.as_str(),
889 "write",
890 err.kind().to_string().as_str(),
891 ])
892 .inc();
893 }
894}