risingwave_compute/rpc/service/monitor_service.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::ffi::CString;
use std::fs;
use std::path::Path;
use std::time::Duration;

use foyer::{HybridCache, TracingOptions};
use itertools::Itertools;
use prometheus::core::Collector;
use prometheus::proto::Metric;
use risingwave_common::config::{MetricLevel, ServerConfig};
use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces;
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
    AnalyzeHeapRequest, AnalyzeHeapResponse, ChannelStats, FragmentStats, GetProfileStatsRequest,
    GetProfileStatsResponse, GetStreamingStatsRequest, GetStreamingStatsResponse,
    HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest,
    ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, RelationStats,
    StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest, TieredCacheTracingResponse,
};
use risingwave_rpc_client::error::ToTonicStatus;
use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::task::LocalStreamManager;
use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait};
use thiserror_ext::AsReport;
use tonic::{Code, Request, Response, Status};

type MetaCache = HybridCache<HummockSstableObjectId, Box<Sstable>>;
type BlockCache = HybridCache<SstableBlockIndex, Box<Block>>;

#[derive(Clone)]
pub struct MonitorServiceImpl {
    stream_mgr: LocalStreamManager,
    server_config: ServerConfig,
    meta_cache: Option<MetaCache>,
    block_cache: Option<BlockCache>,
}

impl MonitorServiceImpl {
    pub fn new(
        stream_mgr: LocalStreamManager,
        server_config: ServerConfig,
        meta_cache: Option<MetaCache>,
        block_cache: Option<BlockCache>,
    ) -> Self {
        Self {
            stream_mgr,
            server_config,
            meta_cache,
            block_cache,
        }
    }
}

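// A minimal wiring sketch of how this service is typically registered with a tonic
// server (illustrative only; the real compute-node startup differs, and `stream_mgr`,
// `server_config`, `meta_cache`, `block_cache`, and `listen_addr` are placeholders).
// `MonitorServiceServer` is the tonic-generated wrapper from
// `risingwave_pb::monitor_service::monitor_service_server`:
//
//     let monitor_srv =
//         MonitorServiceImpl::new(stream_mgr, server_config, meta_cache, block_cache);
//     tonic::transport::Server::builder()
//         .add_service(MonitorServiceServer::new(monitor_srv))
//         .serve(listen_addr)
//         .await?;
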
#[async_trait::async_trait]
impl MonitorService for MonitorServiceImpl {
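    // Collects await-tree dumps from this node: per-actor traces, in-flight barrier
    // traces, currently served gRPC calls, and compaction tasks, plus the barrier
    // worker state and, if an embedded JVM is running, its stack traces.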
    #[cfg_attr(coverage, coverage(off))]
    async fn stack_trace(
        &self,
        request: Request<StackTraceRequest>,
    ) -> Result<Response<StackTraceResponse>, Status> {
        let _req = request.into_inner();

        let actor_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
            reg.collect::<Actor>()
                .into_iter()
                .map(|(k, v)| (k.0, v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
            reg.collect::<BarrierAwait>()
                .into_iter()
                .map(|(k, v)| (k.prev_epoch, v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        let rpc_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
            reg.collect::<GrpcCall>()
                .into_iter()
                .map(|(k, v)| (k.desc, v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        let compaction_task_traces = if let Some(hummock) =
            self.stream_mgr.env.state_store().as_hummock()
            && let Some(m) = hummock.compaction_await_tree_reg()
        {
            m.collect::<Compaction>()
                .into_iter()
                .map(|(k, v)| (format!("{k:?}"), v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?;

        let jvm_stack_traces = match dump_jvm_stack_traces() {
            Ok(None) => None,
            Err(err) => Some(err.as_report().to_string()),
            Ok(Some(stack_traces)) => Some(stack_traces),
        };

        Ok(Response::new(StackTraceResponse {
            actor_traces,
            rpc_traces,
            compaction_task_traces,
            inflight_barrier_traces: barrier_traces,
            barrier_worker_state: BTreeMap::from_iter([(
                self.stream_mgr.env.worker_id(),
                barrier_worker_state,
            )]),
            jvm_stack_traces: match jvm_stack_traces {
                Some(stack_traces) => {
                    BTreeMap::from_iter([(self.stream_mgr.env.worker_id(), stack_traces)])
                }
                None => BTreeMap::new(),
            },
        }))
    }

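    // CPU profiling: samples the process with `pprof` for `sleep_s` seconds and
    // returns the rendered flamegraph.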
    #[cfg_attr(coverage, coverage(off))]
    async fn profiling(
        &self,
        request: Request<ProfilingRequest>,
    ) -> Result<Response<ProfilingResponse>, Status> {
        if std::env::var("RW_PROFILE_PATH").is_ok() {
            return Err(Status::internal(
                "Profiling is already enabled via the RW_PROFILE_PATH environment variable",
            ));
        }
        let time = request.into_inner().get_sleep_s();
        let guard = pprof::ProfilerGuardBuilder::default()
            .blocklist(&["libc", "libgcc", "pthread", "vdso"])
            .build()
            .unwrap();
        tokio::time::sleep(Duration::from_secs(time)).await;
        let mut buf = vec![];
        match guard.report().build() {
            Ok(report) => {
                report.flamegraph(&mut buf).unwrap();
                tracing::info!("successfully generated flamegraph");
                Ok(Response::new(ProfilingResponse { result: buf }))
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "failed to generate flamegraph");
                Err(err.to_status(Code::Internal, "monitor"))
            }
        }
    }

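    // Manually triggers a jemalloc heap dump into the requested directory (falling
    // back to the configured `heap_profiling.dir`). Linux only, and the node must
    // have been started with `MALLOC_CONF=prof:true`.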
    #[cfg_attr(coverage, coverage(off))]
    async fn heap_profiling(
        &self,
        request: Request<HeapProfilingRequest>,
    ) -> Result<Response<HeapProfilingResponse>, Status> {
        use std::fs::create_dir_all;
        use std::path::PathBuf;

        use tikv_jemalloc_ctl;

        if !cfg!(target_os = "linux") {
            return Err(Status::unimplemented(
                "heap profiling is only implemented on Linux",
            ));
        }

        if !tikv_jemalloc_ctl::opt::prof::read().unwrap() {
            return Err(Status::failed_precondition(
                "Jemalloc profiling is not enabled on the node. Try starting the node with `MALLOC_CONF=prof:true`",
            ));
        }

        let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
        let file_name = format!("{}.{}", time_prefix, MANUALLY_DUMP_SUFFIX);
        let arg_dir = request.into_inner().dir;
        let dir = PathBuf::from(if arg_dir.is_empty() {
            &self.server_config.heap_profiling.dir
        } else {
            &arg_dir
        });
        create_dir_all(&dir)?;

        let file_path_buf = dir.join(file_name);
        let file_path = file_path_buf
            .to_str()
            .ok_or_else(|| Status::internal("The file path is not a valid UTF-8 string"))?;
        let file_path_c =
            CString::new(file_path).map_err(|_| Status::internal("0 byte in file path"))?;

        // FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime
        if let Err(e) =
            tikv_jemalloc_ctl::prof::dump::write(unsafe { &*(file_path_c.as_c_str() as *const _) })
        {
            tracing::warn!("Manual Jemalloc heap dump failed! {:?}", e);
            Err(Status::internal(e.to_string()))
        } else {
            tracing::info!("Manual Jemalloc heap dump created: {}", file_path);
            Ok(Response::new(HeapProfilingResponse {}))
        }
    }

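    // Lists heap dump files in the configured dump directory, split into
    // automatically and manually triggered dumps by their file-name suffixes.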
    #[cfg_attr(coverage, coverage(off))]
    async fn list_heap_profiling(
        &self,
        _request: Request<ListHeapProfilingRequest>,
    ) -> Result<Response<ListHeapProfilingResponse>, Status> {
        let dump_dir = self.server_config.heap_profiling.dir.clone();
        let auto_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
            .map(|entry| {
                let entry = entry?;
                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
            })
            // Keep `Err` entries so that I/O errors surface through `try_collect`.
            .filter(|name| {
                if let Ok(name) = name {
                    name.contains(AUTO_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
                } else {
                    true
                }
            })
            .try_collect()?;
        let manually_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
            .map(|entry| {
                let entry = entry?;
                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
            })
            .filter(|name| {
                if let Ok(name) = name {
                    name.contains(MANUALLY_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
                } else {
                    true
                }
            })
            .try_collect()?;

        Ok(Response::new(ListHeapProfilingResponse {
            dir: dump_dir,
            name_auto: auto_dump_files_name,
            name_manually: manually_dump_files_name,
        }))
    }

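    // Analyzes a heap dump with `jeprof`, caching the result as a `.collapsed` file
    // next to the dump, and returns the collapsed profile.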
    #[cfg_attr(coverage, coverage(off))]
    async fn analyze_heap(
        &self,
        request: Request<AnalyzeHeapRequest>,
    ) -> Result<Response<AnalyzeHeapResponse>, Status> {
        let dumped_path_str = request.into_inner().get_path().clone();
        let collapsed_path_str = format!("{}.{}", dumped_path_str, COLLAPSED_SUFFIX);
        let collapsed_path = Path::new(&collapsed_path_str);

        // Run `jeprof` only if the target has not been analyzed before.
        if !collapsed_path.exists() {
            risingwave_common_heap_profiling::jeprof::run(
                dumped_path_str,
                collapsed_path_str.clone(),
            )
            .await
            .map_err(|e| e.to_status(Code::Internal, "monitor"))?;
        }

        let file = fs::read(Path::new(&collapsed_path_str))?;
        Ok(Response::new(AnalyzeHeapResponse { result: file }))
    }

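    // Returns in-memory profiling counters: per-executor output row counts and
    // blocking durations, plus the same aggregated over the given dispatcher
    // fragments.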
    async fn get_profile_stats(
        &self,
        request: Request<GetProfileStatsRequest>,
    ) -> Result<Response<GetProfileStatsResponse>, Status> {
        let metrics = global_streaming_metrics(MetricLevel::Info);
        let inner = request.into_inner();
        let executor_ids = &inner.executor_ids;
        let fragment_ids = HashSet::from_iter(inner.dispatcher_fragment_ids);
        let stream_node_output_row_count = metrics
            .mem_stream_node_output_row_count
            .collect(executor_ids);
        let stream_node_output_blocking_duration_ns = metrics
            .mem_stream_node_output_blocking_duration_ns
            .collect(executor_ids);

        // Aggregate counter metrics by fragment id, keeping only the requested fragments.
        fn collect_by_fragment_ids<T: Collector>(
            m: &T,
            fragment_ids: &HashSet<u32>,
        ) -> HashMap<u32, u64> {
            let mut metrics = HashMap::new();
            for mut metric_family in m.collect() {
                for metric in metric_family.take_metric() {
                    let fragment_id = get_label_infallible(&metric, "fragment_id");
                    if fragment_ids.contains(&fragment_id) {
                        let entry = metrics.entry(fragment_id).or_insert(0);
                        *entry += metric.get_counter().get_value() as u64;
                    }
                }
            }
            metrics
        }

        let dispatch_fragment_output_row_count =
            collect_by_fragment_ids(&metrics.actor_out_record_cnt, &fragment_ids);
        let dispatch_fragment_output_blocking_duration_ns = collect_by_fragment_ids(
            &metrics.actor_output_buffer_blocking_duration_ns,
            &fragment_ids,
        );
        Ok(Response::new(GetProfileStatsResponse {
            stream_node_output_row_count,
            stream_node_output_blocking_duration_ns,
            dispatch_fragment_output_row_count,
            dispatch_fragment_output_blocking_duration_ns,
        }))
    }

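    // Builds a streaming stats snapshot from Prometheus metrics: per-fragment actor
    // counts and epochs, per-relation stats, and per-channel send/recv row counts
    // and output-blocking durations.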
    #[cfg_attr(coverage, coverage(off))]
    async fn get_streaming_stats(
        &self,
        _request: Request<GetStreamingStatsRequest>,
    ) -> Result<Response<GetStreamingStatsResponse>, Status> {
        let metrics = global_streaming_metrics(MetricLevel::Info);

        fn collect<T: Collector>(m: &T) -> Vec<Metric> {
            m.collect()
                .into_iter()
                .next()
                .unwrap()
                .take_metric()
                .into_vec()
        }

        let actor_output_buffer_blocking_duration_ns =
            collect(&metrics.actor_output_buffer_blocking_duration_ns);
        let actor_count = collect(&metrics.actor_count);

        let actor_count: HashMap<_, _> = actor_count
            .iter()
            .map(|m| {
                let fragment_id: u32 = get_label_infallible(m, "fragment_id");
                let count = m.get_gauge().get_value() as u32;
                (fragment_id, count)
            })
            .collect();

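        // Seed per-fragment stats with actor counts; epochs are filled in below.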
        let mut fragment_stats: HashMap<u32, FragmentStats> = HashMap::new();
        for (&fragment_id, &actor_count) in &actor_count {
            fragment_stats.insert(
                fragment_id,
                FragmentStats {
                    actor_count,
                    current_epoch: 0,
                },
            );
        }

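        // A fragment's current epoch is the minimum epoch among its actors
        // (0 is treated as "not yet set").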
        let actor_current_epoch = collect(&metrics.actor_current_epoch);
        for m in &actor_current_epoch {
            let fragment_id: u32 = get_label_infallible(m, "fragment_id");
            let epoch = m.get_gauge().get_value() as u64;
            if let Some(s) = fragment_stats.get_mut(&fragment_id) {
                s.current_epoch = if s.current_epoch == 0 {
                    epoch
                } else {
                    u64::min(s.current_epoch, epoch)
                }
            } else {
                tracing::warn!(fragment_id, "missing corresponding actor count metric");
            }
        }

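        // Aggregate per-actor materialize epochs into per-relation (table) stats.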
        let mut relation_stats: HashMap<u32, RelationStats> = HashMap::new();
        let mview_current_epoch = collect(&metrics.materialize_current_epoch);
        for m in &mview_current_epoch {
            let table_id: u32 = get_label_infallible(m, "table_id");
            let epoch = m.get_gauge().get_value() as u64;
            if let Some(s) = relation_stats.get_mut(&table_id) {
                s.current_epoch = if s.current_epoch == 0 {
                    epoch
                } else {
                    u64::min(s.current_epoch, epoch)
                };
                s.actor_count += 1;
            } else {
                relation_stats.insert(
                    table_id,
                    RelationStats {
                        actor_count: 1,
                        current_epoch: epoch,
                    },
                );
            }
        }

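        // Channel stats are keyed by `{fragment_id}_{downstream_fragment_id}`.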
        let mut channel_stats: BTreeMap<String, ChannelStats> = BTreeMap::new();

        for metric in actor_output_buffer_blocking_duration_ns {
            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
            let downstream_fragment_id: u32 =
                get_label_infallible(&metric, "downstream_fragment_id");

            let key = format!("{}_{}", fragment_id, downstream_fragment_id);
            let channel_stat = channel_stats.entry(key).or_insert_with(|| ChannelStats {
                actor_count: 0,
                output_blocking_duration: 0.,
                recv_row_count: 0,
                send_row_count: 0,
            });

            // When the metrics level is Debug, the `actor_id` label is removed to reduce
            // the number of metrics, so a single metric stands for all actors of the
            // fragment. See `src/common/metrics/src/relabeled_metric.rs`.
            channel_stat.actor_count +=
                if get_label_infallible::<String>(&metric, "actor_id").is_empty() {
                    actor_count[&fragment_id]
                } else {
                    1
                };
            channel_stat.output_blocking_duration += metric.get_counter().get_value();
        }

        let actor_output_row_count = collect(&metrics.actor_out_record_cnt);
        for metric in actor_output_row_count {
            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");

            // Add the sent rows to every downstream channel of this fragment. Since
            // '`' is the ASCII character right after '_', this range covers exactly
            // the keys prefixed with `{fragment_id}_`.
            let key_prefix = format!("{}_", fragment_id);
            let key_range_end = format!("{}`", fragment_id);
            for (_, s) in channel_stats.range_mut(key_prefix..key_range_end) {
                s.send_row_count += metric.get_counter().get_value() as u64;
            }
        }

        let actor_input_row_count = collect(&metrics.actor_in_record_cnt);
        for metric in actor_input_row_count {
            let upstream_fragment_id: u32 = get_label_infallible(&metric, "upstream_fragment_id");
            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");

            let key = format!("{}_{}", upstream_fragment_id, fragment_id);
            if let Some(s) = channel_stats.get_mut(&key) {
                s.recv_row_count += metric.get_counter().get_value() as u64;
            }
        }

        let channel_stats = channel_stats.into_iter().collect();
        Ok(Response::new(GetStreamingStatsResponse {
            channel_stats,
            fragment_stats,
            relation_stats,
        }))
    }

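    // Toggles foyer tracing on the meta and block caches and updates the
    // per-operation latency thresholds above which accesses are recorded.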
    #[cfg_attr(coverage, coverage(off))]
    async fn tiered_cache_tracing(
        &self,
        request: Request<TieredCacheTracingRequest>,
    ) -> Result<Response<TieredCacheTracingResponse>, Status> {
        let req = request.into_inner();

        tracing::info!("Update tiered cache tracing config: {req:?}");

        // Build identical tracing options for both caches from the request.
        let tracing_options = || {
            let mut options = TracingOptions::new();
            if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
                options = options
                    .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_get_threshold_ms {
                options =
                    options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
                options = options
                    .with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
                options = options
                    .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
                options = options
                    .with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
            }
            options
        };

        if let Some(cache) = &self.meta_cache {
            if req.enable {
                cache.enable_tracing();
            } else {
                cache.disable_tracing();
            }
            cache.update_tracing_options(tracing_options());
        }

        if let Some(cache) = &self.block_cache {
            if req.enable {
                cache.enable_tracing();
            } else {
                cache.disable_tracing();
            }
            cache.update_tracing_options(tracing_options());
        }

        Ok(Response::new(TieredCacheTracingResponse::default()))
    }
}

pub use grpc_middleware::*;
use risingwave_common::metrics::get_label_infallible;

pub mod grpc_middleware {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::task::{Context, Poll};

    use either::Either;
    use futures::Future;
    use tonic::body::BoxBody;
    use tower::{Layer, Service};

    /// Manages the await-trees of gRPC requests that are currently served by the compute node.
    pub type AwaitTreeRegistryRef = await_tree::Registry;

    /// Await-tree key type for gRPC calls.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub struct GrpcCall {
        pub desc: String,
    }

    #[derive(Clone)]
    pub struct AwaitTreeMiddlewareLayer {
        registry: Option<AwaitTreeRegistryRef>,
    }

    impl AwaitTreeMiddlewareLayer {
        pub fn new(registry: AwaitTreeRegistryRef) -> Self {
            Self {
                registry: Some(registry),
            }
        }

        pub fn new_optional(registry: Option<AwaitTreeRegistryRef>) -> Self {
            Self { registry }
        }
    }

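    // A minimal usage sketch (illustrative only; `registry`, `some_service`, and
    // `listen_addr` are placeholders, and the real wiring lives in the compute
    // node's gRPC server setup). The layer instruments every incoming call under
    // a `GrpcCall` key:
    //
    //     let layer = AwaitTreeMiddlewareLayer::new_optional(Some(registry));
    //     tonic::transport::Server::builder()
    //         .layer(layer)
    //         .add_service(some_service)
    //         .serve(listen_addr);
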
    impl<S> Layer<S> for AwaitTreeMiddlewareLayer {
        type Service = AwaitTreeMiddleware<S>;

        fn layer(&self, service: S) -> Self::Service {
            AwaitTreeMiddleware {
                inner: service,
                registry: self.registry.clone(),
                next_id: Default::default(),
            }
        }
    }

    #[derive(Clone)]
    pub struct AwaitTreeMiddleware<S> {
        inner: S,
        registry: Option<AwaitTreeRegistryRef>,
        next_id: Arc<AtomicU64>,
    }

    impl<S> Service<http::Request<BoxBody>> for AwaitTreeMiddleware<S>
    where
        S: Service<http::Request<BoxBody>> + Clone,
    {
        type Error = S::Error;
        type Response = S::Response;

        type Future = impl Future<Output = Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.inner.poll_ready(cx)
        }

        fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
            let Some(registry) = self.registry.clone() else {
                return Either::Left(self.inner.call(req));
            };

            // Tonic internally wraps services in `tower::buffer::Buffer`, so the
            // service that was polled ready must be the one that is called: clone
            // the inner service and swap the clone in for next time. See
            // https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
            // for details.
            let clone = self.inner.clone();
            let mut inner = std::mem::replace(&mut self.inner, clone);

            let id = self.next_id.fetch_add(1, Ordering::SeqCst);
            let desc = if let Some(authority) = req.uri().authority() {
                format!("{authority} - {id}")
            } else {
                format!("?? - {id}")
            };
            let key = GrpcCall { desc };

            Either::Right(async move {
                let root = registry.register(key, req.uri().path());

                root.instrument(inner.call(req)).await
            })
        }
    }

    #[cfg(not(madsim))]
    impl<S: tonic::server::NamedService> tonic::server::NamedService for AwaitTreeMiddleware<S> {
        const NAME: &'static str = S::NAME;
    }
}