risingwave_compute/rpc/service/
monitor_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ffi::CString;
17use std::fs;
18use std::path::Path;
19use std::time::Duration;
20
21use foyer::{HybridCache, TracingOptions};
22use itertools::Itertools;
23use prometheus::core::Collector;
24use prometheus::proto::Metric;
25use risingwave_common::config::{MetricLevel, ServerConfig};
26use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
27use risingwave_hummock_sdk::HummockSstableObjectId;
28use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces;
29use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
30use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
31use risingwave_pb::monitor_service::{
32    AnalyzeHeapRequest, AnalyzeHeapResponse, ChannelStats, FragmentStats, GetProfileStatsRequest,
33    GetProfileStatsResponse, GetStreamingStatsRequest, GetStreamingStatsResponse,
34    HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest,
35    ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, RelationStats,
36    StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest, TieredCacheTracingResponse,
37};
38use risingwave_rpc_client::error::ToTonicStatus;
39use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
40use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
41use risingwave_stream::executor::monitor::global_streaming_metrics;
42use risingwave_stream::task::LocalStreamManager;
43use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait};
44use thiserror_ext::AsReport;
45use tonic::{Code, Request, Response, Status};
46
/// Hybrid cache mapping a [`HummockSstableObjectId`] to a boxed [`Sstable`].
type MetaCache = HybridCache<HummockSstableObjectId, Box<Sstable>>;
/// Hybrid cache mapping a [`SstableBlockIndex`] to a boxed [`Block`].
type BlockCache = HybridCache<SstableBlockIndex, Box<Block>>;
49
/// Implementation of the [`MonitorService`] gRPC service.
#[derive(Clone)]
pub struct MonitorServiceImpl {
    /// Provides the await-tree registry, the barrier state and the environment
    /// (state store, worker id) read by `stack_trace`.
    stream_mgr: LocalStreamManager,
    /// Server configuration; `heap_profiling.dir` is the default heap dump directory.
    server_config: ServerConfig,
    /// Optional cache whose tracing is reconfigured by `tiered_cache_tracing`.
    meta_cache: Option<MetaCache>,
    /// Optional cache whose tracing is reconfigured by `tiered_cache_tracing`.
    block_cache: Option<BlockCache>,
}
57
impl MonitorServiceImpl {
    /// Creates the service from its collaborators.
    ///
    /// Both caches are optional; when one is `None`, `tiered_cache_tracing` simply skips it.
    pub fn new(
        stream_mgr: LocalStreamManager,
        server_config: ServerConfig,
        meta_cache: Option<MetaCache>,
        block_cache: Option<BlockCache>,
    ) -> Self {
        Self {
            stream_mgr,
            server_config,
            meta_cache,
            block_cache,
        }
    }
}
73
74#[async_trait::async_trait]
75impl MonitorService for MonitorServiceImpl {
76    async fn stack_trace(
77        &self,
78        request: Request<StackTraceRequest>,
79    ) -> Result<Response<StackTraceResponse>, Status> {
80        let req = request.into_inner();
81
82        let actor_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
83            reg.collect::<Actor>()
84                .into_iter()
85                .map(|(k, v)| {
86                    (
87                        k.0,
88                        if req.actor_traces_format == ActorTracesFormat::Text as i32 {
89                            v.to_string()
90                        } else {
91                            serde_json::to_string(&v).unwrap()
92                        },
93                    )
94                })
95                .collect()
96        } else {
97            Default::default()
98        };
99
100        let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
101            reg.collect::<BarrierAwait>()
102                .into_iter()
103                .map(|(k, v)| (k.prev_epoch, v.to_string()))
104                .collect()
105        } else {
106            Default::default()
107        };
108
109        let rpc_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
110            reg.collect::<GrpcCall>()
111                .into_iter()
112                .map(|(k, v)| (k.desc, v.to_string()))
113                .collect()
114        } else {
115            Default::default()
116        };
117
118        let compaction_task_traces = if let Some(hummock) =
119            self.stream_mgr.env.state_store().as_hummock()
120            && let Some(m) = hummock.compaction_await_tree_reg()
121        {
122            m.collect::<Compaction>()
123                .into_iter()
124                .map(|(k, v)| (format!("{k:?}"), v.to_string()))
125                .collect()
126        } else {
127            Default::default()
128        };
129
130        let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?;
131
132        let jvm_stack_traces = match dump_jvm_stack_traces() {
133            Ok(None) => None,
134            Err(err) => Some(err.as_report().to_string()),
135            Ok(Some(stack_traces)) => Some(stack_traces),
136        };
137
138        Ok(Response::new(StackTraceResponse {
139            actor_traces,
140            rpc_traces,
141            compaction_task_traces,
142            inflight_barrier_traces: barrier_traces,
143            barrier_worker_state: BTreeMap::from_iter([(
144                self.stream_mgr.env.worker_id(),
145                barrier_worker_state,
146            )]),
147            jvm_stack_traces: match jvm_stack_traces {
148                Some(stack_traces) => {
149                    BTreeMap::from_iter([(self.stream_mgr.env.worker_id(), stack_traces)])
150                }
151                None => BTreeMap::new(),
152            },
153            meta_traces: Default::default(),
154        }))
155    }
156
157    async fn profiling(
158        &self,
159        request: Request<ProfilingRequest>,
160    ) -> Result<Response<ProfilingResponse>, Status> {
161        if std::env::var("RW_PROFILE_PATH").is_ok() {
162            return Err(Status::internal(
163                "Profiling is already running by setting RW_PROFILE_PATH",
164            ));
165        }
166        let time = request.into_inner().get_sleep_s();
167        let guard = pprof::ProfilerGuardBuilder::default()
168            .blocklist(&["libc", "libgcc", "pthread", "vdso"])
169            .build()
170            .unwrap();
171        tokio::time::sleep(Duration::from_secs(time)).await;
172        let mut buf = vec![];
173        match guard.report().build() {
174            Ok(report) => {
175                report.flamegraph(&mut buf).unwrap();
176                tracing::info!("succeed to generate flamegraph");
177                Ok(Response::new(ProfilingResponse { result: buf }))
178            }
179            Err(err) => {
180                tracing::warn!(error = %err.as_report(), "failed to generate flamegraph");
181                Err(err.to_status(Code::Internal, "monitor"))
182            }
183        }
184    }
185
186    async fn heap_profiling(
187        &self,
188        request: Request<HeapProfilingRequest>,
189    ) -> Result<Response<HeapProfilingResponse>, Status> {
190        use std::fs::create_dir_all;
191        use std::path::PathBuf;
192
193        use tikv_jemalloc_ctl;
194
195        if !cfg!(target_os = "linux") {
196            return Err(Status::unimplemented(
197                "heap profiling is only implemented on Linux",
198            ));
199        }
200
201        if !tikv_jemalloc_ctl::opt::prof::read().unwrap() {
202            return Err(Status::failed_precondition(
203                "Jemalloc profiling is not enabled on the node. Try start the node with `MALLOC_CONF=prof:true`",
204            ));
205        }
206
207        let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
208        let file_name = format!("{}.{}", time_prefix, MANUALLY_DUMP_SUFFIX);
209        let arg_dir = request.into_inner().dir;
210        let dir = PathBuf::from(if arg_dir.is_empty() {
211            &self.server_config.heap_profiling.dir
212        } else {
213            &arg_dir
214        });
215        create_dir_all(&dir)?;
216
217        let file_path_buf = dir.join(file_name);
218        let file_path = file_path_buf
219            .to_str()
220            .ok_or_else(|| Status::internal("The file dir is not a UTF-8 String"))?;
221        let file_path_c =
222            CString::new(file_path).map_err(|_| Status::internal("0 byte in file path"))?;
223
224        // FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime
225        if let Err(e) =
226            tikv_jemalloc_ctl::prof::dump::write(unsafe { &*(file_path_c.as_c_str() as *const _) })
227        {
228            tracing::warn!("Manually Jemalloc dump heap file failed! {:?}", e);
229            Err(Status::internal(e.to_string()))
230        } else {
231            tracing::info!("Manually Jemalloc dump heap file created: {}", file_path);
232            Ok(Response::new(HeapProfilingResponse {}))
233        }
234    }
235
236    async fn list_heap_profiling(
237        &self,
238        _request: Request<ListHeapProfilingRequest>,
239    ) -> Result<Response<ListHeapProfilingResponse>, Status> {
240        let dump_dir = self.server_config.heap_profiling.dir.clone();
241        let auto_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
242            .map(|entry| {
243                let entry = entry?;
244                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
245            })
246            .filter(|name| {
247                if let Ok(name) = name {
248                    name.contains(AUTO_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
249                } else {
250                    true
251                }
252            })
253            .try_collect()?;
254        let manually_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
255            .map(|entry| {
256                let entry = entry?;
257                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
258            })
259            .filter(|name| {
260                if let Ok(name) = name {
261                    name.contains(MANUALLY_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
262                } else {
263                    true
264                }
265            })
266            .try_collect()?;
267
268        Ok(Response::new(ListHeapProfilingResponse {
269            dir: dump_dir,
270            name_auto: auto_dump_files_name,
271            name_manually: manually_dump_files_name,
272        }))
273    }
274
275    async fn analyze_heap(
276        &self,
277        request: Request<AnalyzeHeapRequest>,
278    ) -> Result<Response<AnalyzeHeapResponse>, Status> {
279        let dumped_path_str = request.into_inner().get_path().clone();
280        let collapsed_path_str = format!("{}.{}", dumped_path_str, COLLAPSED_SUFFIX);
281        let collapsed_path = Path::new(&collapsed_path_str);
282
283        // run jeprof if the target was not analyzed before
284        if !collapsed_path.exists() {
285            risingwave_common_heap_profiling::jeprof::run(
286                dumped_path_str,
287                collapsed_path_str.clone(),
288            )
289            .await
290            .map_err(|e| e.to_status(Code::Internal, "monitor"))?;
291        }
292
293        let file = fs::read(Path::new(&collapsed_path_str))?;
294        Ok(Response::new(AnalyzeHeapResponse { result: file }))
295    }
296
297    async fn get_profile_stats(
298        &self,
299        request: Request<GetProfileStatsRequest>,
300    ) -> Result<Response<GetProfileStatsResponse>, Status> {
301        let metrics = global_streaming_metrics(MetricLevel::Info);
302        let inner = request.into_inner();
303        let executor_ids = &inner.executor_ids;
304        let fragment_ids = HashSet::from_iter(inner.dispatcher_fragment_ids.into_iter());
305        let stream_node_output_row_count = metrics
306            .mem_stream_node_output_row_count
307            .collect(executor_ids);
308        let stream_node_output_blocking_duration_ns = metrics
309            .mem_stream_node_output_blocking_duration_ns
310            .collect(executor_ids);
311
312        // Collect count metrics by fragment_ids
313        fn collect_by_fragment_ids<T: Collector>(
314            m: &T,
315            fragment_ids: &HashSet<u32>,
316        ) -> HashMap<u32, u64> {
317            let mut metrics = HashMap::new();
318            for mut metric_family in m.collect() {
319                for metric in metric_family.take_metric() {
320                    let fragment_id = get_label_infallible(&metric, "fragment_id");
321                    if fragment_ids.contains(&fragment_id) {
322                        let entry = metrics.entry(fragment_id).or_insert(0);
323                        *entry += metric.get_counter().value() as u64;
324                    }
325                }
326            }
327            metrics
328        }
329
330        let dispatch_fragment_output_row_count =
331            collect_by_fragment_ids(&metrics.actor_out_record_cnt, &fragment_ids);
332        let dispatch_fragment_output_blocking_duration_ns = collect_by_fragment_ids(
333            &metrics.actor_output_buffer_blocking_duration_ns,
334            &fragment_ids,
335        );
336        Ok(Response::new(GetProfileStatsResponse {
337            stream_node_output_row_count,
338            stream_node_output_blocking_duration_ns,
339            dispatch_fragment_output_row_count,
340            dispatch_fragment_output_blocking_duration_ns,
341        }))
342    }
343
344    async fn get_streaming_stats(
345        &self,
346        _request: Request<GetStreamingStatsRequest>,
347    ) -> Result<Response<GetStreamingStatsResponse>, Status> {
348        let metrics = global_streaming_metrics(MetricLevel::Info);
349
350        fn collect<T: Collector>(m: &T) -> Vec<Metric> {
351            m.collect().into_iter().next().unwrap().take_metric()
352        }
353
354        let actor_output_buffer_blocking_duration_ns =
355            collect(&metrics.actor_output_buffer_blocking_duration_ns);
356        let actor_count = collect(&metrics.actor_count);
357
358        let actor_count: HashMap<_, _> = actor_count
359            .iter()
360            .map(|m| {
361                let fragment_id: u32 = get_label_infallible(m, "fragment_id");
362                let count = m.get_gauge().value() as u32;
363                (fragment_id, count)
364            })
365            .collect();
366
367        let mut fragment_stats: HashMap<u32, FragmentStats> = HashMap::new();
368        for (&fragment_id, &actor_count) in &actor_count {
369            fragment_stats.insert(
370                fragment_id,
371                FragmentStats {
372                    actor_count,
373                    current_epoch: 0,
374                },
375            );
376        }
377
378        let actor_current_epoch = collect(&metrics.actor_current_epoch);
379        for m in &actor_current_epoch {
380            let fragment_id: u32 = get_label_infallible(m, "fragment_id");
381            let epoch = m.get_gauge().value() as u64;
382            if let Some(s) = fragment_stats.get_mut(&fragment_id) {
383                s.current_epoch = if s.current_epoch == 0 {
384                    epoch
385                } else {
386                    u64::min(s.current_epoch, epoch)
387                }
388            } else {
389                warn!(
390                    fragment_id = fragment_id,
391                    "Miss corresponding actor count metrics"
392                );
393            }
394        }
395
396        let mut relation_stats: HashMap<u32, RelationStats> = HashMap::new();
397        let mview_current_epoch = collect(&metrics.materialize_current_epoch);
398        for m in &mview_current_epoch {
399            let table_id: u32 = get_label_infallible(m, "table_id");
400            let epoch = m.get_gauge().value() as u64;
401            if let Some(s) = relation_stats.get_mut(&table_id) {
402                s.current_epoch = if s.current_epoch == 0 {
403                    epoch
404                } else {
405                    u64::min(s.current_epoch, epoch)
406                };
407                s.actor_count += 1;
408            } else {
409                relation_stats.insert(
410                    table_id,
411                    RelationStats {
412                        actor_count: 1,
413                        current_epoch: epoch,
414                    },
415                );
416            }
417        }
418
419        let mut channel_stats: BTreeMap<String, ChannelStats> = BTreeMap::new();
420
421        for metric in actor_output_buffer_blocking_duration_ns {
422            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
423            let downstream_fragment_id: u32 =
424                get_label_infallible(&metric, "downstream_fragment_id");
425
426            let key = format!("{}_{}", fragment_id, downstream_fragment_id);
427            let channel_stat = channel_stats.entry(key).or_insert_with(|| ChannelStats {
428                actor_count: 0,
429                output_blocking_duration: 0.,
430                recv_row_count: 0,
431                send_row_count: 0,
432            });
433
434            // When metrics level is Debug, `actor_id` will be removed to reduce metrics.
435            // See `src/common/metrics/src/relabeled_metric.rs`
436            channel_stat.actor_count +=
437                if get_label_infallible::<String>(&metric, "actor_id").is_empty() {
438                    actor_count[&fragment_id]
439                } else {
440                    1
441                };
442            channel_stat.output_blocking_duration += metric.get_counter().value();
443        }
444
445        let actor_output_row_count = collect(&metrics.actor_out_record_cnt);
446        for metric in actor_output_row_count {
447            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
448
449            // Find out and write to all downstream channels
450            let key_prefix = format!("{}_", fragment_id);
451            let key_range_end = format!("{}`", fragment_id); // '`' is next to `_`
452            for (_, s) in channel_stats.range_mut(key_prefix..key_range_end) {
453                s.send_row_count += metric.get_counter().value() as u64;
454            }
455        }
456
457        let actor_input_row_count = collect(&metrics.actor_in_record_cnt);
458        for metric in actor_input_row_count {
459            let upstream_fragment_id: u32 = get_label_infallible(&metric, "upstream_fragment_id");
460            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
461
462            let key = format!("{}_{}", upstream_fragment_id, fragment_id);
463            if let Some(s) = channel_stats.get_mut(&key) {
464                s.recv_row_count += metric.get_counter().value() as u64;
465            }
466        }
467
468        let channel_stats = channel_stats.into_iter().collect();
469        Ok(Response::new(GetStreamingStatsResponse {
470            channel_stats,
471            fragment_stats,
472            relation_stats,
473        }))
474    }
475
476    async fn tiered_cache_tracing(
477        &self,
478        request: Request<TieredCacheTracingRequest>,
479    ) -> Result<Response<TieredCacheTracingResponse>, Status> {
480        let req = request.into_inner();
481
482        tracing::info!("Update tiered cache tracing config: {req:?}");
483
484        if let Some(cache) = &self.meta_cache {
485            if req.enable {
486                cache.enable_tracing();
487            } else {
488                cache.disable_tracing();
489            }
490            let mut options = TracingOptions::new();
491            if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
492                options = options
493                    .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
494            }
495            if let Some(threshold) = req.record_hybrid_get_threshold_ms {
496                options =
497                    options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
498            }
499            if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
500                options = options
501                    .with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
502            }
503            if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
504                options = options
505                    .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
506            }
507            if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
508                options = options
509                    .with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
510            }
511            cache.update_tracing_options(options);
512        }
513
514        if let Some(cache) = &self.block_cache {
515            if req.enable {
516                cache.enable_tracing();
517            } else {
518                cache.disable_tracing();
519            }
520            let mut options = TracingOptions::new();
521            if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
522                options = options
523                    .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
524            }
525            if let Some(threshold) = req.record_hybrid_get_threshold_ms {
526                options =
527                    options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
528            }
529            if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
530                options = options
531                    .with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
532            }
533            if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
534                options = options
535                    .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
536            }
537            if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
538                options = options
539                    .with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
540            }
541            cache.update_tracing_options(options);
542        }
543
544        Ok(Response::new(TieredCacheTracingResponse::default()))
545    }
546}
547
548pub use grpc_middleware::*;
549use risingwave_common::metrics::get_label_infallible;
550
551pub mod grpc_middleware {
552    use std::sync::Arc;
553    use std::sync::atomic::{AtomicU64, Ordering};
554    use std::task::{Context, Poll};
555
556    use either::Either;
557    use futures::Future;
558    use tonic::body::BoxBody;
559    use tower::{Layer, Service};
560
    /// Manages the await-trees of `gRPC` requests that are currently served by the compute node.
    ///
    /// Despite the `Ref` suffix this is a plain alias of [`await_tree::Registry`]; the
    /// middleware shares it by cloning.
    pub type AwaitTreeRegistryRef = await_tree::Registry;
563
    /// Await-tree key type for gRPC calls.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub struct GrpcCall {
        /// Description of the call. The middleware fills it with `"{authority} - {id}"`, or
        /// `"?? - {id}"` when the request URI has no authority.
        pub desc: String,
    }
