1mod prometheus;
16
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use anyhow::{Context as _, Result, anyhow};
21use axum::Router;
22use axum::extract::{Extension, Path};
23use axum::http::{Method, StatusCode};
24use axum::response::{IntoResponse, Response};
25use axum::routing::get;
26use risingwave_common_heap_profiling::ProfileServiceImpl;
27use risingwave_rpc_client::MonitorClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::hummock::HummockManagerRef;
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// State shared by all dashboard HTTP handlers, installed via an axum
/// extension layer in [`DashboardService::serve`].
#[derive(Clone)]
pub struct DashboardService {
    /// Registry used to dump await-trees (async stack traces) of the meta node itself.
    pub await_tree_reg: await_tree::Registry,
    /// Address the dashboard HTTP server binds to.
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus query client; endpoints that need it return an
    /// error when it is `None`.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// Label selector interpolated into Prometheus queries.
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub hummock_manager: HummockManagerRef,
    /// Pool of monitor-service RPC clients for worker nodes.
    pub monitor_clients: MonitorClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    /// Heap-profiling service for the meta node itself.
    pub profile_service: ProfileServiceImpl,
    /// Shared state of the embedded OTLP trace collector, served under `/trace`.
    pub trace_state: otlp_embedded::StateRef,
}

/// Shared handle to the dashboard service, as handlers receive it via `Extension`.
pub type Service = Arc<DashboardService>;
53
54pub(super) mod handlers {
55 use std::cmp::min;
56 use std::collections::HashMap;
57
58 use anyhow::Context;
59 use axum::Json;
60 use axum::extract::Query;
61 use futures::future::join_all;
62 use itertools::Itertools;
63 use risingwave_common::id::JobId;
64 use risingwave_meta_model::WorkerId;
65 use risingwave_pb::catalog::table::TableType;
66 use risingwave_pb::catalog::{
67 Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
68 };
69 use risingwave_pb::common::{WorkerNode, WorkerType};
70 use risingwave_pb::hummock::TableStats;
71 use risingwave_pb::meta::{
72 ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
73 };
74 use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
75 use risingwave_pb::monitor_service::{
76 AnalyzeHeapRequest, ChannelDeltaStats, GetStreamingPrometheusStatsResponse,
77 GetStreamingStatsResponse, HeapProfilingRequest, HeapProfilingResponse,
78 ListHeapProfilingRequest, ListHeapProfilingResponse, StackTraceResponse,
79 };
80 use risingwave_pb::user::PbUserInfo;
81 use serde::{Deserialize, Serialize};
82 use serde_json::json;
83 use thiserror_ext::AsReport;
84 use tonic::Request;
85
86 use super::*;
87 use crate::controller::fragment::StreamingJobInfo;
88 use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
89
    /// A table catalog entry enriched with storage statistics for display in
    /// the dashboard.
    #[derive(Serialize)]
    pub struct TableWithStats {
        // Flattened so the stats fields appear alongside the table's own
        // fields in the serialized JSON object.
        #[serde(flatten)]
        pub table: Table,
        // Uncompressed total: key size + value size, in bytes.
        pub total_size_bytes: i64,
        pub total_key_count: i64,
        pub total_key_size: i64,
        pub total_value_size: i64,
        pub compressed_size: u64,
    }
100
101 impl TableWithStats {
102 pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
103 match stats {
104 Some(stats) => Self {
105 total_size_bytes: stats.total_key_size + stats.total_value_size,
106 total_key_count: stats.total_key_count,
107 total_key_size: stats.total_key_size,
108 total_value_size: stats.total_value_size,
109 compressed_size: stats.total_compressed_size,
110 table,
111 },
112 None => Self {
113 total_size_bytes: 0,
114 total_key_count: 0,
115 total_key_size: 0,
116 total_value_size: 0,
117 compressed_size: 0,
118 table,
119 },
120 }
121 }
122 }
123
    /// Error wrapper for dashboard handlers; rendered as an HTTP 500 with a
    /// JSON `error` field (see the `IntoResponse` impl).
    pub struct DashboardError(anyhow::Error);
    /// Handler result type used throughout this module.
    pub type Result<T> = std::result::Result<T, DashboardError>;

    /// Convenience constructor, used pervasively as `.map_err(err)?`.
    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
        DashboardError(err.into())
    }

    impl From<anyhow::Error> for DashboardError {
        fn from(value: anyhow::Error) -> Self {
            DashboardError(value)
        }
    }
