// risingwave_compute/rpc/service/monitor_service.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::time::Duration;
17
18use foyer::{HybridCache, TracingOptions};
19use prometheus::core::Collector;
20use prometheus::proto::Metric;
21use risingwave_common::config::{MetricLevel, ServerConfig};
22use risingwave_common_heap_profiling::ProfileServiceImpl;
23use risingwave_hummock_sdk::HummockSstableObjectId;
24use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces;
25use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
26use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
27use risingwave_pb::monitor_service::{
28    AnalyzeHeapRequest, AnalyzeHeapResponse, ChannelStats, FragmentStats, GetProfileStatsRequest,
29    GetProfileStatsResponse, GetStreamingStatsRequest, GetStreamingStatsResponse,
30    HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest,
31    ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, RelationStats,
32    StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest, TieredCacheTracingResponse,
33};
34use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
35use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
36use risingwave_stream::executor::monitor::global_streaming_metrics;
37use risingwave_stream::task::LocalStreamManager;
38use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait};
39use thiserror_ext::AsReport;
40use tonic::{Request, Response, Status};
41
// Hybrid (memory + disk) cache for Hummock SSTable metadata, keyed by object id.
type MetaCache = HybridCache<HummockSstableObjectId, Box<Sstable>>;
// Hybrid (memory + disk) cache for individual SSTable blocks.
type BlockCache = HybridCache<SstableBlockIndex, Box<Block>>;
44
/// gRPC service exposing runtime diagnostics of a compute node: await-tree stack
/// traces, CPU/heap profiling, streaming metrics snapshots, and tiered-cache
/// tracing control.
#[derive(Clone)]
pub struct MonitorServiceImpl {
    // Handle to the local streaming task manager (await-tree registry, barrier state).
    stream_mgr: LocalStreamManager,
    // Delegate implementing the CPU/heap profiling RPCs.
    profile_service: ProfileServiceImpl,
    // SSTable metadata cache; `None` when the state store has no such cache attached.
    meta_cache: Option<MetaCache>,
    // SSTable block cache; `None` when the state store has no such cache attached.
    block_cache: Option<BlockCache>,
}
52
53impl MonitorServiceImpl {
54    pub fn new(
55        stream_mgr: LocalStreamManager,
56        server_config: ServerConfig,
57        meta_cache: Option<MetaCache>,
58        block_cache: Option<BlockCache>,
59    ) -> Self {
60        Self {
61            stream_mgr,
62            profile_service: ProfileServiceImpl::new(server_config),
63            meta_cache,
64            block_cache,
65        }
66    }
67}
68
#[async_trait::async_trait]
impl MonitorService for MonitorServiceImpl {
    /// Collects await-tree stack traces from this compute node: per-actor traces,
    /// in-flight barrier traces, in-flight gRPC call traces, compaction task
    /// traces, the barrier worker state, and (if a JVM is embedded) JVM stack
    /// traces. Each section degrades to empty when its registry is unavailable.
    async fn stack_trace(
        &self,
        request: Request<StackTraceRequest>,
    ) -> Result<Response<StackTraceResponse>, Status> {
        let req = request.into_inner();

        // Actor traces keyed by raw actor id; rendered as plain text or JSON
        // depending on the requested format.
        let actor_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
            reg.collect::<Actor>()
                .into_iter()
                .map(|(k, v)| {
                    (
                        k.0.as_raw_id(),
                        if req.actor_traces_format == ActorTracesFormat::Text as i32 {
                            v.to_string()
                        } else {
                            serde_json::to_string(&v).unwrap()
                        },
                    )
                })
                .collect()
        } else {
            Default::default()
        };

