1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::time::Duration;
17
18use foyer::{HybridCache, TracingOptions};
19use prometheus::core::Collector;
20use prometheus::proto::Metric;
21use risingwave_common::config::{MetricLevel, ServerConfig};
22use risingwave_common_heap_profiling::ProfileServiceImpl;
23use risingwave_hummock_sdk::HummockSstableObjectId;
24use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces;
25use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
26use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
27use risingwave_pb::monitor_service::{
28 AnalyzeHeapRequest, AnalyzeHeapResponse, ChannelStats, FragmentStats, GetProfileStatsRequest,
29 GetProfileStatsResponse, GetStreamingStatsRequest, GetStreamingStatsResponse,
30 HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest,
31 ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, RelationStats,
32 StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest, TieredCacheTracingResponse,
33};
34use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
35use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
36use risingwave_stream::executor::monitor::global_streaming_metrics;
37use risingwave_stream::task::LocalStreamManager;
38use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait};
39use thiserror_ext::AsReport;
40use tonic::{Request, Response, Status};
41
/// Hybrid (in-memory + disk) cache holding SSTable metadata, keyed by SSTable object id.
type MetaCache = HybridCache<HummockSstableObjectId, Box<Sstable>>;
/// Hybrid (in-memory + disk) cache holding SSTable data blocks, keyed by (object id, block index).
type BlockCache = HybridCache<SstableBlockIndex, Box<Block>>;
44
/// gRPC service exposing monitoring endpoints of a compute node:
/// stack traces, (heap) profiling, streaming statistics and tiered-cache
/// tracing control. Cheap to clone; shared handles inside.
#[derive(Clone)]
pub struct MonitorServiceImpl {
    // Handle to the local streaming task manager (actors, barriers, await-tree registry).
    stream_mgr: LocalStreamManager,
    // Delegate implementing the CPU/heap profiling endpoints.
    profile_service: ProfileServiceImpl,
    // SSTable meta cache, if the node runs a Hummock state store with one configured.
    meta_cache: Option<MetaCache>,
    // SSTable block cache, if configured.
    block_cache: Option<BlockCache>,
}
52
53impl MonitorServiceImpl {
54 pub fn new(
55 stream_mgr: LocalStreamManager,
56 server_config: ServerConfig,
57 meta_cache: Option<MetaCache>,
58 block_cache: Option<BlockCache>,
59 ) -> Self {
60 Self {
61 stream_mgr,
62 profile_service: ProfileServiceImpl::new(server_config),
63 meta_cache,
64 block_cache,
65 }
66 }
67}
68
69#[async_trait::async_trait]
70impl MonitorService for MonitorServiceImpl {
71 async fn stack_trace(
72 &self,
73 request: Request<StackTraceRequest>,
74 ) -> Result<Response<StackTraceResponse>, Status> {
75 let req = request.into_inner();
76
77 let actor_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
78 reg.collect::<Actor>()
79 .into_iter()
80 .map(|(k, v)| {
81 (
82 k.0.as_raw_id(),
83 if req.actor_traces_format == ActorTracesFormat::Text as i32 {
84 v.to_string()
85 } else {
86 serde_json::to_string(&v).unwrap()
87 },
88 )
89 })
90 .collect()
91 } else {
92 Default::default()
93 };
94
95 let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
96 reg.collect::<BarrierAwait>()
97 .into_iter()
98 .map(|(k, v)| (k.prev_epoch, v.to_string()))
99 .collect()
100 } else {
101 Default::default()
102 };
103
104 let rpc_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
105 reg.collect::<GrpcCall>()
106 .into_iter()
107 .map(|(k, v)| (k.desc, v.to_string()))
108 .collect()
109 } else {
110 Default::default()
111 };
112
113 let compaction_task_traces = if let Some(hummock) =
114 self.stream_mgr.env.state_store().as_hummock()
115 && let Some(m) = hummock.compaction_await_tree_reg()
116 {
117 m.collect::<Compaction>()
118 .into_iter()
119 .map(|(k, v)| (format!("{k:?}"), v.to_string()))
120 .collect()
121 } else {
122 Default::default()
123 };
124
125 let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?;
126
127 let jvm_stack_traces = match dump_jvm_stack_traces() {
128 Ok(None) => None,
129 Err(err) => Some(err.as_report().to_string()),
130 Ok(Some(stack_traces)) => Some(stack_traces),
131 };
132
133 Ok(Response::new(StackTraceResponse {
134 actor_traces,
135 rpc_traces,
136 compaction_task_traces,
137 inflight_barrier_traces: barrier_traces,
138 barrier_worker_state: BTreeMap::from_iter([(
139 self.stream_mgr.env.worker_id(),
140 barrier_worker_state,
141 )]),
142 jvm_stack_traces: match jvm_stack_traces {
143 Some(stack_traces) => {
144 BTreeMap::from_iter([(self.stream_mgr.env.worker_id(), stack_traces)])
145 }
146 None => BTreeMap::new(),
147 },
148 meta_traces: Default::default(),
149 node_errors: Default::default(),
150 }))
151 }
152
153 async fn profiling(
154 &self,
155 request: Request<ProfilingRequest>,
156 ) -> Result<Response<ProfilingResponse>, Status> {
157 self.profile_service.profiling(request).await
158 }
159
160 async fn heap_profiling(
161 &self,
162 request: Request<HeapProfilingRequest>,
163 ) -> Result<Response<HeapProfilingResponse>, Status> {
164 self.profile_service.heap_profiling(request).await
165 }
166
167 async fn list_heap_profiling(
168 &self,
169 _request: Request<ListHeapProfilingRequest>,
170 ) -> Result<Response<ListHeapProfilingResponse>, Status> {
171 self.profile_service.list_heap_profiling(_request).await
172 }
173
174 async fn analyze_heap(
175 &self,
176 request: Request<AnalyzeHeapRequest>,
177 ) -> Result<Response<AnalyzeHeapResponse>, Status> {
178 self.profile_service.analyze_heap(request).await
179 }
180
181 async fn get_profile_stats(
182 &self,
183 request: Request<GetProfileStatsRequest>,
184 ) -> Result<Response<GetProfileStatsResponse>, Status> {
185 let metrics = global_streaming_metrics(MetricLevel::Info);
186 let inner = request.into_inner();
187 let executor_ids = &inner.executor_ids;
188 let fragment_ids = HashSet::from_iter(inner.dispatcher_fragment_ids);
189 let stream_node_output_row_count = metrics
190 .mem_stream_node_output_row_count
191 .collect(executor_ids);
192 let stream_node_output_blocking_duration_ns = metrics
193 .mem_stream_node_output_blocking_duration_ns
194 .collect(executor_ids);
195
196 fn collect_by_fragment_ids<T: Collector>(
198 m: &T,
199 fragment_ids: &HashSet<FragmentId>,
200 ) -> HashMap<FragmentId, u64> {
201 let mut metrics = HashMap::new();
202 for mut metric_family in m.collect() {
203 for metric in metric_family.take_metric() {
204 let fragment_id = get_label_infallible(&metric, "fragment_id");
205 if fragment_ids.contains(&fragment_id) {
206 let entry = metrics.entry(fragment_id).or_insert(0);
207 *entry += metric.get_counter().value() as u64;
208 }
209 }
210 }
211 metrics
212 }
213
214 let dispatch_fragment_output_row_count =
215 collect_by_fragment_ids(&metrics.actor_out_record_cnt, &fragment_ids);
216 let dispatch_fragment_output_blocking_duration_ns = collect_by_fragment_ids(
217 &metrics.actor_output_buffer_blocking_duration_ns,
218 &fragment_ids,
219 );
220 Ok(Response::new(GetProfileStatsResponse {
221 stream_node_output_row_count,
222 stream_node_output_blocking_duration_ns,
223 dispatch_fragment_output_row_count,
224 dispatch_fragment_output_blocking_duration_ns,
225 }))
226 }
227
228 async fn get_streaming_stats(
229 &self,
230 _request: Request<GetStreamingStatsRequest>,
231 ) -> Result<Response<GetStreamingStatsResponse>, Status> {
232 let metrics = global_streaming_metrics(MetricLevel::Info);
233
234 fn collect<T: Collector>(m: &T) -> Vec<Metric> {
235 m.collect().into_iter().next().unwrap().take_metric()
236 }
237
238 let actor_output_buffer_blocking_duration_ns =
239 collect(&metrics.actor_output_buffer_blocking_duration_ns);
240 let actor_count = collect(&metrics.actor_count);
241
242 let actor_count: HashMap<_, _> = actor_count
243 .iter()
244 .map(|m| {
245 let fragment_id: u32 = get_label_infallible(m, "fragment_id");
246 let count = m.get_gauge().value() as u32;
247 (fragment_id, count)
248 })
249 .collect();
250
251 let mut fragment_stats: HashMap<u32, FragmentStats> = HashMap::new();
252 for (&fragment_id, &actor_count) in &actor_count {
253 fragment_stats.insert(
254 fragment_id,
255 FragmentStats {
256 actor_count,
257 current_epoch: 0,
258 },
259 );
260 }
261
262 let actor_current_epoch = collect(&metrics.actor_current_epoch);
263 for m in &actor_current_epoch {
264 let fragment_id: u32 = get_label_infallible(m, "fragment_id");
265 let epoch = m.get_gauge().value() as u64;
266 if let Some(s) = fragment_stats.get_mut(&fragment_id) {
267 s.current_epoch = if s.current_epoch == 0 {
268 epoch
269 } else {
270 u64::min(s.current_epoch, epoch)
271 }
272 } else {
273 warn!(
274 fragment_id = fragment_id,
275 "Miss corresponding actor count metrics"
276 );
277 }
278 }
279
280 let mut relation_stats: HashMap<u32, RelationStats> = HashMap::new();
281 let mview_current_epoch = collect(&metrics.materialize_current_epoch);
282 for m in &mview_current_epoch {
283 let table_id: u32 = get_label_infallible(m, "table_id");
284 let epoch = m.get_gauge().value() as u64;
285 if let Some(s) = relation_stats.get_mut(&table_id) {
286 s.current_epoch = if s.current_epoch == 0 {
287 epoch
288 } else {
289 u64::min(s.current_epoch, epoch)
290 };
291 s.actor_count += 1;
292 } else {
293 relation_stats.insert(
294 table_id,
295 RelationStats {
296 actor_count: 1,
297 current_epoch: epoch,
298 },
299 );
300 }
301 }
302
303 let mut channel_stats: BTreeMap<String, ChannelStats> = BTreeMap::new();
304
305 for metric in actor_output_buffer_blocking_duration_ns {
306 let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
307 let downstream_fragment_id: u32 =
308 get_label_infallible(&metric, "downstream_fragment_id");
309
310 let actor_count_to_add =
311 if get_label_infallible::<String>(&metric, "actor_id").is_empty() {
312 match actor_count.get(&fragment_id) {
313 Some(&count) => count,
314 None => {
315 warn!(
318 fragment_id = fragment_id,
319 downstream_fragment_id = downstream_fragment_id,
320 "Miss corresponding actor count metrics"
321 );
322 continue;
323 }
324 }
325 } else {
326 1
327 };
328
329 let key = format!("{}_{}", fragment_id, downstream_fragment_id);
330 let channel_stat = channel_stats.entry(key).or_insert_with(|| ChannelStats {
331 actor_count: 0,
332 output_blocking_duration: 0.,
333 recv_row_count: 0,
334 send_row_count: 0,
335 });
336
337 channel_stat.actor_count += actor_count_to_add;
340 channel_stat.output_blocking_duration += metric.get_counter().value();
341 }
342
343 let actor_output_row_count = collect(&metrics.actor_out_record_cnt);
344 for metric in actor_output_row_count {
345 let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
346
347 let key_prefix = format!("{}_", fragment_id);
349 let key_range_end = format!("{}`", fragment_id); for (_, s) in channel_stats.range_mut(key_prefix..key_range_end) {
351 s.send_row_count += metric.get_counter().value() as u64;
352 }
353 }
354
355 let actor_input_row_count = collect(&metrics.actor_in_record_cnt);
356 for metric in actor_input_row_count {
357 let upstream_fragment_id: u32 = get_label_infallible(&metric, "upstream_fragment_id");
358 let fragment_id: u32 = get_label_infallible(&metric, "fragment_id");
359
360 let key = format!("{}_{}", upstream_fragment_id, fragment_id);
361 if let Some(s) = channel_stats.get_mut(&key) {
362 s.recv_row_count += metric.get_counter().value() as u64;
363 }
364 }
365
366 let channel_stats = channel_stats.into_iter().collect();
367 Ok(Response::new(GetStreamingStatsResponse {
368 channel_stats,
369 fragment_stats,
370 relation_stats,
371 }))
372 }
373
374 async fn tiered_cache_tracing(
375 &self,
376 request: Request<TieredCacheTracingRequest>,
377 ) -> Result<Response<TieredCacheTracingResponse>, Status> {
378 let req = request.into_inner();
379
380 tracing::info!("Update tiered cache tracing config: {req:?}");
381
382 if let Some(cache) = &self.meta_cache {
383 if req.enable {
384 cache.enable_tracing();
385 } else {
386 cache.disable_tracing();
387 }
388 let mut options = TracingOptions::new();
389 if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
390 options = options
391 .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
392 }
393 if let Some(threshold) = req.record_hybrid_get_threshold_ms {
394 options =
395 options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
396 }
397 if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
398 options = options
399 .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
400 }
401 if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
402 options = options.with_record_hybrid_get_or_fetch_threshold(Duration::from_millis(
403 threshold as _,
404 ));
405 }
406 cache.update_tracing_options(options);
407 }
408
409 if let Some(cache) = &self.block_cache {
410 if req.enable {
411 cache.enable_tracing();
412 } else {
413 cache.disable_tracing();
414 }
415 let mut options = TracingOptions::new();
416 if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
417 options = options
418 .with_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
419 }
420 if let Some(threshold) = req.record_hybrid_get_threshold_ms {
421 options =
422 options.with_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
423 }
424 if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
425 options = options
426 .with_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
427 }
428 if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
429 options = options.with_record_hybrid_get_or_fetch_threshold(Duration::from_millis(
430 threshold as _,
431 ));
432 }
433 cache.update_tracing_options(options);
434 }
435
436 Ok(Response::new(TieredCacheTracingResponse::default()))
437 }
438}
439
440pub use grpc_middleware::*;
441use risingwave_common::metrics::get_label_infallible;
442use risingwave_pb::id::FragmentId;
443
/// Tower middleware that registers every incoming gRPC call in an
/// await-tree registry, so in-flight RPCs show up in `stack_trace` responses.
pub mod grpc_middleware {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::task::{Context, Poll};