136
137 impl IntoResponse for DashboardError {
138 fn into_response(self) -> axum::response::Response {
139 let mut resp = Json(json!({
140 "error": self.0.to_report_string(),
141 }))
142 .into_response();
143 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
144 resp
145 }
146 }
147
148 pub async fn list_clusters(
149 Path(ty): Path<i32>,
150 Extension(srv): Extension<Service>,
151 ) -> Result<Json<Vec<WorkerNode>>> {
152 let worker_type = WorkerType::try_from(ty)
153 .map_err(|_| anyhow!("invalid worker type"))
154 .map_err(err)?;
155 let mut result = srv
156 .metadata_manager
157 .list_worker_node(Some(worker_type), None)
158 .await
159 .map_err(err)?;
160 result.sort_unstable_by_key(|n| n.id);
161 Ok(result.into())
162 }
163
164 async fn list_table_catalogs_inner(
165 metadata_manager: &MetadataManager,
166 hummock_manager: &HummockManagerRef,
167 table_type: TableType,
168 ) -> Result<Json<Vec<TableWithStats>>> {
169 let tables = metadata_manager
170 .catalog_controller
171 .list_tables_by_type(table_type.into())
172 .await
173 .map_err(err)?;
174
175 let version_stats = hummock_manager.get_version_stats().await;
177
178 let tables_with_stats = tables
179 .into_iter()
180 .map(|table| {
181 let stats = version_stats.table_stats.get(&table.id);
182 TableWithStats::from_table_and_stats(table, stats)
183 })
184 .collect();
185
186 Ok(Json(tables_with_stats))
187 }
188
189 pub async fn list_materialized_views(
190 Extension(srv): Extension<Service>,
191 ) -> Result<Json<Vec<TableWithStats>>> {
192 list_table_catalogs_inner(
193 &srv.metadata_manager,
194 &srv.hummock_manager,
195 TableType::MaterializedView,
196 )
197 .await
198 }
199
200 pub async fn list_tables(
201 Extension(srv): Extension<Service>,
202 ) -> Result<Json<Vec<TableWithStats>>> {
203 list_table_catalogs_inner(
204 &srv.metadata_manager,
205 &srv.hummock_manager,
206 TableType::Table,
207 )
208 .await
209 }
210
211 pub async fn list_index_tables(
212 Extension(srv): Extension<Service>,
213 ) -> Result<Json<Vec<TableWithStats>>> {
214 list_table_catalogs_inner(
215 &srv.metadata_manager,
216 &srv.hummock_manager,
217 TableType::Index,
218 )
219 .await
220 }
221
222 pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
223 let indexes = srv
224 .metadata_manager
225 .catalog_controller
226 .list_indexes()
227 .await
228 .map_err(err)?;
229
230 Ok(Json(indexes))
231 }
232
233 pub async fn list_subscription(
234 Extension(srv): Extension<Service>,
235 ) -> Result<Json<Vec<Subscription>>> {
236 let subscriptions = srv
237 .metadata_manager
238 .catalog_controller
239 .list_subscriptions()
240 .await
241 .map_err(err)?;
242
243 Ok(Json(subscriptions))
244 }
245
246 pub async fn list_internal_tables(
247 Extension(srv): Extension<Service>,
248 ) -> Result<Json<Vec<TableWithStats>>> {
249 list_table_catalogs_inner(
250 &srv.metadata_manager,
251 &srv.hummock_manager,
252 TableType::Internal,
253 )
254 .await
255 }
256
257 pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
258 let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
259
260 Ok(Json(sources))
261 }
262
263 pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
264 let sinks = srv
265 .metadata_manager
266 .catalog_controller
267 .list_sinks()
268 .await
269 .map_err(err)?;
270
271 Ok(Json(sinks))
272 }
273
274 pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
275 let views = srv
276 .metadata_manager
277 .catalog_controller
278 .list_views()
279 .await
280 .map_err(err)?;
281
282 Ok(Json(views))
283 }
284
285 pub async fn list_functions(
286 Extension(srv): Extension<Service>,
287 ) -> Result<Json<Vec<PbFunction>>> {
288 let functions = srv
289 .metadata_manager
290 .catalog_controller
291 .list_functions()
292 .await
293 .map_err(err)?;
294
295 Ok(Json(functions))
296 }
297
298 pub async fn list_streaming_jobs(
299 Extension(srv): Extension<Service>,
300 ) -> Result<Json<Vec<StreamingJobInfo>>> {
301 let streaming_jobs = srv
302 .metadata_manager
303 .catalog_controller
304 .list_streaming_job_infos()
305 .await
306 .map_err(err)?;
307
308 Ok(Json(streaming_jobs))
309 }
310
311 pub async fn get_fragment_to_relation_map(
319 Extension(srv): Extension<Service>,
320 ) -> Result<Json<FragmentToRelationMap>> {
321 let table_fragments = srv
322 .metadata_manager
323 .catalog_controller
324 .table_fragments()
325 .await
326 .map_err(err)?;
327 let mut fragment_to_relation_map = HashMap::new();
328 for (relation_id, (tf, _, _)) in table_fragments {
329 for fragment_id in tf.fragments.keys() {
330 fragment_to_relation_map.insert(*fragment_id, relation_id);
331 }
332 }
333 let map = FragmentToRelationMap {
334 fragment_to_relation_map,
335 };
336 Ok(Json(map))
337 }
338
339 pub async fn get_relation_id_infos(
341 Extension(srv): Extension<Service>,
342 ) -> Result<Json<RelationIdInfos>> {
343 let table_fragments = srv
344 .metadata_manager
345 .catalog_controller
346 .table_fragments()
347 .await
348 .map_err(err)?;
349 let mut map = HashMap::new();
350 for (id, (tf, fragment_actors, _actor_status)) in table_fragments {
351 let mut fragment_id_to_actor_ids = HashMap::new();
352 for fragment_id in tf.fragments.keys() {
353 let actor_ids = fragment_actors
354 .get(fragment_id)
355 .into_iter()
356 .flat_map(|actors| actors.iter().map(|a| a.actor_id))
357 .collect_vec();
358 fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
359 }
360 map.insert(
361 id.as_raw_id(),
362 FragmentIdToActorIdMap {
363 map: fragment_id_to_actor_ids,
364 },
365 );
366 }
367 let relation_id_infos = RelationIdInfos { map };
368
369 Ok(Json(relation_id_infos))
370 }
371
    /// Returns the fragment graph of a single streaming job in protobuf form,
    /// including its actors, upstream-fragment edges and dispatchers.
    pub async fn list_fragments_by_job_id(
        Extension(srv): Extension<Service>,
        Path(job_id): Path<u32>,
    ) -> Result<Json<PbTableFragments>> {
        let job_id = JobId::new(job_id);
        let (table_fragments, fragment_actors, actor_status) = srv
            .metadata_manager
            .catalog_controller
            .get_job_fragments_by_id(job_id)
            .await
            .map_err(err)?;
        // Upstream fragments of each fragment in the job — needed to render
        // the graph edges in the protobuf output.
        let upstream_fragments = srv
            .metadata_manager
            .catalog_controller
            .upstream_fragments(table_fragments.fragment_ids())
            .await
            .map_err(err)?;
        let dispatchers = srv
            .metadata_manager
            .catalog_controller
            .get_fragment_actor_dispatchers(
                // NOTE(review): `as _` converts each fragment id to the integer
                // type this RPC expects — confirm the cast cannot truncate.
                table_fragments.fragment_ids().map(|id| id as _).collect(),
            )
            .await
            .map_err(err)?;
        Ok(Json(table_fragments.to_protobuf(
            &fragment_actors,
            &upstream_fragments,
            &dispatchers,
            actor_status,
        )))
    }