        // Barrier-await traces keyed by the epoch being awaited.
        let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
            reg.collect::<BarrierAwait>()
                .into_iter()
                .map(|(k, v)| (k.prev_epoch, v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        // Traces of gRPC calls currently served by this node (see the
        // `grpc_middleware` module, which registers a `GrpcCall` per request).
        let rpc_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
            reg.collect::<GrpcCall>()
                .into_iter()
                .map(|(k, v)| (k.desc, v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        // Compaction task traces are only available when the state store is
        // Hummock and its compaction await-tree registry is enabled.
        let compaction_task_traces = if let Some(hummock) =
            self.stream_mgr.env.state_store().as_hummock()
            && let Some(m) = hummock.compaction_await_tree_reg()
        {
            m.collect::<Compaction>()
                .into_iter()
                .map(|(k, v)| (format!("{k:?}"), v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        // `?` converts the stream manager's error into `Status`.
        let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?;

        // `Ok(None)` means no JVM is attached; an error is reported as a string
        // instead of failing the whole RPC.
        let jvm_stack_traces = match dump_jvm_stack_traces() {
            Ok(None) => None,
            Err(err) => Some(err.as_report().to_string()),
            Ok(Some(stack_traces)) => Some(stack_traces),
        };

        Ok(Response::new(StackTraceResponse {
            actor_traces,
            rpc_traces,
            compaction_task_traces,
            inflight_barrier_traces: barrier_traces,
            // Single-entry maps keyed by this node's worker id so the meta node
            // can merge responses from multiple workers.
            barrier_worker_state: BTreeMap::from_iter([(
                self.stream_mgr.env.worker_id(),
                barrier_worker_state,
            )]),
            jvm_stack_traces: match jvm_stack_traces {
                Some(stack_traces) => {
                    BTreeMap::from_iter([(self.stream_mgr.env.worker_id(), stack_traces)])
                }
                None => BTreeMap::new(),
            },
            // Filled in by the meta node, not by compute nodes.
            meta_traces: Default::default(),
            node_errors: Default::default(),
        }))
    }

    /// CPU profiling; delegated to the profile service.
    async fn profiling(
        &self,
        request: Request<ProfilingRequest>,
    ) -> Result<Response<ProfilingResponse>, Status> {
        self.profile_service.profiling(request).await
    }

    /// Triggers a heap profile dump; delegated to the profile service.
    async fn heap_profiling(
        &self,
        request: Request<HeapProfilingRequest>,
    ) -> Result<Response<HeapProfilingResponse>, Status> {
        self.profile_service.heap_profiling(request).await
    }

    /// Lists previously collected heap profiles; delegated to the profile service.
    async fn list_heap_profiling(
        &self,
        _request: Request<ListHeapProfilingRequest>,
    ) -> Result<Response<ListHeapProfilingResponse>, Status> {
        self.profile_service.list_heap_profiling(_request).await
    }

    /// Analyzes a collected heap profile; delegated to the profile service.
    async fn analyze_heap(
        &self,
        request: Request<AnalyzeHeapRequest>,
    ) -> Result<Response<AnalyzeHeapResponse>, Status> {
        self.profile_service.analyze_heap(request).await
    }

    /// Returns in-memory profiling counters for the requested executors and
    /// dispatcher fragments, read from the global streaming metrics registry.
    async fn get_profile_stats(
        &self,
        request: Request<GetProfileStatsRequest>,
    ) -> Result<Response<GetProfileStatsResponse>, Status> {
        let metrics = global_streaming_metrics(MetricLevel::Info);
        let inner = request.into_inner();
        let executor_ids = &inner.executor_ids;
        let fragment_ids = HashSet::from_iter(inner.dispatcher_fragment_ids);
        let stream_node_output_row_count = metrics
            .mem_stream_node_output_row_count
            .collect(executor_ids);
        let stream_node_output_blocking_duration_ns = metrics
            .mem_stream_node_output_blocking_duration_ns
            .collect(executor_ids);

        // Sums a counter metric family per `fragment_id` label, restricted to the
        // requested fragment ids.
        fn collect_by_fragment_ids<T: Collector>(
            m: &T,
            fragment_ids: &HashSet<FragmentId>,
        ) -> HashMap<FragmentId, u64> {
            let mut metrics = HashMap::new();
            for mut metric_family in m.collect() {
                for metric in metric_family.take_metric() {
                    let fragment_id = get_label_infallible(&metric, "fragment_id");
                    if fragment_ids.contains(&fragment_id) {
                        // Multiple series (e.g. per actor) may share a fragment id;
                        // accumulate them into one per-fragment total.
                        let entry = metrics.entry(fragment_id).or_insert(0);
                        *entry += metric.get_counter().value() as u64;
                    }
                }
            }
            metrics
        }

        let dispatch_fragment_output_row_count =
            collect_by_fragment_ids(&metrics.actor_out_record_cnt, &fragment_ids);
        let dispatch_fragment_output_blocking_duration_ns = collect_by_fragment_ids(
            &metrics.actor_output_buffer_blocking_duration_ns,
            &fragment_ids,
        );
        Ok(Response::new(GetProfileStatsResponse {
            stream_node_output_row_count,
            stream_node_output_blocking_duration_ns,
            dispatch_fragment_output_row_count,
            dispatch_fragment_output_blocking_duration_ns,
        }))
    }

    /// Builds a snapshot of streaming statistics (per-fragment, per-relation and
    /// per-channel) by joining several Prometheus metric families on their
    /// `fragment_id` / `table_id` labels.
    async fn get_streaming_stats(
        &self,
        _request: Request<GetStreamingStatsRequest>,
    ) -> Result<Response<GetStreamingStatsResponse>, Status> {
        let metrics = global_streaming_metrics(MetricLevel::Info);

        // Extracts the raw metric list from the first (and only) family a
        // collector reports.
        fn collect<T: Collector>(m: &T) -> Vec<Metric> {
            m.collect().into_iter().next().unwrap().take_metric()
        }

        let actor_output_buffer_blocking_duration_ns =
            collect(&metrics.actor_output_buffer_blocking_duration_ns);
        let actor_count = collect(&metrics.actor_count);

        // Shadowed: raw metric list -> fragment_id -> actor count map.
        let actor_count: HashMap<_, _> = actor_count
            .iter()
            .map(|m| {
                let fragment_id: u32 = get_label_infallible(m, "fragment_id");
                let count = m.get_gauge().value() as u32;
                (fragment_id, count)
            })
            .collect();

        let mut fragment_stats: HashMap<u32, FragmentStats> = HashMap::new();
        for (&fragment_id, &actor_count) in &actor_count {
            fragment_stats.insert(
                fragment_id,
                FragmentStats {
                    actor_count,
                    // Filled in below from `actor_current_epoch`; 0 means "unset".
                    current_epoch: 0,
                },
            );
        }

        // A fragment's epoch is the minimum over its actors' current epochs.
        let actor_current_epoch = collect(&metrics.actor_current_epoch);
        for m in &actor_current_epoch {
            let fragment_id: u32 = get_label_infallible(m, "fragment_id");
            let epoch = m.get_gauge().value() as u64;
            if let Some(s) = fragment_stats.get_mut(&fragment_id) {
                s.current_epoch = if s.current_epoch == 0 {
                    epoch
                } else {
                    u64::min(s.current_epoch, epoch)
                }
            } else {
                // Metric families are scraped independently and can be briefly
                // inconsistent with each other.
                warn!(
                    fragment_id = fragment_id,
                    "Miss corresponding actor count metrics"
                );
            }
        }

        // Per-materialized-view stats, keyed by table id; epoch is again the
        // minimum over the view's actors.
        let mut relation_stats: HashMap<u32, RelationStats> = HashMap::new();
        let mview_current_epoch = collect(&metrics.materialize_current_epoch);
        for m in &mview_current_epoch {
            let table_id: u32 = get_label_infallible(m, "table_id");
            let epoch = m.get_gauge().value() as u64;
            if let Some(s) = relation_stats.get_mut(&table_id) {
                s.current_epoch = if s.current_epoch == 0 {
                    epoch
                } else {
                    u64::min(s.current_epoch, epoch)
                };
                s.actor_count += 1;
            } else {
                relation_stats.insert(
                    table_id,
                    RelationStats {
                        actor_count: 1,
                        current_epoch: epoch,
                    },
                );
            }
        }

        // Channel stats keyed by "{upstream_fragment_id}_{downstream_fragment_id}".
        // A BTreeMap is required: the send-row pass below does a range scan over
        // keys sharing an upstream-fragment prefix.
        let mut channel_stats: BTreeMap<String, ChannelStats> = BTreeMap::new();

        for metric in actor_output_buffer_blocking_duration_ns {
            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
            let downstream_fragment_id: u32 =
                get_label_infallible(&metric, "downstream_fragment_id");

            // When metrics level is Debug, `actor_id` will be removed to reduce
            // metrics (see `src/common/metrics/src/relabeled_metric.rs`): one
            // aggregated series stands for all actors of the fragment, so the
            // actor count comes from the `actor_count` map instead of being 1.
            let actor_count_to_add =
                if get_label_infallible::<String>(&metric, "actor_id").is_empty() {
                    match actor_count.get(&fragment_id) {
                        Some(&count) => count,
                        None => {
                            // Metrics can be momentarily inconsistent (or stale) across families.
                            // Skip this channel instead of crashing the compute node.
                            warn!(
                                fragment_id = fragment_id,
                                downstream_fragment_id = downstream_fragment_id,
                                "Miss corresponding actor count metrics"
                            );
                            continue;
                        }
                    }
                } else {
                    1
                };

            let key = format!("{}_{}", fragment_id, downstream_fragment_id);
            let channel_stat = channel_stats.entry(key).or_insert_with(|| ChannelStats {
                actor_count: 0,
                output_blocking_duration: 0.,
                recv_row_count: 0,
                send_row_count: 0,
            });

            // When metrics level is Debug, `actor_id` will be removed to reduce metrics.
            // See `src/common/metrics/src/relabeled_metric.rs`
            channel_stat.actor_count += actor_count_to_add;
            channel_stat.output_blocking_duration += metric.get_counter().value();
        }

        // Attribute each fragment's output row count to every one of its
        // downstream channels.
        let actor_output_row_count = collect(&metrics.actor_out_record_cnt);
        for metric in actor_output_row_count {
            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");

            // Find out and write to all downstream channels. The range covers all
            // keys starting with "{fragment_id}_": '`' (0x60) is the ASCII
            // character immediately after '_' (0x5F), so "{id}_".."{id}`" bounds
            // exactly that prefix.
            let key_prefix = format!("{}_", fragment_id);
            let key_range_end = format!("{}`", fragment_id); // '`' is next to `_`
            for (_, s) in channel_stats.range_mut(key_prefix..key_range_end) {
                s.send_row_count += metric.get_counter().value() as u64;
            }
        }

        // Input rows are keyed by the exact (upstream, downstream) pair, so a
        // point lookup suffices here.
        let actor_input_row_count = collect(&metrics.actor_in_record_cnt);
        for metric in actor_input_row_count {
            let upstream_fragment_id: u32 = get_label_infallible(&metric, "upstream_fragment_id");
            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");

            let key = format!("{}_{}", upstream_fragment_id, fragment_id);
            if let Some(s) = channel_stats.get_mut(&key) {
                s.recv_row_count += metric.get_counter().value() as u64;
            }
        }

        let channel_stats = channel_stats.into_iter().collect();
        Ok(Response::new(GetStreamingStatsResponse {
            channel_stats,
            fragment_stats,
            relation_stats,
        }))
    }

    /// Enables/disables foyer tiered-cache tracing and updates the per-operation
    /// latency thresholds on both the meta cache and the block cache.
    ///
    /// NOTE(review): the meta-cache and block-cache branches are copy-paste
    /// duplicates; building the `TracingOptions` once and applying it to both
    /// caches would remove the duplication — left as-is here.
    async fn tiered_cache_tracing(
        &self,
        request: Request<TieredCacheTracingRequest>,
    ) -> Result<Response<TieredCacheTracingResponse>, Status> {
        let req = request.into_inner();

        tracing::info!("Update tiered cache tracing config: {req:?}");

        if let Some(cache) = &self.meta_cache {
            if req.enable {
                cache.enable_tracing();
            } else {
                cache.disable_tracing();
            }
            // Only thresholds present in the request are overridden.
            let mut options = TracingOptions::new();
            if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
                options = options
                    .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_get_threshold_ms {
                options =
                    options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
                options = options
                    .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
                options = options.with_record_hybrid_get_or_fetch_threshold(Duration::from_millis(
                    threshold as _,
                ));
            }
            cache.update_tracing_options(options);
        }

        if let Some(cache) = &self.block_cache {
            if req.enable {
                cache.enable_tracing();
            } else {
                cache.disable_tracing();
            }
            // Same threshold handling as the meta cache above.
            let mut options = TracingOptions::new();
            if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
                options = options
                    .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_get_threshold_ms {
                options =
                    options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
                options = options
                    .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
                options = options.with_record_hybrid_get_or_fetch_threshold(Duration::from_millis(
                    threshold as _,
                ));
            }
            cache.update_tracing_options(options);
        }

        Ok(Response::new(TieredCacheTracingResponse::default()))
    }
}
439
440pub use grpc_middleware::*;
441use risingwave_common::metrics::get_label_infallible;
442use risingwave_pb::id::FragmentId;
443
/// Tower middleware that registers every incoming gRPC request in an await-tree
/// registry, so in-flight RPCs show up in `stack_trace` responses.
pub mod grpc_middleware {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::task::{Context, Poll};

    use either::Either;
    use futures::Future;
    use tonic::body::Body;
    use tower::{Layer, Service};

    /// Manages the await-trees of `gRPC` requests that are currently served by the compute node.
    pub type AwaitTreeRegistryRef = await_tree::Registry;

    /// Await-tree key type for gRPC calls.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub struct GrpcCall {
        // Human-readable description: "{authority} - {sequence id}".
        pub desc: String,
    }

    /// `tower::Layer` that wraps a service in [`AwaitTreeMiddleware`].
    #[derive(Clone)]
    pub struct AwaitTreeMiddlewareLayer {
        // `None` disables tracing entirely (requests pass through untouched).
        registry: Option<AwaitTreeRegistryRef>,
    }

    impl AwaitTreeMiddlewareLayer {
        /// Layer that always registers requests in `registry`.
        pub fn new(registry: AwaitTreeRegistryRef) -> Self {
            Self {
                registry: Some(registry),
            }
        }

        /// Layer that registers requests only if a registry is provided.
        pub fn new_optional(registry: Option<AwaitTreeRegistryRef>) -> Self {
            Self { registry }
        }
    }

    impl<S> Layer<S> for AwaitTreeMiddlewareLayer {
        type Service = AwaitTreeMiddleware<S>;

        fn layer(&self, service: S) -> Self::Service {
            AwaitTreeMiddleware {
                inner: service,
                registry: self.registry.clone(),
                next_id: Default::default(),
            }
        }
    }

    /// Service wrapper that instruments each call with an await-tree root.
    #[derive(Clone)]
    pub struct AwaitTreeMiddleware<S> {
        inner: S,
        registry: Option<AwaitTreeRegistryRef>,
        // Monotonic per-middleware counter used to disambiguate concurrent calls
        // from the same authority in the await-tree key.
        next_id: Arc<AtomicU64>,
    }

    impl<S> Service<http::Request<Body>> for AwaitTreeMiddleware<S>
    where
        S: Service<http::Request<Body>> + Clone,
    {
        type Error = S::Error;
        type Response = S::Response;

        // `impl Trait` in an associated type: the future is either the inner
        // service's future (no registry) or the instrumented async block.
        type Future = impl Future<Output = Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.inner.poll_ready(cx)
        }

        fn call(&mut self, req: http::Request<Body>) -> Self::Future {
            // Fast path: no registry configured, forward the call untouched.
            let Some(registry) = self.registry.clone() else {
                return Either::Left(self.inner.call(req));
            };

            // This is necessary because tonic internally uses `tower::buffer::Buffer`.
            // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
            // for details on why this is necessary
            let clone = self.inner.clone();
            let mut inner = std::mem::replace(&mut self.inner, clone);

            let id = self.next_id.fetch_add(1, Ordering::SeqCst);
            let desc = if let Some(authority) = req.uri().authority() {
                format!("{authority} - {id}")
            } else {
                format!("?? - {id}")
            };
            let key = GrpcCall { desc };

            Either::Right(async move {
                // Register this request under `key`, rooted at the request path;
                // the tree is dropped (deregistered) when the future completes.
                let root = registry.register(key, req.uri().path());

                root.instrument(inner.call(req)).await
            })
        }
    }

    // madsim's simulated tonic does not expose `NamedService`.
    #[cfg(not(madsim))]
    impl<S: tonic::server::NamedService> tonic::server::NamedService for AwaitTreeMiddleware<S> {
        const NAME: &'static str = S::NAME;
    }
}
543}