    use either::Either;
    use futures::Future;
    use tonic::body::Body;
    use tower::{Layer, Service};

    /// Shared handle to the await-tree registry used for gRPC calls.
    pub type AwaitTreeRegistryRef = await_tree::Registry;

    /// Await-tree key identifying one in-flight gRPC call.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub struct GrpcCall {
        // Human-readable description, "<authority> - <id>" or "?? - <id>".
        pub desc: String,
    }

    /// Layer producing [`AwaitTreeMiddleware`]; with a `None` registry the
    /// middleware becomes a transparent pass-through.
    #[derive(Clone)]
    pub struct AwaitTreeMiddlewareLayer {
        registry: Option<AwaitTreeRegistryRef>,
    }

    impl AwaitTreeMiddlewareLayer {
        /// Creates a layer that registers calls in `registry`.
        pub fn new(registry: AwaitTreeRegistryRef) -> Self {
            Self {
                registry: Some(registry),
            }
        }

        /// Creates a layer that traces calls only if a registry is provided.
        pub fn new_optional(registry: Option<AwaitTreeRegistryRef>) -> Self {
            Self { registry }
        }
    }

    impl<S> Layer<S> for AwaitTreeMiddlewareLayer {
        type Service = AwaitTreeMiddleware<S>;

        fn layer(&self, service: S) -> Self::Service {
            AwaitTreeMiddleware {
                inner: service,
                registry: self.registry.clone(),
                next_id: Default::default(),
            }
        }
    }

    /// The middleware service wrapping an inner gRPC service `S`.
    #[derive(Clone)]
    pub struct AwaitTreeMiddleware<S> {
        inner: S,
        registry: Option<AwaitTreeRegistryRef>,
        // Monotonic counter making each call's await-tree key unique.
        next_id: Arc<AtomicU64>,
    }

    impl<S> Service<http::Request<Body>> for AwaitTreeMiddleware<S>
    where
        S: Service<http::Request<Body>> + Clone,
    {
        type Error = S::Error;
        type Response = S::Response;

        // NOTE: `impl Trait` in an associated type — requires the
        // `impl_trait_in_assoc_type` (nightly) feature to be enabled crate-wide.
        type Future = impl Future<Output = Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.inner.poll_ready(cx)
        }

        fn call(&mut self, req: http::Request<Body>) -> Self::Future {
            // Without a registry, forward the call untouched.
            let Some(registry) = self.registry.clone() else {
                return Either::Left(self.inner.call(req));
            };

            // Standard Tower pattern for cloneable services: move the service
            // that `poll_ready` readied into the future, and leave the fresh
            // clone (whose readiness is unknown) behind in `self`.
            let clone = self.inner.clone();
            let mut inner = std::mem::replace(&mut self.inner, clone);

            let id = self.next_id.fetch_add(1, Ordering::SeqCst);
            let desc = if let Some(authority) = req.uri().authority() {
                format!("{authority} - {id}")
            } else {
                format!("?? - {id}")
            };
            let key = GrpcCall { desc };

            Either::Right(async move {
                // Register the call under its key; the root's label is the URI
                // path (i.e. the gRPC method), then drive the inner call under it.
                let root = registry.register(key, req.uri().path());

                root.instrument(inner.call(req)).await
            })
        }
    }

    #[cfg(not(madsim))]
    impl<S: tonic::server::NamedService> tonic::server::NamedService for AwaitTreeMiddleware<S> {
        const NAME: &'static str = S::NAME;
    }
}