404
405 pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
406 let users = srv
407 .metadata_manager
408 .catalog_controller
409 .list_users()
410 .await
411 .map_err(err)?;
412
413 Ok(Json(users))
414 }
415
416 pub async fn list_databases(
417 Extension(srv): Extension<Service>,
418 ) -> Result<Json<Vec<PbDatabase>>> {
419 let databases = srv
420 .metadata_manager
421 .catalog_controller
422 .list_databases()
423 .await
424 .map_err(err)?;
425
426 Ok(Json(databases))
427 }
428
429 pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
430 let schemas = srv
431 .metadata_manager
432 .catalog_controller
433 .list_schemas()
434 .await
435 .map_err(err)?;
436
437 Ok(Json(schemas))
438 }
439
    /// One edge of the object-dependency graph: `object_id` depends on
    /// `referenced_object_id`. Serialized in camelCase for the frontend.
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct DashboardObjectDependency {
        pub object_id: u32,
        pub referenced_object_id: u32,
    }
446
447 pub async fn list_object_dependencies(
448 Extension(srv): Extension<Service>,
449 ) -> Result<Json<Vec<DashboardObjectDependency>>> {
450 let object_dependencies = srv
451 .metadata_manager
452 .catalog_controller
453 .list_all_object_dependencies()
454 .await
455 .map_err(err)?;
456
457 let result = object_dependencies
458 .into_iter()
459 .map(|dependency| DashboardObjectDependency {
460 object_id: dependency.object_id.as_raw_id(),
461 referenced_object_id: dependency.referenced_object_id.as_raw_id(),
462 })
463 .collect();
464
465 Ok(Json(result))
466 }
467
    /// Query parameters for the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        // Output format of the dumped traces; defaults to "text".
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
473
474 impl AwaitTreeDumpParams {
475 pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
477 Ok(match self.format.as_str() {
478 "text" => ActorTracesFormat::Text,
479 "json" => ActorTracesFormat::Json,
480 _ => {
481 return Err(err(anyhow!(
482 "Unsupported format `{}`, only `text` and `json` are supported for now",
483 self.format
484 )));
485 }
486 })
487 }
488 }
489
490 fn await_tree_default_format() -> String {
491 "text".to_owned()
493 }
494
495 pub async fn dump_await_tree_all(
496 Query(params): Query<AwaitTreeDumpParams>,
497 Extension(srv): Extension<Service>,
498 ) -> Result<Json<StackTraceResponse>> {
499 let actor_traces_format = params.actor_traces_format()?;
500
501 let res = dump_cluster_await_tree(
502 &srv.metadata_manager,
503 &srv.await_tree_reg,
504 actor_traces_format,
505 )
506 .await
507 .map_err(err)?;
508
509 Ok(res.into())
510 }
511
512 pub async fn dump_await_tree(
513 Path(worker_id): Path<WorkerId>,
514 Query(params): Query<AwaitTreeDumpParams>,
515 Extension(srv): Extension<Service>,
516 ) -> Result<Json<StackTraceResponse>> {
517 let actor_traces_format = params.actor_traces_format()?;
518
519 let worker_node = srv
520 .metadata_manager
521 .get_worker_by_id(worker_id)
522 .await
523 .map_err(err)?
524 .context("worker node not found")
525 .map_err(err)?;
526
527 let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
528 .await
529 .map_err(err)?;
530
531 Ok(res.into())
532 }
533
534 pub async fn heap_profile(
535 Path(worker_id): Path<WorkerId>,
536 Extension(srv): Extension<Service>,
537 ) -> Result<Json<HeapProfilingResponse>> {
538 if worker_id == crate::manager::META_NODE_ID {
539 let result = srv
540 .profile_service
541 .heap_profiling(Request::new(HeapProfilingRequest { dir: "".to_owned() }))
542 .await
543 .map_err(err)?
544 .into_inner();
545 return Ok(result.into());
546 }
547
548 let worker_node = srv
549 .metadata_manager
550 .get_worker_by_id(worker_id)
551 .await
552 .map_err(err)?
553 .context("worker node not found")
554 .map_err(err)?;
555
556 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
557
558 let result = client.heap_profile("".to_owned()).await.map_err(err)?;
559
560 Ok(result.into())
561 }
562
563 pub async fn list_heap_profile(
564 Path(worker_id): Path<WorkerId>,
565 Extension(srv): Extension<Service>,
566 ) -> Result<Json<ListHeapProfilingResponse>> {
567 if worker_id == crate::manager::META_NODE_ID {
568 let result = srv
569 .profile_service
570 .list_heap_profiling(Request::new(ListHeapProfilingRequest {}))
571 .await
572 .map_err(err)?
573 .into_inner();
574 return Ok(result.into());
575 }
576
577 let worker_node = srv
578 .metadata_manager
579 .get_worker_by_id(worker_id)
580 .await
581 .map_err(err)?
582 .context("worker node not found")
583 .map_err(err)?;
584
585 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
586
587 let result = client.list_heap_profile().await.map_err(err)?;
588 Ok(result.into())
589 }
590
591 pub async fn analyze_heap(
592 Path((worker_id, file_path)): Path<(WorkerId, String)>,
593 Extension(srv): Extension<Service>,
594 ) -> Result<Response> {
595 let file_path =
596 String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
597
598 let collapsed_bin = if worker_id == crate::manager::META_NODE_ID {
599 srv.profile_service
600 .analyze_heap(Request::new(AnalyzeHeapRequest {
601 path: file_path.clone(),
602 }))
603 .await
604 .map_err(err)?
605 .into_inner()
606 .result
607 } else {
608 let worker_node = srv
609 .metadata_manager
610 .get_worker_by_id(worker_id)
611 .await
612 .map_err(err)?
613 .context("worker node not found")
614 .map_err(err)?;
615
616 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
617 client
618 .analyze_heap(file_path.clone())
619 .await
620 .map_err(err)?
621 .result
622 };
623 let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
624
625 let response = Response::builder()
626 .header("Content-Type", "application/octet-stream")
627 .body(collapsed_str.into());
628
629 response.map_err(err)
630 }
631
    /// Query parameters for the `/monitor/diagnose/` endpoint.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        // Format for the actor traces embedded in the report; defaults to "text".
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
637
638 pub async fn diagnose(
639 Query(params): Query<DiagnoseParams>,
640 Extension(srv): Extension<Service>,
641 ) -> Result<String> {
642 let actor_traces_format = match params.actor_traces_format.as_str() {
643 "text" => ActorTracesFormat::Text,
644 "json" => ActorTracesFormat::Json,
645 _ => {
646 return Err(err(anyhow!(
647 "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
648 params.actor_traces_format
649 )));
650 }
651 };
652 Ok(srv.diagnose_command.report(actor_traces_format).await)
653 }
654
655 pub async fn get_streaming_stats(
661 Extension(srv): Extension<Service>,
662 ) -> Result<Json<GetStreamingStatsResponse>> {
663 let worker_nodes = srv
664 .metadata_manager
665 .list_active_streaming_compute_nodes()
666 .await
667 .map_err(err)?;
668
669 let mut futures = Vec::new();
670
671 for worker_node in worker_nodes {
672 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
673 let client = Arc::new(client);
674 let fut = async move {
675 let result = client.get_streaming_stats().await.map_err(err)?;
676 Ok::<_, DashboardError>(result)
677 };
678 futures.push(fut);
679 }
680 let results = join_all(futures).await;
681
682 let mut all = GetStreamingStatsResponse::default();
683
684 for result in results {
685 let result = result
686 .map_err(|_| anyhow!("Failed to get back pressure"))
687 .map_err(err)?;
688
689 for (fragment_id, fragment_stats) in result.fragment_stats {
691 if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
692 s.actor_count += fragment_stats.actor_count;
693 s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
694 } else {
695 all.fragment_stats.insert(fragment_id, fragment_stats);
696 }
697 }
698
699 for (relation_id, relation_stats) in result.relation_stats {
701 if let Some(s) = all.relation_stats.get_mut(&relation_id) {
702 s.actor_count += relation_stats.actor_count;
703 s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
704 } else {
705 all.relation_stats.insert(relation_id, relation_stats);
706 }
707 }
708
709 for (key, channel_stats) in result.channel_stats {
711 if let Some(s) = all.channel_stats.get_mut(&key) {
712 s.actor_count += channel_stats.actor_count;
713 s.output_blocking_duration += channel_stats.output_blocking_duration;
714 s.recv_row_count += channel_stats.recv_row_count;
715 s.send_row_count += channel_stats.send_row_count;
716 } else {
717 all.channel_stats.insert(key, channel_stats);
718 }
719 }
720 }
721
722 Ok(all.into())
723 }
724
    /// Query parameters for the Prometheus-backed streaming stats endpoint.
    #[derive(Debug, Deserialize)]
    pub struct StreamingStatsPrometheusParams {
        // Optional evaluation timestamp for the Prometheus queries; when
        // absent, queries are evaluated at "now".
        #[serde(default)]
        at: Option<i64>,
        // Lookback window in seconds for the `rate()` queries; defaults to 60.
        #[serde(default = "streaming_stats_default_time_offset")]
        time_offset: i64,
    }
