1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ffi::CString;
17use std::fs;
18use std::path::Path;
19use std::time::Duration;
20
21use foyer::{HybridCache, TracingOptions};
22use itertools::Itertools;
23use prometheus::core::Collector;
24use prometheus::proto::Metric;
25use risingwave_common::config::{MetricLevel, ServerConfig};
26use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
27use risingwave_hummock_sdk::HummockSstableObjectId;
28use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces;
29use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
30use risingwave_pb::monitor_service::{
31 AnalyzeHeapRequest, AnalyzeHeapResponse, ChannelStats, FragmentStats, GetProfileStatsRequest,
32 GetProfileStatsResponse, GetStreamingStatsRequest, GetStreamingStatsResponse,
33 HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest,
34 ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, RelationStats,
35 StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest, TieredCacheTracingResponse,
36};
37use risingwave_rpc_client::error::ToTonicStatus;
38use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
39use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
40use risingwave_stream::executor::monitor::global_streaming_metrics;
41use risingwave_stream::task::LocalStreamManager;
42use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait};
43use thiserror_ext::AsReport;
44use tonic::{Code, Request, Response, Status};
45
46type MetaCache = HybridCache<HummockSstableObjectId, Box<Sstable>>;
47type BlockCache = HybridCache<SstableBlockIndex, Box<Block>>;
48
49#[derive(Clone)]
50pub struct MonitorServiceImpl {
51 stream_mgr: LocalStreamManager,
52 server_config: ServerConfig,
53 meta_cache: Option<MetaCache>,
54 block_cache: Option<BlockCache>,
55}
56
57impl MonitorServiceImpl {
58 pub fn new(
59 stream_mgr: LocalStreamManager,
60 server_config: ServerConfig,
61 meta_cache: Option<MetaCache>,
62 block_cache: Option<BlockCache>,
63 ) -> Self {
64 Self {
65 stream_mgr,
66 server_config,
67 meta_cache,
68 block_cache,
69 }
70 }
71}
72
73#[async_trait::async_trait]
74impl MonitorService for MonitorServiceImpl {
75 #[cfg_attr(coverage, coverage(off))]
76 async fn stack_trace(
77 &self,
78 request: Request<StackTraceRequest>,
79 ) -> Result<Response<StackTraceResponse>, Status> {
80 let _req = request.into_inner();
81
82 let actor_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
83 reg.collect::<Actor>()
84 .into_iter()
85 .map(|(k, v)| (k.0, v.to_string()))
86 .collect()
87 } else {
88 Default::default()
89 };
90
91 let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
92 reg.collect::<BarrierAwait>()
93 .into_iter()
94 .map(|(k, v)| (k.prev_epoch, v.to_string()))
95 .collect()
96 } else {
97 Default::default()
98 };
99
100 let rpc_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
101 reg.collect::<GrpcCall>()
102 .into_iter()
103 .map(|(k, v)| (k.desc, v.to_string()))
104 .collect()
105 } else {
106 Default::default()
107 };
108
109 let compaction_task_traces = if let Some(hummock) =
110 self.stream_mgr.env.state_store().as_hummock()
111 && let Some(m) = hummock.compaction_await_tree_reg()
112 {
113 m.collect::<Compaction>()
114 .into_iter()
115 .map(|(k, v)| (format!("{k:?}"), v.to_string()))
116 .collect()
117 } else {
118 Default::default()
119 };
120
121 let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?;
122
123 let jvm_stack_traces = match dump_jvm_stack_traces() {
124 Ok(None) => None,
125 Err(err) => Some(err.as_report().to_string()),
126 Ok(Some(stack_traces)) => Some(stack_traces),
127 };
128
129 Ok(Response::new(StackTraceResponse {
130 actor_traces,
131 rpc_traces,
132 compaction_task_traces,
133 inflight_barrier_traces: barrier_traces,
134 barrier_worker_state: BTreeMap::from_iter([(
135 self.stream_mgr.env.worker_id(),
136 barrier_worker_state,
137 )]),
138 jvm_stack_traces: match jvm_stack_traces {
139 Some(stack_traces) => {
140 BTreeMap::from_iter([(self.stream_mgr.env.worker_id(), stack_traces)])
141 }
142 None => BTreeMap::new(),
143 },
144 }))
145 }
146
147 #[cfg_attr(coverage, coverage(off))]
148 async fn profiling(
149 &self,
150 request: Request<ProfilingRequest>,
151 ) -> Result<Response<ProfilingResponse>, Status> {
152 if std::env::var("RW_PROFILE_PATH").is_ok() {
153 return Err(Status::internal(
154 "Profiling is already running by setting RW_PROFILE_PATH",
155 ));
156 }
157 let time = request.into_inner().get_sleep_s();
158 let guard = pprof::ProfilerGuardBuilder::default()
159 .blocklist(&["libc", "libgcc", "pthread", "vdso"])
160 .build()
161 .unwrap();
162 tokio::time::sleep(Duration::from_secs(time)).await;
163 let mut buf = vec![];
164 match guard.report().build() {
165 Ok(report) => {
166 report.flamegraph(&mut buf).unwrap();
167 tracing::info!("succeed to generate flamegraph");
168 Ok(Response::new(ProfilingResponse { result: buf }))
169 }
170 Err(err) => {
171 tracing::warn!(error = %err.as_report(), "failed to generate flamegraph");
172 Err(err.to_status(Code::Internal, "monitor"))
173 }
174 }
175 }
176
177 #[cfg_attr(coverage, coverage(off))]
178 async fn heap_profiling(
179 &self,
180 request: Request<HeapProfilingRequest>,
181 ) -> Result<Response<HeapProfilingResponse>, Status> {
182 use std::fs::create_dir_all;
183 use std::path::PathBuf;
184
185 use tikv_jemalloc_ctl;
186
187 if !cfg!(target_os = "linux") {
188 return Err(Status::unimplemented(
189 "heap profiling is only implemented on Linux",
190 ));
191 }
192
193 if !tikv_jemalloc_ctl::opt::prof::read().unwrap() {
194 return Err(Status::failed_precondition(
195 "Jemalloc profiling is not enabled on the node. Try start the node with `MALLOC_CONF=prof:true`",
196 ));
197 }
198
199 let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
200 let file_name = format!("{}.{}", time_prefix, MANUALLY_DUMP_SUFFIX);
201 let arg_dir = request.into_inner().dir;
202 let dir = PathBuf::from(if arg_dir.is_empty() {
203 &self.server_config.heap_profiling.dir
204 } else {
205 &arg_dir
206 });
207 create_dir_all(&dir)?;
208
209 let file_path_buf = dir.join(file_name);
210 let file_path = file_path_buf
211 .to_str()
212 .ok_or_else(|| Status::internal("The file dir is not a UTF-8 String"))?;
213 let file_path_c =
214 CString::new(file_path).map_err(|_| Status::internal("0 byte in file path"))?;
215
216 if let Err(e) =
218 tikv_jemalloc_ctl::prof::dump::write(unsafe { &*(file_path_c.as_c_str() as *const _) })
219 {
220 tracing::warn!("Manually Jemalloc dump heap file failed! {:?}", e);
221 Err(Status::internal(e.to_string()))
222 } else {
223 tracing::info!("Manually Jemalloc dump heap file created: {}", file_path);
224 Ok(Response::new(HeapProfilingResponse {}))
225 }
226 }
227
228 #[cfg_attr(coverage, coverage(off))]
229 async fn list_heap_profiling(
230 &self,
231 _request: Request<ListHeapProfilingRequest>,
232 ) -> Result<Response<ListHeapProfilingResponse>, Status> {
233 let dump_dir = self.server_config.heap_profiling.dir.clone();
234 let auto_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
235 .map(|entry| {
236 let entry = entry?;
237 Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
238 })
239 .filter(|name| {
240 if let Ok(name) = name {
241 name.contains(AUTO_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
242 } else {
243 true
244 }
245 })
246 .try_collect()?;
247 let manually_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
248 .map(|entry| {
249 let entry = entry?;
250 Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
251 })
252 .filter(|name| {
253 if let Ok(name) = name {
254 name.contains(MANUALLY_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
255 } else {
256 true
257 }
258 })
259 .try_collect()?;
260
261 Ok(Response::new(ListHeapProfilingResponse {
262 dir: dump_dir,
263 name_auto: auto_dump_files_name,
264 name_manually: manually_dump_files_name,
265 }))
266 }
267
268 #[cfg_attr(coverage, coverage(off))]
269 async fn analyze_heap(
270 &self,
271 request: Request<AnalyzeHeapRequest>,
272 ) -> Result<Response<AnalyzeHeapResponse>, Status> {
273 let dumped_path_str = request.into_inner().get_path().clone();
274 let collapsed_path_str = format!("{}.{}", dumped_path_str, COLLAPSED_SUFFIX);
275 let collapsed_path = Path::new(&collapsed_path_str);
276
277 if !collapsed_path.exists() {
279 risingwave_common_heap_profiling::jeprof::run(
280 dumped_path_str,
281 collapsed_path_str.clone(),
282 )
283 .await
284 .map_err(|e| e.to_status(Code::Internal, "monitor"))?;
285 }
286
287 let file = fs::read(Path::new(&collapsed_path_str))?;
288 Ok(Response::new(AnalyzeHeapResponse { result: file }))
289 }
290
291 async fn get_profile_stats(
292 &self,
293 request: Request<GetProfileStatsRequest>,
294 ) -> Result<Response<GetProfileStatsResponse>, Status> {
295 let metrics = global_streaming_metrics(MetricLevel::Info);
296 let inner = request.into_inner();
297 let executor_ids = &inner.executor_ids;
298 let fragment_ids = HashSet::from_iter(inner.dispatcher_fragment_ids.into_iter());
299 let stream_node_output_row_count = metrics
300 .mem_stream_node_output_row_count
301 .collect(executor_ids);
302 let stream_node_output_blocking_duration_ns = metrics
303 .mem_stream_node_output_blocking_duration_ns
304 .collect(executor_ids);
305
306 fn collect_by_fragment_ids<T: Collector>(
308 m: &T,
309 fragment_ids: &HashSet<u32>,
310 ) -> HashMap<u32, u64> {
311 let mut metrics = HashMap::new();
312 for mut metric_family in m.collect() {
313 for metric in metric_family.take_metric() {
314 let fragment_id = get_label_infallible(&metric, "fragment_id");
315 if fragment_ids.contains(&fragment_id) {
316 let entry = metrics.entry(fragment_id).or_insert(0);
317 *entry += metric.get_counter().get_value() as u64;
318 }
319 }
320 }
321 metrics
322 }
323
324 let dispatch_fragment_output_row_count =
325 collect_by_fragment_ids(&metrics.actor_out_record_cnt, &fragment_ids);
326 let dispatch_fragment_output_blocking_duration_ns = collect_by_fragment_ids(
327 &metrics.actor_output_buffer_blocking_duration_ns,
328 &fragment_ids,
329 );
330 Ok(Response::new(GetProfileStatsResponse {
331 stream_node_output_row_count,
332 stream_node_output_blocking_duration_ns,
333 dispatch_fragment_output_row_count,
334 dispatch_fragment_output_blocking_duration_ns,
335 }))
336 }
337
338 #[cfg_attr(coverage, coverage(off))]
339 async fn get_streaming_stats(
340 &self,
341 _request: Request<GetStreamingStatsRequest>,
342 ) -> Result<Response<GetStreamingStatsResponse>, Status> {
343 let metrics = global_streaming_metrics(MetricLevel::Info);
344
345 fn collect<T: Collector>(m: &T) -> Vec<Metric> {
346 m.collect()
347 .into_iter()
348 .next()
349 .unwrap()
350 .take_metric()
351 .into_vec()
352 }
353
354 let actor_output_buffer_blocking_duration_ns =
355 collect(&metrics.actor_output_buffer_blocking_duration_ns);
356 let actor_count = collect(&metrics.actor_count);
357
358 let actor_count: HashMap<_, _> = actor_count
359 .iter()
360 .map(|m| {
361 let fragment_id: u32 = get_label_infallible(m, "fragment_id");
362 let count = m.get_gauge().get_value() as u32;
363 (fragment_id, count)
364 })
365 .collect();
366
367 let mut fragment_stats: HashMap<u32, FragmentStats> = HashMap::new();
368 for (&fragment_id, &actor_count) in &actor_count {
369 fragment_stats.insert(
370 fragment_id,
371 FragmentStats {
372 actor_count,
373 current_epoch: 0,
374 },
375 );
376 }
377
378 let actor_current_epoch = collect(&metrics.actor_current_epoch);
379 for m in &actor_current_epoch {
380 let fragment_id: u32 = get_label_infallible(m, "fragment_id");
381 let epoch = m.get_gauge().get_value() as u64;
382 if let Some(s) = fragment_stats.get_mut(&fragment_id) {
383 s.current_epoch = if s.current_epoch == 0 {
384 epoch
385 } else {
386 u64::min(s.current_epoch, epoch)
387 }
388 } else {
389 warn!(
390 fragment_id = fragment_id,
391 "Miss corresponding actor count metrics"
392 );
393 }
394 }
395
396 let mut relation_stats: HashMap<u32, RelationStats> = HashMap::new();
397 let mview_current_epoch = collect(&metrics.materialize_current_epoch);
398 for m in &mview_current_epoch {
399 let table_id: u32 = get_label_infallible(m, "table_id");
400 let epoch = m.get_gauge().get_value() as u64;
401 if let Some(s) = relation_stats.get_mut(&table_id) {
402 s.current_epoch = if s.current_epoch == 0 {
403 epoch
404 } else {
405 u64::min(s.current_epoch, epoch)
406 };
407 s.actor_count += 1;
408 } else {
409 relation_stats.insert(
410 table_id,
411 RelationStats {
412 actor_count: 1,
413 current_epoch: epoch,
414 },
415 );
416 }
417 }
418
419 let mut channel_stats: BTreeMap<String, ChannelStats> = BTreeMap::new();
420
421 for metric in actor_output_buffer_blocking_duration_ns {
422 let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
423 let downstream_fragment_id: u32 =
424 get_label_infallible(&metric, "downstream_fragment_id");
425
426 let key = format!("{}_{}", fragment_id, downstream_fragment_id);
427 let channel_stat = channel_stats.entry(key).or_insert_with(|| ChannelStats {
428 actor_count: 0,
429 output_blocking_duration: 0.,
430 recv_row_count: 0,
431 send_row_count: 0,
432 });
433
434 channel_stat.actor_count +=
437 if get_label_infallible::<String>(&metric, "actor_id").is_empty() {
438 actor_count[&fragment_id]
439 } else {
440 1
441 };
442 channel_stat.output_blocking_duration += metric.get_counter().get_value();
443 }
444
445 let actor_output_row_count = collect(&metrics.actor_out_record_cnt);
446 for metric in actor_output_row_count {
447 let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
448
449 let key_prefix = format!("{}_", fragment_id);
451 let key_range_end = format!("{}`", fragment_id); for (_, s) in channel_stats.range_mut(key_prefix..key_range_end) {
453 s.send_row_count += metric.get_counter().get_value() as u64;
454 }
455 }
456
457 let actor_input_row_count = collect(&metrics.actor_in_record_cnt);
458 for metric in actor_input_row_count {
459 let upstream_fragment_id: u32 = get_label_infallible(&metric, "upstream_fragment_id");
460 let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
461
462 let key = format!("{}_{}", upstream_fragment_id, fragment_id);
463 if let Some(s) = channel_stats.get_mut(&key) {
464 s.recv_row_count += metric.get_counter().get_value() as u64;
465 }
466 }
467
468 let channel_stats = channel_stats.into_iter().collect();
469 Ok(Response::new(GetStreamingStatsResponse {
470 channel_stats,
471 fragment_stats,
472 relation_stats,
473 }))
474 }
475
476 #[cfg_attr(coverage, coverage(off))]
477 async fn tiered_cache_tracing(
478 &self,
479 request: Request<TieredCacheTracingRequest>,
480 ) -> Result<Response<TieredCacheTracingResponse>, Status> {
481 let req = request.into_inner();
482
483 tracing::info!("Update tiered cache tracing config: {req:?}");
484
485 if let Some(cache) = &self.meta_cache {
486 if req.enable {
487 cache.enable_tracing();
488 } else {
489 cache.disable_tracing();
490 }
491 let mut options = TracingOptions::new();
492 if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
493 options = options
494 .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
495 }
496 if let Some(threshold) = req.record_hybrid_get_threshold_ms {
497 options =
498 options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
499 }
500 if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
501 options = options
502 .with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
503 }
504 if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
505 options = options
506 .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
507 }
508 if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
509 options = options
510 .with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
511 }
512 cache.update_tracing_options(options);
513 }
514
515 if let Some(cache) = &self.block_cache {
516 if req.enable {
517 cache.enable_tracing();
518 } else {
519 cache.disable_tracing();
520 }
521 let mut options = TracingOptions::new();
522 if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
523 options = options
524 .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
525 }
526 if let Some(threshold) = req.record_hybrid_get_threshold_ms {
527 options =
528 options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
529 }
530 if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
531 options = options
532 .with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
533 }
534 if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
535 options = options
536 .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
537 }
538 if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
539 options = options
540 .with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
541 }
542 cache.update_tracing_options(options);
543 }
544
545 Ok(Response::new(TieredCacheTracingResponse::default()))
546 }
547}
548
549pub use grpc_middleware::*;
550use risingwave_common::metrics::get_label_infallible;
551
552pub mod grpc_middleware {
553 use std::sync::Arc;
554 use std::sync::atomic::{AtomicU64, Ordering};
555 use std::task::{Context, Poll};
556
557 use either::Either;
558 use futures::Future;
559 use tonic::body::BoxBody;
560 use tower::{Layer, Service};
561
562 pub type AwaitTreeRegistryRef = await_tree::Registry;
564
565 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
567 pub struct GrpcCall {
568 pub desc: String,
569 }
570
571 #[derive(Clone)]
572 pub struct AwaitTreeMiddlewareLayer {
573 registry: Option<AwaitTreeRegistryRef>,
574 }
575
576 impl AwaitTreeMiddlewareLayer {
577 pub fn new(registry: AwaitTreeRegistryRef) -> Self {
578 Self {
579 registry: Some(registry),
580 }
581 }
582
583 pub fn new_optional(registry: Option<AwaitTreeRegistryRef>) -> Self {
584 Self { registry }
585 }
586 }
587
588 impl<S> Layer<S> for AwaitTreeMiddlewareLayer {
589 type Service = AwaitTreeMiddleware<S>;
590
591 fn layer(&self, service: S) -> Self::Service {
592 AwaitTreeMiddleware {
593 inner: service,
594 registry: self.registry.clone(),
595 next_id: Default::default(),
596 }
597 }
598 }
599
600 #[derive(Clone)]
601 pub struct AwaitTreeMiddleware<S> {
602 inner: S,
603 registry: Option<AwaitTreeRegistryRef>,
604 next_id: Arc<AtomicU64>,
605 }
606
607 impl<S> Service<http::Request<BoxBody>> for AwaitTreeMiddleware<S>
608 where
609 S: Service<http::Request<BoxBody>> + Clone,
610 {
611 type Error = S::Error;
612 type Response = S::Response;
613
614 type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
615
616 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
617 self.inner.poll_ready(cx)
618 }
619
620 fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
621 let Some(registry) = self.registry.clone() else {
622 return Either::Left(self.inner.call(req));
623 };
624
625 let clone = self.inner.clone();
629 let mut inner = std::mem::replace(&mut self.inner, clone);
630
631 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
632 let desc = if let Some(authority) = req.uri().authority() {
633 format!("{authority} - {id}")
634 } else {
635 format!("?? - {id}")
636 };
637 let key = GrpcCall { desc };
638
639 Either::Right(async move {
640 let root = registry.register(key, req.uri().path());
641
642 root.instrument(inner.call(req)).await
643 })
644 }
645 }
646
647 #[cfg(not(madsim))]
648 impl<S: tonic::server::NamedService> tonic::server::NamedService for AwaitTreeMiddleware<S> {
649 const NAME: &'static str = S::NAME;
650 }
651}