569
570    #[derive(Clone)]
571    pub struct AwaitTreeMiddlewareLayer {
572        registry: Option<AwaitTreeRegistryRef>,
573    }
574
575    impl AwaitTreeMiddlewareLayer {
576        pub fn new(registry: AwaitTreeRegistryRef) -> Self {
577            Self {
578                registry: Some(registry),
579            }
580        }
581
582        pub fn new_optional(registry: Option<AwaitTreeRegistryRef>) -> Self {
583            Self { registry }
584        }
585    }
586
587    impl<S> Layer<S> for AwaitTreeMiddlewareLayer {
588        type Service = AwaitTreeMiddleware<S>;
589
590        fn layer(&self, service: S) -> Self::Service {
591            AwaitTreeMiddleware {
592                inner: service,
593                registry: self.registry.clone(),
594                next_id: Default::default(),
595            }
596        }
597    }
598
    /// Service wrapper that registers each call it serves in an await-tree registry.
    #[derive(Clone)]
    pub struct AwaitTreeMiddleware<S> {
        /// The wrapped service that actually handles the request.
        inner: S,
        /// Registry to register calls in; when `None`, calls go straight to `inner`.
        registry: Option<AwaitTreeRegistryRef>,
        /// Counter used to number calls; shared between clones of this middleware.
        next_id: Arc<AtomicU64>,
    }