734
735 fn streaming_stats_default_time_offset() -> i64 {
736 60
737 }
738
    /// Like `get_streaming_stats`, but channel-level stats (throughput and
    /// backpressure) are computed from Prometheus `rate()` queries rather than
    /// raw worker counters. Fragment- and relation-level stats are still
    /// merged from the workers' responses.
    ///
    /// Returns an error if no Prometheus endpoint is configured.
    pub async fn get_streaming_stats_from_prometheus(
        Query(params): Query<StreamingStatsPrometheusParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
        let mut all = GetStreamingPrometheusStatsResponse::default();

        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        let mut futures = Vec::new();

        // Fan out one stats request per compute node; awaited concurrently below.
        for worker_node in worker_nodes {
            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
                .map_err(err)?;

            // Merge fragment stats: sum actor counts, keep the minimum
            // (slowest) epoch across workers.
            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            // Same merge strategy for relation-level stats.
            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }
        }

        if let Some(ref client) = srv.prometheus_client {
            // Per-channel input/output throughput and backpressure over the
            // last `time_offset` seconds, grouped by the fragment ids that
            // identify the channel.
            let channel_input_throughput_query = format!(
                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            let channel_output_throughput_query = format!(
                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            // Backpressure: blocking duration normalized by the fragment's
            // actor count.
            let channel_backpressure_query = format!(
                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
                / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
                srv.prometheus_selector, params.time_offset
            );

            let (
                channel_input_throughput_result,
                channel_output_throughput_result,
                channel_backpressure_result,
            ) = {
                let mut input_query = client.query(channel_input_throughput_query);
                let mut output_query = client.query(channel_output_throughput_query);
                let mut backpressure_query = client.query(channel_backpressure_query);

                // Evaluate at a fixed timestamp if the caller supplied one.
                if let Some(at_time) = params.at {
                    input_query = input_query.at(at_time);
                    output_query = output_query.at(at_time);
                    backpressure_query = backpressure_query.at(at_time);
                }

                // Run all three queries concurrently; fail fast on any error.
                tokio::try_join!(
                    input_query.get(),
                    output_query.get(),
                    backpressure_query.get(),
                )
                .map_err(err)?
            };

            // Channel stats keyed by "<upstream_fragment_id>_<fragment_id>";
            // the three query results below are merged into one entry per key.
            let mut channel_data = HashMap::new();

            if let Some(channel_input_throughput_data) =
                channel_input_throughput_result.data().as_vector()
            {
                for sample in channel_input_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .recv_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_output_throughput_data) =
                channel_output_throughput_result.data().as_vector()
            {
                for sample in channel_output_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .send_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
            {
                for sample in channel_backpressure_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(downstream_fragment_id_str) =
                            sample.metric().get("downstream_fragment_id")
                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            downstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        // NOTE: backpressure samples are keyed by the
                        // downstream fragment, so the key order here differs
                        // from the two throughput loops above.
                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            // Convert nanoseconds/second into a dimensionless rate.
                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0;
                    }
                }
            }

            // Fill in actor counts from the merged fragment stats; the second
            // key component is the (upstream-side) fragment id parsed back out
            // of the string key.
            for (key, channel_stats) in &mut channel_data {
                let parts: Vec<&str> = key.split('_').collect();
                if parts.len() == 2
                    && let Ok(fragment_id) = parts[1].parse::<u32>()
                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
                {
                    channel_stats.actor_count = fragment_stats.actor_count;
                }
            }

            all.channel_stats = channel_data;

            Ok(Json(all))
        } else {
            Err(err(anyhow!("Prometheus endpoint is not set")))
        }
    }
934
935 pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
936 Ok(Json(risingwave_common::current_cluster_version()))
937 }
938}
939
impl DashboardService {
    /// Consumes the service and runs the dashboard HTTP server until it exits.
    ///
    /// A single listener serves three route groups:
    /// - `/api/...`  — the JSON endpoints defined in `handlers`,
    /// - `/trace/...` — the embedded OTLP trace UI,
    /// - everything else — the static dashboard frontend.
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        // The dashboard API is read-only: allow only GET, from any origin.
        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/{ty}", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/{job_id}", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/functions", get(list_functions))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            .route(
                "/metrics/streaming_stats_prometheus",
                get(get_streaming_stats_from_prometheus),
            )
            .route("/monitor/await_tree/{worker_id}", get(dump_await_tree))
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/{worker_id}", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/{worker_id}",
                get(list_heap_profile),
            )
            // `{*path}` captures the rest of the URL: the base64-url-encoded
            // dump file path consumed by `analyze_heap`.
            .route("/monitor/analyze/{worker_id}/{*path}", get(analyze_heap))
            .route("/monitor/diagnose/", get(diagnose))
            // Make `srv` available to every handler as `Extension<Service>`.
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}