use std::collections::{BTreeMap, HashMap, HashSet};
use std::ffi::CString;
use std::fs;
use std::path::Path;
use std::time::Duration;

use foyer::{HybridCache, TracingOptions};
use itertools::Itertools;
use prometheus::core::Collector;
use prometheus::proto::Metric;
use risingwave_common::config::{MetricLevel, ServerConfig};
use risingwave_common::metrics::get_label_infallible;
use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces;
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{
    AnalyzeHeapRequest, AnalyzeHeapResponse, ChannelStats, FragmentStats, GetProfileStatsRequest,
    GetProfileStatsResponse, GetStreamingStatsRequest, GetStreamingStatsResponse,
    HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest,
    ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, RelationStats,
    StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest, TieredCacheTracingResponse,
};
use risingwave_rpc_client::error::ToTonicStatus;
use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::task::LocalStreamManager;
use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait};
use thiserror_ext::AsReport;
use tonic::{Code, Request, Response, Status};

type MetaCache = HybridCache<HummockSstableObjectId, Box<Sstable>>;
type BlockCache = HybridCache<SstableBlockIndex, Box<Block>>;

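/// gRPC service exposing runtime diagnostics of a compute node: await-tree
/// stack traces, CPU and heap profiling, streaming metric snapshots, and
/// tiered cache tracing knobs.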
#[derive(Clone)]
pub struct MonitorServiceImpl {
    stream_mgr: LocalStreamManager,
    server_config: ServerConfig,
    meta_cache: Option<MetaCache>,
    block_cache: Option<BlockCache>,
}

impl MonitorServiceImpl {
    pub fn new(
        stream_mgr: LocalStreamManager,
        server_config: ServerConfig,
        meta_cache: Option<MetaCache>,
        block_cache: Option<BlockCache>,
    ) -> Self {
        Self {
            stream_mgr,
            server_config,
            meta_cache,
            block_cache,
        }
    }
}

#[async_trait::async_trait]
impl MonitorService for MonitorServiceImpl {
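    /// Collect await-tree stack traces of actors, inflight barriers, gRPC calls
    /// and compaction tasks on this node, plus JVM stack traces if available.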
    async fn stack_trace(
        &self,
        request: Request<StackTraceRequest>,
    ) -> Result<Response<StackTraceResponse>, Status> {
        let req = request.into_inner();

        let actor_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
            reg.collect::<Actor>()
                .into_iter()
                .map(|(k, v)| {
                    (
                        k.0,
                        if req.actor_traces_format == ActorTracesFormat::Text as i32 {
                            v.to_string()
                        } else {
                            serde_json::to_string(&v).unwrap()
                        },
                    )
                })
                .collect()
        } else {
            Default::default()
        };

        let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
            reg.collect::<BarrierAwait>()
                .into_iter()
                .map(|(k, v)| (k.prev_epoch, v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        let rpc_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
            reg.collect::<GrpcCall>()
                .into_iter()
                .map(|(k, v)| (k.desc, v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        let compaction_task_traces = if let Some(hummock) =
            self.stream_mgr.env.state_store().as_hummock()
            && let Some(m) = hummock.compaction_await_tree_reg()
        {
            m.collect::<Compaction>()
                .into_iter()
                .map(|(k, v)| (format!("{k:?}"), v.to_string()))
                .collect()
        } else {
            Default::default()
        };

        let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?;

        let jvm_stack_traces = match dump_jvm_stack_traces() {
            Ok(None) => None,
            Err(err) => Some(err.as_report().to_string()),
            Ok(Some(stack_traces)) => Some(stack_traces),
        };

        Ok(Response::new(StackTraceResponse {
            actor_traces,
            rpc_traces,
            compaction_task_traces,
            inflight_barrier_traces: barrier_traces,
            barrier_worker_state: BTreeMap::from_iter([(
                self.stream_mgr.env.worker_id(),
                barrier_worker_state,
            )]),
            jvm_stack_traces: match jvm_stack_traces {
                Some(stack_traces) => {
                    BTreeMap::from_iter([(self.stream_mgr.env.worker_id(), stack_traces)])
                }
                None => BTreeMap::new(),
            },
            meta_traces: Default::default(),
        }))
    }

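    /// Profile CPU usage with `pprof` for the requested number of seconds and
    /// return the rendered flamegraph as bytes.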
    async fn profiling(
        &self,
        request: Request<ProfilingRequest>,
    ) -> Result<Response<ProfilingResponse>, Status> {
        if std::env::var("RW_PROFILE_PATH").is_ok() {
            return Err(Status::internal(
                "Profiling is already enabled via the RW_PROFILE_PATH environment variable",
            ));
        }
        let time = request.into_inner().get_sleep_s();
        let guard = pprof::ProfilerGuardBuilder::default()
            .blocklist(&["libc", "libgcc", "pthread", "vdso"])
            .build()
            .unwrap();
        tokio::time::sleep(Duration::from_secs(time)).await;
        let mut buf = vec![];
        match guard.report().build() {
            Ok(report) => {
                report.flamegraph(&mut buf).unwrap();
                tracing::info!("successfully generated flamegraph");
                Ok(Response::new(ProfilingResponse { result: buf }))
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "failed to generate flamegraph");
                Err(err.to_status(Code::Internal, "monitor"))
            }
        }
    }

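    /// Trigger a manual jemalloc heap dump into the requested directory (or the
    /// configured default). Only available on Linux with `MALLOC_CONF=prof:true`.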
    async fn heap_profiling(
        &self,
        request: Request<HeapProfilingRequest>,
    ) -> Result<Response<HeapProfilingResponse>, Status> {
        use std::fs::create_dir_all;
        use std::path::PathBuf;

        use tikv_jemalloc_ctl;

        if !cfg!(target_os = "linux") {
            return Err(Status::unimplemented(
                "heap profiling is only implemented on Linux",
            ));
        }

        if !tikv_jemalloc_ctl::opt::prof::read().unwrap() {
            return Err(Status::failed_precondition(
                "Jemalloc profiling is not enabled on the node. Try starting the node with `MALLOC_CONF=prof:true`",
            ));
        }

        let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
        let file_name = format!("{}.{}", time_prefix, MANUALLY_DUMP_SUFFIX);
        let arg_dir = request.into_inner().dir;
        let dir = PathBuf::from(if arg_dir.is_empty() {
            &self.server_config.heap_profiling.dir
        } else {
            &arg_dir
        });
        create_dir_all(&dir)?;

        let file_path_buf = dir.join(file_name);
        let file_path = file_path_buf
            .to_str()
            .ok_or_else(|| Status::internal("The file dir is not a UTF-8 String"))?;
        let file_path_c =
            CString::new(file_path).map_err(|_| Status::internal("0 byte in file path"))?;

        // SAFETY: `prof::dump::write` requires a `&'static CStr`, hence the lifetime
        // extension through a raw-pointer round trip. jemalloc only reads the path
        // during the call, which completes before `file_path_c` is dropped.
        if let Err(e) =
            tikv_jemalloc_ctl::prof::dump::write(unsafe { &*(file_path_c.as_c_str() as *const _) })
        {
            tracing::warn!("Manual Jemalloc heap dump failed! {:?}", e);
            Err(Status::internal(e.to_string()))
        } else {
            tracing::info!("Manual Jemalloc heap dump created: {}", file_path);
            Ok(Response::new(HeapProfilingResponse {}))
        }
    }

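    /// List the heap dump files (both automatically and manually triggered)
    /// currently present in the configured dump directory.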
    async fn list_heap_profiling(
        &self,
        _request: Request<ListHeapProfilingRequest>,
    ) -> Result<Response<ListHeapProfilingResponse>, Status> {
        let dump_dir = self.server_config.heap_profiling.dir.clone();
        let auto_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
            .map(|entry| {
                let entry = entry?;
                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
            })
            .filter(|name| {
                if let Ok(name) = name {
                    name.contains(AUTO_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
                } else {
                    true
                }
            })
            .try_collect()?;
        let manually_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
            .map(|entry| {
                let entry = entry?;
                Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
            })
            .filter(|name| {
                if let Ok(name) = name {
                    name.contains(MANUALLY_DUMP_SUFFIX) && !name.ends_with(COLLAPSED_SUFFIX)
                } else {
                    true
                }
            })
            .try_collect()?;

        Ok(Response::new(ListHeapProfilingResponse {
            dir: dump_dir,
            name_auto: auto_dump_files_name,
            name_manually: manually_dump_files_name,
        }))
    }

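    /// Collapse a heap dump with `jeprof` (unless already done) and return the
    /// collapsed file content for flamegraph rendering.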
    async fn analyze_heap(
        &self,
        request: Request<AnalyzeHeapRequest>,
    ) -> Result<Response<AnalyzeHeapResponse>, Status> {
        let dumped_path_str = request.into_inner().get_path().clone();
        let collapsed_path_str = format!("{}.{}", dumped_path_str, COLLAPSED_SUFFIX);
        let collapsed_path = Path::new(&collapsed_path_str);

        // Run `jeprof` only if the dump has not been analyzed before.
        if !collapsed_path.exists() {
            risingwave_common_heap_profiling::jeprof::run(
                dumped_path_str,
                collapsed_path_str.clone(),
            )
            .await
            .map_err(|e| e.to_status(Code::Internal, "monitor"))?;
        }

        let file = fs::read(Path::new(&collapsed_path_str))?;
        Ok(Response::new(AnalyzeHeapResponse { result: file }))
    }

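    /// Aggregate profiling counters for the requested executors and dispatcher
    /// fragments from the in-memory streaming metrics.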
    async fn get_profile_stats(
        &self,
        request: Request<GetProfileStatsRequest>,
    ) -> Result<Response<GetProfileStatsResponse>, Status> {
        let metrics = global_streaming_metrics(MetricLevel::Info);
        let inner = request.into_inner();
        let executor_ids = &inner.executor_ids;
        let fragment_ids = HashSet::from_iter(inner.dispatcher_fragment_ids);
        let stream_node_output_row_count = metrics
            .mem_stream_node_output_row_count
            .collect(executor_ids);
        let stream_node_output_blocking_duration_ns = metrics
            .mem_stream_node_output_blocking_duration_ns
            .collect(executor_ids);

        // Sum the counter values of a metric family, grouped by `fragment_id`,
        // keeping only the requested fragments.
        fn collect_by_fragment_ids<T: Collector>(
            m: &T,
            fragment_ids: &HashSet<u32>,
        ) -> HashMap<u32, u64> {
            let mut metrics = HashMap::new();
            for mut metric_family in m.collect() {
                for metric in metric_family.take_metric() {
                    let fragment_id = get_label_infallible(&metric, "fragment_id");
                    if fragment_ids.contains(&fragment_id) {
                        let entry = metrics.entry(fragment_id).or_insert(0);
                        *entry += metric.get_counter().value() as u64;
                    }
                }
            }
            metrics
        }

        let dispatch_fragment_output_row_count =
            collect_by_fragment_ids(&metrics.actor_out_record_cnt, &fragment_ids);
        let dispatch_fragment_output_blocking_duration_ns = collect_by_fragment_ids(
            &metrics.actor_output_buffer_blocking_duration_ns,
            &fragment_ids,
        );
        Ok(Response::new(GetProfileStatsResponse {
            stream_node_output_row_count,
            stream_node_output_blocking_duration_ns,
            dispatch_fragment_output_row_count,
            dispatch_fragment_output_blocking_duration_ns,
        }))
    }

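    /// Build a point-in-time snapshot of fragment, relation and channel
    /// statistics from the in-memory streaming metrics.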
    async fn get_streaming_stats(
        &self,
        _request: Request<GetStreamingStatsRequest>,
    ) -> Result<Response<GetStreamingStatsResponse>, Status> {
        let metrics = global_streaming_metrics(MetricLevel::Info);

        fn collect<T: Collector>(m: &T) -> Vec<Metric> {
            m.collect().into_iter().next().unwrap().take_metric()
        }

        let actor_output_buffer_blocking_duration_ns =
            collect(&metrics.actor_output_buffer_blocking_duration_ns);
        let actor_count = collect(&metrics.actor_count);

        let actor_count: HashMap<_, _> = actor_count
            .iter()
            .map(|m| {
                let fragment_id: u32 = get_label_infallible(m, "fragment_id");
                let count = m.get_gauge().value() as u32;
                (fragment_id, count)
            })
            .collect();

        let mut fragment_stats: HashMap<u32, FragmentStats> = HashMap::new();
        for (&fragment_id, &actor_count) in &actor_count {
            fragment_stats.insert(
                fragment_id,
                FragmentStats {
                    actor_count,
                    current_epoch: 0,
                },
            );
        }

        let actor_current_epoch = collect(&metrics.actor_current_epoch);
        for m in &actor_current_epoch {
            let fragment_id: u32 = get_label_infallible(m, "fragment_id");
            let epoch = m.get_gauge().value() as u64;
            if let Some(s) = fragment_stats.get_mut(&fragment_id) {
                s.current_epoch = if s.current_epoch == 0 {
                    epoch
                } else {
                    u64::min(s.current_epoch, epoch)
                }
            } else {
                tracing::warn!(
                    fragment_id = fragment_id,
                    "Missing corresponding actor count metrics"
                );
            }
        }

        let mut relation_stats: HashMap<u32, RelationStats> = HashMap::new();
        let mview_current_epoch = collect(&metrics.materialize_current_epoch);
        for m in &mview_current_epoch {
            let table_id: u32 = get_label_infallible(m, "table_id");
            let epoch = m.get_gauge().value() as u64;
            if let Some(s) = relation_stats.get_mut(&table_id) {
                s.current_epoch = if s.current_epoch == 0 {
                    epoch
                } else {
                    u64::min(s.current_epoch, epoch)
                };
                s.actor_count += 1;
            } else {
                relation_stats.insert(
                    table_id,
                    RelationStats {
                        actor_count: 1,
                        current_epoch: epoch,
                    },
                );
            }
        }

        let mut channel_stats: BTreeMap<String, ChannelStats> = BTreeMap::new();

        for metric in actor_output_buffer_blocking_duration_ns {
            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
            let downstream_fragment_id: u32 =
                get_label_infallible(&metric, "downstream_fragment_id");

            let key = format!("{}_{}", fragment_id, downstream_fragment_id);
            let channel_stat = channel_stats.entry(key).or_insert_with(|| ChannelStats {
                actor_count: 0,
                output_blocking_duration: 0.,
                recv_row_count: 0,
                send_row_count: 0,
            });

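            // A metric with an empty `actor_id` label has already been aggregated
            // over all actors of the fragment (per-actor labels are cleared at
            // lower metric levels), so it accounts for the fragment's entire actor
            // count; otherwise each sample stands for a single actor.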
            channel_stat.actor_count +=
                if get_label_infallible::<String>(&metric, "actor_id").is_empty() {
                    actor_count[&fragment_id]
                } else {
                    1
                };
            channel_stat.output_blocking_duration += metric.get_counter().value();
        }

        let actor_output_row_count = collect(&metrics.actor_out_record_cnt);
        for metric in actor_output_row_count {
            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");

            // Accumulate into every channel whose key starts with `{fragment_id}_`;
            // '`' is the ASCII character right after '_', so it bounds the prefix range.
            let key_prefix = format!("{}_", fragment_id);
            let key_range_end = format!("{}`", fragment_id);
            for (_, s) in channel_stats.range_mut(key_prefix..key_range_end) {
                s.send_row_count += metric.get_counter().value() as u64;
            }
        }

        let actor_input_row_count = collect(&metrics.actor_in_record_cnt);
        for metric in actor_input_row_count {
            let upstream_fragment_id: u32 = get_label_infallible(&metric, "upstream_fragment_id");
            let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");

            let key = format!("{}_{}", upstream_fragment_id, fragment_id);
            if let Some(s) = channel_stats.get_mut(&key) {
                s.recv_row_count += metric.get_counter().value() as u64;
            }
        }

        let channel_stats = channel_stats.into_iter().collect();
        Ok(Response::new(GetStreamingStatsResponse {
            channel_stats,
            fragment_stats,
            relation_stats,
        }))
    }

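    /// Enable or disable tracing on the tiered (hybrid) caches and update the
    /// per-operation duration thresholds above which accesses are recorded.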
    async fn tiered_cache_tracing(
        &self,
        request: Request<TieredCacheTracingRequest>,
    ) -> Result<Response<TieredCacheTracingResponse>, Status> {
        let req = request.into_inner();

        tracing::info!("Update tiered cache tracing config: {req:?}");

        // The tracing options depend only on the request, so build them with a
        // closure and apply the same configuration to both caches.
        let build_options = || {
            let mut options = TracingOptions::new();
            if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
                options = options
                    .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_get_threshold_ms {
                options =
                    options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
                options = options
                    .with_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
                options = options
                    .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
            }
            if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
                options = options
                    .with_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
            }
            options
        };

        if let Some(cache) = &self.meta_cache {
            if req.enable {
                cache.enable_tracing();
            } else {
                cache.disable_tracing();
            }
            cache.update_tracing_options(build_options());
        }

        if let Some(cache) = &self.block_cache {
            if req.enable {
                cache.enable_tracing();
            } else {
                cache.disable_tracing();
            }
            cache.update_tracing_options(build_options());
        }

        Ok(Response::new(TieredCacheTracingResponse::default()))
    }
}

pub use grpc_middleware::*;

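/// Tower middleware that registers every incoming gRPC call into an await-tree
/// registry, so in-flight RPCs show up in the `stack_trace` dump above.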
pub mod grpc_middleware {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::task::{Context, Poll};

    use either::Either;
    use futures::Future;
    use tonic::body::BoxBody;
    use tower::{Layer, Service};

    pub type AwaitTreeRegistryRef = await_tree::Registry;

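    /// Await-tree key for a gRPC call; `desc` combines the request authority
    /// and a node-local sequence number.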
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub struct GrpcCall {
        pub desc: String,
    }

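    /// Tower [`Layer`] that wraps a service with [`AwaitTreeMiddleware`].
    ///
    /// A minimal usage sketch; the registry, service and address names are
    /// illustrative, not taken from this crate:
    ///
    /// ```ignore
    /// let layer = AwaitTreeMiddlewareLayer::new_optional(await_tree_registry);
    /// tonic::transport::Server::builder()
    ///     .layer(layer)
    ///     .add_service(monitor_service)
    ///     .serve(listen_addr)
    ///     .await?;
    /// ```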
    #[derive(Clone)]
    pub struct AwaitTreeMiddlewareLayer {
        registry: Option<AwaitTreeRegistryRef>,
    }

    impl AwaitTreeMiddlewareLayer {
        pub fn new(registry: AwaitTreeRegistryRef) -> Self {
            Self {
                registry: Some(registry),
            }
        }

        pub fn new_optional(registry: Option<AwaitTreeRegistryRef>) -> Self {
            Self { registry }
        }
    }

    impl<S> Layer<S> for AwaitTreeMiddlewareLayer {
        type Service = AwaitTreeMiddleware<S>;

        fn layer(&self, service: S) -> Self::Service {
            AwaitTreeMiddleware {
                inner: service,
                registry: self.registry.clone(),
                next_id: Default::default(),
            }
        }
    }

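    /// Service wrapper that instruments each call with an await-tree root keyed
    /// by [`GrpcCall`]. Without a registry, calls pass through untouched.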
    #[derive(Clone)]
    pub struct AwaitTreeMiddleware<S> {
        inner: S,
        registry: Option<AwaitTreeRegistryRef>,
        next_id: Arc<AtomicU64>,
    }

    impl<S> Service<http::Request<BoxBody>> for AwaitTreeMiddleware<S>
    where
        S: Service<http::Request<BoxBody>> + Clone,
    {
        type Error = S::Error;
        type Response = S::Response;

        type Future = impl Future<Output = Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.inner.poll_ready(cx)
        }

        fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
            let Some(registry) = self.registry.clone() else {
                return Either::Left(self.inner.call(req));
            };

            let clone = self.inner.clone();
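            // Swap the original service out and call it, leaving the clone behind:
            // the original has been driven to readiness by `poll_ready`, while the
            // clone may not be ready yet. See https://github.com/tower-rs/tower/issues/547.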
            let mut inner = std::mem::replace(&mut self.inner, clone);

            let id = self.next_id.fetch_add(1, Ordering::SeqCst);
            let desc = if let Some(authority) = req.uri().authority() {
                format!("{authority} - {id}")
            } else {
                format!("?? - {id}")
            };
            let key = GrpcCall { desc };

            Either::Right(async move {
                let root = registry.register(key, req.uri().path());

                root.instrument(inner.call(req)).await
            })
        }
    }

    #[cfg(not(madsim))]
    impl<S: tonic::server::NamedService> tonic::server::NamedService for AwaitTreeMiddleware<S> {
        const NAME: &'static str = S::NAME;
    }
}