605
606    impl<S> Service<http::Request<BoxBody>> for AwaitTreeMiddleware<S>
607    where
608        S: Service<http::Request<BoxBody>> + Clone,
609    {
610        type Error = S::Error;
611        type Response = S::Response;
612
613        type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
614
615        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
616            self.inner.poll_ready(cx)
617        }
618
619        fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
620            let Some(registry) = self.registry.clone() else {
621                return Either::Left(self.inner.call(req));
622            };
623
624            // This is necessary because tonic internally uses `tower::buffer::Buffer`.
625            // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
626            // for details on why this is necessary
627            let clone = self.inner.clone();
628            let mut inner = std::mem::replace(&mut self.inner, clone);
629
630            let id = self.next_id.fetch_add(1, Ordering::SeqCst);
631            let desc = if let Some(authority) = req.uri().authority() {
632                format!("{authority} - {id}")
633            } else {
634                format!("?? - {id}")
635            };
636            let key = GrpcCall { desc };
637
638            Either::Right(async move {
639                let root = registry.register(key, req.uri().path());
640
641                root.instrument(inner.call(req)).await
642            })
643        }
644    }
645
    /// Exposes the wrapped service's name unchanged. Compiled only when `madsim` is not set.
    #[cfg(not(madsim))]
    impl<S: tonic::server::NamedService> tonic::server::NamedService for AwaitTreeMiddleware<S> {
        const NAME: &'static str = S::NAME;
    }
650}