1mod prometheus;
16
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use anyhow::{Context as _, Result, anyhow};
21use axum::Router;
22use axum::extract::{Extension, Path};
23use axum::http::{Method, StatusCode};
24use axum::response::{IntoResponse, Response};
25use axum::routing::get;
26use risingwave_common_heap_profiling::ProfileServiceImpl;
27use risingwave_rpc_client::MonitorClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::hummock::HummockManagerRef;
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// All shared state needed by the dashboard HTTP handlers.
///
/// Cloned into an `Arc` (see [`Service`]) and injected into every handler via
/// an axum `Extension` layer in [`DashboardService::serve`].
#[derive(Clone)]
pub struct DashboardService {
    /// Await-tree registry of the meta node itself, used for cluster-wide
    /// stack-trace dumps.
    pub await_tree_reg: await_tree::Registry,
    /// Address the dashboard HTTP server binds to.
    pub dashboard_addr: SocketAddr,
    /// Prometheus HTTP API client; `None` when no endpoint is configured
    /// (Prometheus-backed endpoints then return an error).
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// Label selector interpolated into Prometheus queries to scope them to
    /// this cluster.
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub hummock_manager: HummockManagerRef,
    /// Pool of monitor-service RPC clients, looked up per worker node.
    pub monitor_clients: MonitorClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    /// In-process heap-profiling service for requests targeting the meta node.
    pub profile_service: ProfileServiceImpl,
    /// Shared state of the embedded OTLP trace collector/UI.
    pub trace_state: otlp_embedded::StateRef,
}
51
/// Shared handle to the dashboard state, cloned into each request handler.
pub type Service = Arc<DashboardService>;
53
54pub(super) mod handlers {
55 use std::cmp::min;
56 use std::collections::HashMap;
57
58 use anyhow::Context;
59 use axum::Json;
60 use axum::extract::Query;
61 use futures::future::join_all;
62 use itertools::Itertools;
63 use risingwave_common::id::JobId;
64 use risingwave_meta_model::WorkerId;
65 use risingwave_pb::catalog::table::TableType;
66 use risingwave_pb::catalog::{
67 Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
68 };
69 use risingwave_pb::common::{WorkerNode, WorkerType};
70 use risingwave_pb::hummock::TableStats;
71 use risingwave_pb::meta::{
72 ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
73 };
74 use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
75 use risingwave_pb::monitor_service::{
76 AnalyzeHeapRequest, ChannelDeltaStats, GetStreamingPrometheusStatsResponse,
77 GetStreamingStatsResponse, HeapProfilingRequest, HeapProfilingResponse,
78 ListHeapProfilingRequest, ListHeapProfilingResponse, StackTraceResponse,
79 };
80 use risingwave_pb::user::PbUserInfo;
81 use serde::{Deserialize, Serialize};
82 use serde_json::json;
83 use thiserror_ext::AsReport;
84 use tonic::Request;
85
86 use super::*;
87 use crate::controller::fragment::StreamingJobInfo;
88 use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
89
    /// A table catalog entry flattened together with its storage statistics
    /// from the current hummock version stats (all counters zero when no
    /// stats exist yet for the table).
    #[derive(Serialize)]
    pub struct TableWithStats {
        /// The table catalog itself, flattened into the JSON object.
        #[serde(flatten)]
        pub table: Table,
        /// Total key size + total value size, in bytes.
        pub total_size_bytes: i64,
        /// Number of keys stored for this table.
        pub total_key_count: i64,
        /// Total key size in bytes.
        pub total_key_size: i64,
        /// Total value size in bytes.
        pub total_value_size: i64,
        /// Size after compression, as reported by `TableStats::total_compressed_size`.
        pub compressed_size: u64,
    }
100
101 impl TableWithStats {
102 pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
103 match stats {
104 Some(stats) => Self {
105 total_size_bytes: stats.total_key_size + stats.total_value_size,
106 total_key_count: stats.total_key_count,
107 total_key_size: stats.total_key_size,
108 total_value_size: stats.total_value_size,
109 compressed_size: stats.total_compressed_size,
110 table,
111 },
112 None => Self {
113 total_size_bytes: 0,
114 total_key_count: 0,
115 total_key_size: 0,
116 total_value_size: 0,
117 compressed_size: 0,
118 table,
119 },
120 }
121 }
122 }
123
124 pub struct DashboardError(anyhow::Error);
125 pub type Result<T> = std::result::Result<T, DashboardError>;
126
127 pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
128 DashboardError(err.into())
129 }
130
131 impl From<anyhow::Error> for DashboardError {
132 fn from(value: anyhow::Error) -> Self {
133 DashboardError(value)
134 }
135 }
136
137 impl IntoResponse for DashboardError {
138 fn into_response(self) -> axum::response::Response {
139 let mut resp = Json(json!({
140 "error": self.0.to_report_string(),
141 }))
142 .into_response();
143 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
144 resp
145 }
146 }
147
148 pub async fn list_clusters(
149 Path(ty): Path<i32>,
150 Extension(srv): Extension<Service>,
151 ) -> Result<Json<Vec<WorkerNode>>> {
152 let worker_type = WorkerType::try_from(ty)
153 .map_err(|_| anyhow!("invalid worker type"))
154 .map_err(err)?;
155 let mut result = srv
156 .metadata_manager
157 .list_worker_node(Some(worker_type), None)
158 .await
159 .map_err(err)?;
160 result.sort_unstable_by_key(|n| n.id);
161 Ok(result.into())
162 }
163
164 async fn list_table_catalogs_inner(
165 metadata_manager: &MetadataManager,
166 hummock_manager: &HummockManagerRef,
167 table_type: TableType,
168 ) -> Result<Json<Vec<TableWithStats>>> {
169 let tables = metadata_manager
170 .catalog_controller
171 .list_tables_by_type(table_type.into())
172 .await
173 .map_err(err)?;
174
175 let version_stats = hummock_manager.get_version_stats().await;
177
178 let tables_with_stats = tables
179 .into_iter()
180 .map(|table| {
181 let stats = version_stats.table_stats.get(&table.id);
182 TableWithStats::from_table_and_stats(table, stats)
183 })
184 .collect();
185
186 Ok(Json(tables_with_stats))
187 }
188
189 pub async fn list_materialized_views(
190 Extension(srv): Extension<Service>,
191 ) -> Result<Json<Vec<TableWithStats>>> {
192 list_table_catalogs_inner(
193 &srv.metadata_manager,
194 &srv.hummock_manager,
195 TableType::MaterializedView,
196 )
197 .await
198 }
199
200 pub async fn list_tables(
201 Extension(srv): Extension<Service>,
202 ) -> Result<Json<Vec<TableWithStats>>> {
203 list_table_catalogs_inner(
204 &srv.metadata_manager,
205 &srv.hummock_manager,
206 TableType::Table,
207 )
208 .await
209 }
210
211 pub async fn list_index_tables(
212 Extension(srv): Extension<Service>,
213 ) -> Result<Json<Vec<TableWithStats>>> {
214 list_table_catalogs_inner(
215 &srv.metadata_manager,
216 &srv.hummock_manager,
217 TableType::Index,
218 )
219 .await
220 }
221
222 pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
223 let indexes = srv
224 .metadata_manager
225 .catalog_controller
226 .list_indexes()
227 .await
228 .map_err(err)?;
229
230 Ok(Json(indexes))
231 }
232
233 pub async fn list_subscription(
234 Extension(srv): Extension<Service>,
235 ) -> Result<Json<Vec<Subscription>>> {
236 let subscriptions = srv
237 .metadata_manager
238 .catalog_controller
239 .list_subscriptions()
240 .await
241 .map_err(err)?;
242
243 Ok(Json(subscriptions))
244 }
245
246 pub async fn list_internal_tables(
247 Extension(srv): Extension<Service>,
248 ) -> Result<Json<Vec<TableWithStats>>> {
249 list_table_catalogs_inner(
250 &srv.metadata_manager,
251 &srv.hummock_manager,
252 TableType::Internal,
253 )
254 .await
255 }
256
257 pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
258 let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
259
260 Ok(Json(sources))
261 }
262
263 pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
264 let sinks = srv
265 .metadata_manager
266 .catalog_controller
267 .list_sinks()
268 .await
269 .map_err(err)?;
270
271 Ok(Json(sinks))
272 }
273
274 pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
275 let views = srv
276 .metadata_manager
277 .catalog_controller
278 .list_views()
279 .await
280 .map_err(err)?;
281
282 Ok(Json(views))
283 }
284
285 pub async fn list_functions(
286 Extension(srv): Extension<Service>,
287 ) -> Result<Json<Vec<PbFunction>>> {
288 let functions = srv
289 .metadata_manager
290 .catalog_controller
291 .list_functions()
292 .await
293 .map_err(err)?;
294
295 Ok(Json(functions))
296 }
297
298 pub async fn list_streaming_jobs(
299 Extension(srv): Extension<Service>,
300 ) -> Result<Json<Vec<StreamingJobInfo>>> {
301 let streaming_jobs = srv
302 .metadata_manager
303 .catalog_controller
304 .list_streaming_job_infos()
305 .await
306 .map_err(err)?;
307
308 Ok(Json(streaming_jobs))
309 }
310
311 pub async fn get_fragment_to_relation_map(
319 Extension(srv): Extension<Service>,
320 ) -> Result<Json<FragmentToRelationMap>> {
321 let table_fragments = srv
322 .metadata_manager
323 .catalog_controller
324 .table_fragments()
325 .await
326 .map_err(err)?;
327 let mut fragment_to_relation_map = HashMap::new();
328 for (relation_id, (tf, _, _)) in table_fragments {
329 for fragment_id in tf.fragments.keys() {
330 fragment_to_relation_map.insert(*fragment_id, relation_id);
331 }
332 }
333 let map = FragmentToRelationMap {
334 fragment_to_relation_map,
335 };
336 Ok(Json(map))
337 }
338
339 pub async fn get_relation_id_infos(
341 Extension(srv): Extension<Service>,
342 ) -> Result<Json<RelationIdInfos>> {
343 let table_fragments = srv
344 .metadata_manager
345 .catalog_controller
346 .table_fragments()
347 .await
348 .map_err(err)?;
349 let mut map = HashMap::new();
350 for (id, (tf, fragment_actors, _actor_status)) in table_fragments {
351 let mut fragment_id_to_actor_ids = HashMap::new();
352 for fragment_id in tf.fragments.keys() {
353 let actor_ids = fragment_actors
354 .get(fragment_id)
355 .into_iter()
356 .flat_map(|actors| actors.iter().map(|a| a.actor_id))
357 .collect_vec();
358 fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
359 }
360 map.insert(
361 id.as_raw_id(),
362 FragmentIdToActorIdMap {
363 map: fragment_id_to_actor_ids,
364 },
365 );
366 }
367 let relation_id_infos = RelationIdInfos { map };
368
369 Ok(Json(relation_id_infos))
370 }
371
372 pub async fn list_fragments_by_job_id(
373 Extension(srv): Extension<Service>,
374 Path(job_id): Path<u32>,
375 ) -> Result<Json<PbTableFragments>> {
376 let job_id = JobId::new(job_id);
377 let (table_fragments, fragment_actors, actor_status) = srv
378 .metadata_manager
379 .catalog_controller
380 .get_job_fragments_by_id(job_id)
381 .await
382 .map_err(err)?;
383 let upstream_fragments = srv
384 .metadata_manager
385 .catalog_controller
386 .upstream_fragments(table_fragments.fragment_ids())
387 .await
388 .map_err(err)?;
389 let dispatchers = srv
390 .metadata_manager
391 .catalog_controller
392 .get_fragment_actor_dispatchers(
393 table_fragments.fragment_ids().map(|id| id as _).collect(),
394 )
395 .await
396 .map_err(err)?;
397 Ok(Json(table_fragments.to_protobuf(
398 &fragment_actors,
399 &upstream_fragments,
400 &dispatchers,
401 actor_status,
402 )))
403 }
404
405 pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
406 let users = srv
407 .metadata_manager
408 .catalog_controller
409 .list_users()
410 .await
411 .map_err(err)?;
412
413 Ok(Json(users))
414 }
415
416 pub async fn list_databases(
417 Extension(srv): Extension<Service>,
418 ) -> Result<Json<Vec<PbDatabase>>> {
419 let databases = srv
420 .metadata_manager
421 .catalog_controller
422 .list_databases()
423 .await
424 .map_err(err)?;
425
426 Ok(Json(databases))
427 }
428
429 pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
430 let schemas = srv
431 .metadata_manager
432 .catalog_controller
433 .list_schemas()
434 .await
435 .map_err(err)?;
436
437 Ok(Json(schemas))
438 }
439
    /// One edge of the catalog object dependency graph, serialized in
    /// camelCase for the dashboard front-end.
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct DashboardObjectDependency {
        /// The object that depends on `referenced_object_id`.
        pub object_id: u32,
        /// The object being depended upon.
        pub referenced_object_id: u32,
    }
446
447 pub async fn list_object_dependencies(
448 Extension(srv): Extension<Service>,
449 ) -> Result<Json<Vec<DashboardObjectDependency>>> {
450 let object_dependencies = srv
451 .metadata_manager
452 .catalog_controller
453 .list_all_object_dependencies()
454 .await
455 .map_err(err)?;
456
457 let result = object_dependencies
458 .into_iter()
459 .map(|dependency| DashboardObjectDependency {
460 object_id: dependency.object_id.as_raw_id(),
461 referenced_object_id: dependency.referenced_object_id.as_raw_id(),
462 })
463 .collect();
464
465 Ok(Json(result))
466 }
467
    /// Query parameters for the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        /// Output format: `"text"` (the default) or `"json"`.
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
473
474 impl AwaitTreeDumpParams {
475 pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
477 Ok(match self.format.as_str() {
478 "text" => ActorTracesFormat::Text,
479 "json" => ActorTracesFormat::Json,
480 _ => {
481 return Err(err(anyhow!(
482 "Unsupported format `{}`, only `text` and `json` are supported for now",
483 self.format
484 )));
485 }
486 })
487 }
488 }
489
490 fn await_tree_default_format() -> String {
491 "text".to_owned()
493 }
494
495 pub async fn dump_await_tree_all(
496 Query(params): Query<AwaitTreeDumpParams>,
497 Extension(srv): Extension<Service>,
498 ) -> Result<Json<StackTraceResponse>> {
499 let actor_traces_format = params.actor_traces_format()?;
500
501 let res = dump_cluster_await_tree(
502 &srv.metadata_manager,
503 &srv.await_tree_reg,
504 actor_traces_format,
505 )
506 .await
507 .map_err(err)?;
508
509 Ok(res.into())
510 }
511
512 pub async fn dump_await_tree(
513 Path(worker_id): Path<WorkerId>,
514 Query(params): Query<AwaitTreeDumpParams>,
515 Extension(srv): Extension<Service>,
516 ) -> Result<Json<StackTraceResponse>> {
517 let actor_traces_format = params.actor_traces_format()?;
518
519 let worker_node = srv
520 .metadata_manager
521 .get_worker_by_id(worker_id)
522 .await
523 .map_err(err)?
524 .context("worker node not found")
525 .map_err(err)?;
526
527 let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
528 .await
529 .map_err(err)?;
530
531 Ok(res.into())
532 }
533
    /// Triggers a heap-profiling dump on the given worker node.
    ///
    /// When `worker_id` is the meta node's own id, the dump is taken
    /// in-process via `profile_service`; otherwise the request is forwarded
    /// to that worker's monitor service. The empty `dir` selects the target
    /// node's default dump directory.
    pub async fn heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<HeapProfilingResponse>> {
        if worker_id == crate::manager::META_NODE_ID {
            // In-process call. NOTE(review): there is no `.await` here, so
            // this service method presumably returns a `Result` synchronously
            // — confirm against `ProfileServiceImpl`'s signature.
            let result = srv
                .profile_service
                .heap_profiling(Request::new(HeapProfilingRequest { dir: "".to_owned() }))
                .map_err(err)?
                .into_inner();
            return Ok(result.into());
        }

        // Resolve the worker node, then dial its monitor service.
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;

        // Empty path: let the remote node pick its default dump directory.
        let result = client.heap_profile("".to_owned()).await.map_err(err)?;

        Ok(result.into())
    }
561
    /// Lists the heap-profiling dumps available on the given worker node.
    ///
    /// The meta node's own id is served in-process; any other id is forwarded
    /// to that worker's monitor service.
    pub async fn list_heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<ListHeapProfilingResponse>> {
        if worker_id == crate::manager::META_NODE_ID {
            // In-process call. NOTE(review): no `.await` here, so this service
            // method presumably returns a `Result` synchronously — confirm.
            let result = srv
                .profile_service
                .list_heap_profiling(Request::new(ListHeapProfilingRequest {}))
                .map_err(err)?
                .into_inner();
            return Ok(result.into());
        }

        // Resolve the worker node, then query its monitor service.
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;

        let result = client.list_heap_profile().await.map_err(err)?;
        Ok(result.into())
    }
588
589 pub async fn analyze_heap(
590 Path((worker_id, file_path)): Path<(WorkerId, String)>,
591 Extension(srv): Extension<Service>,
592 ) -> Result<Response> {
593 let file_path =
594 String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
595
596 let collapsed_bin = if worker_id == crate::manager::META_NODE_ID {
597 srv.profile_service
598 .analyze_heap(Request::new(AnalyzeHeapRequest {
599 path: file_path.clone(),
600 }))
601 .await
602 .map_err(err)?
603 .into_inner()
604 .result
605 } else {
606 let worker_node = srv
607 .metadata_manager
608 .get_worker_by_id(worker_id)
609 .await
610 .map_err(err)?
611 .context("worker node not found")
612 .map_err(err)?;
613
614 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
615 client
616 .analyze_heap(file_path.clone())
617 .await
618 .map_err(err)?
619 .result
620 };
621 let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
622
623 let response = Response::builder()
624 .header("Content-Type", "application/octet-stream")
625 .body(collapsed_str.into());
626
627 response.map_err(err)
628 }
629
    /// Query parameters for the diagnose report endpoint.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        /// Await-tree rendering format inside the report: `"text"` (default)
        /// or `"json"`.
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
635
636 pub async fn diagnose(
637 Query(params): Query<DiagnoseParams>,
638 Extension(srv): Extension<Service>,
639 ) -> Result<String> {
640 let actor_traces_format = match params.actor_traces_format.as_str() {
641 "text" => ActorTracesFormat::Text,
642 "json" => ActorTracesFormat::Json,
643 _ => {
644 return Err(err(anyhow!(
645 "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
646 params.actor_traces_format
647 )));
648 }
649 };
650 Ok(srv.diagnose_command.report(actor_traces_format).await)
651 }
652
653 pub async fn get_streaming_stats(
659 Extension(srv): Extension<Service>,
660 ) -> Result<Json<GetStreamingStatsResponse>> {
661 let worker_nodes = srv
662 .metadata_manager
663 .list_active_streaming_compute_nodes()
664 .await
665 .map_err(err)?;
666
667 let mut futures = Vec::new();
668
669 for worker_node in worker_nodes {
670 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
671 let client = Arc::new(client);
672 let fut = async move {
673 let result = client.get_streaming_stats().await.map_err(err)?;
674 Ok::<_, DashboardError>(result)
675 };
676 futures.push(fut);
677 }
678 let results = join_all(futures).await;
679
680 let mut all = GetStreamingStatsResponse::default();
681
682 for result in results {
683 let result = result
684 .map_err(|_| anyhow!("Failed to get back pressure"))
685 .map_err(err)?;
686
687 for (fragment_id, fragment_stats) in result.fragment_stats {
689 if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
690 s.actor_count += fragment_stats.actor_count;
691 s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
692 } else {
693 all.fragment_stats.insert(fragment_id, fragment_stats);
694 }
695 }
696
697 for (relation_id, relation_stats) in result.relation_stats {
699 if let Some(s) = all.relation_stats.get_mut(&relation_id) {
700 s.actor_count += relation_stats.actor_count;
701 s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
702 } else {
703 all.relation_stats.insert(relation_id, relation_stats);
704 }
705 }
706
707 for (key, channel_stats) in result.channel_stats {
709 if let Some(s) = all.channel_stats.get_mut(&key) {
710 s.actor_count += channel_stats.actor_count;
711 s.output_blocking_duration += channel_stats.output_blocking_duration;
712 s.recv_row_count += channel_stats.recv_row_count;
713 s.send_row_count += channel_stats.send_row_count;
714 } else {
715 all.channel_stats.insert(key, channel_stats);
716 }
717 }
718 }
719
720 Ok(all.into())
721 }
722
    /// Query parameters for the Prometheus-backed streaming stats endpoint.
    #[derive(Debug, Deserialize)]
    pub struct StreamingStatsPrometheusParams {
        /// Optional evaluation timestamp for the Prometheus queries;
        /// when absent the queries are evaluated at "now".
        #[serde(default)]
        at: Option<i64>,
        /// Window in seconds for the Prometheus `rate()` queries (default 60).
        #[serde(default = "streaming_stats_default_time_offset")]
        time_offset: i64,
    }
732
733 fn streaming_stats_default_time_offset() -> i64 {
734 60
735 }
736
    /// Like `get_streaming_stats`, but sources the per-channel throughput and
    /// back-pressure numbers from Prometheus instead of the compute nodes.
    ///
    /// Fragment/relation stats are still merged from the workers' RPC
    /// responses (the workers' channel stats are deliberately ignored here);
    /// `channel_stats` is rebuilt from three Prometheus queries. Fails when
    /// no Prometheus endpoint is configured.
    pub async fn get_streaming_stats_from_prometheus(
        Query(params): Query<StreamingStatsPrometheusParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
        let mut all = GetStreamingPrometheusStatsResponse::default();

        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        // Fan out one stats RPC per compute node; awaited concurrently below.
        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        for result in results {
            // NOTE(review): the worker's actual error is discarded and replaced
            // with a generic message here; consider propagating `result?`.
            let result = result
                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
                .map_err(err)?;

            // Merge fragment stats: actor counts add up; the merged epoch is
            // the minimum (slowest) epoch across workers.
            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            // Same merge rule for relation-level stats.
            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }
        }

        if let Some(ref client) = srv.prometheus_client {
            // Per-channel row rates, grouped by the (upstream, downstream)
            // fragment edge, over the requested rate window.
            let channel_input_throughput_query = format!(
                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            let channel_output_throughput_query = format!(
                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            // Blocked-nanoseconds rate normalized by the fragment's actor
            // count, i.e. average blocking fraction per actor (in ns/s).
            let channel_backpressure_query = format!(
                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
                / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
                srv.prometheus_selector, params.time_offset
            );

            let (
                channel_input_throughput_result,
                channel_output_throughput_result,
                channel_backpressure_result,
            ) = {
                let mut input_query = client.query(channel_input_throughput_query);
                let mut output_query = client.query(channel_output_throughput_query);
                let mut backpressure_query = client.query(channel_backpressure_query);

                // Pin all three queries to the same evaluation timestamp if
                // the caller supplied one.
                if let Some(at_time) = params.at {
                    input_query = input_query.at(at_time);
                    output_query = output_query.at(at_time);
                    backpressure_query = backpressure_query.at(at_time);
                }

                tokio::try_join!(
                    input_query.get(),
                    output_query.get(),
                    backpressure_query.get(),
                )
                .map_err(err)?
            };

            // Keyed by "{upstream_fragment_id}_{downstream_fragment_id}".
            let mut channel_data = HashMap::new();

            if let Some(channel_input_throughput_data) =
                channel_input_throughput_result.data().as_vector()
            {
                for sample in channel_input_throughput_data {
                    // Skip samples with missing or non-numeric fragment labels.
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .recv_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_output_throughput_data) =
                channel_output_throughput_result.data().as_vector()
            {
                for sample in channel_output_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .send_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
            {
                for sample in channel_backpressure_data {
                    // Here the metric's own fragment_id is the upstream side of
                    // the edge and downstream_fragment_id the receiver.
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(downstream_fragment_id_str) =
                            sample.metric().get("downstream_fragment_id")
                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            downstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            // ns-per-second blocked → fraction of time blocked.
                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0;
                    }
                }
            }

            // Fill each channel's actor count from the downstream fragment's
            // stats (parts[1] of the "{up}_{down}" key).
            for (key, channel_stats) in &mut channel_data {
                let parts: Vec<&str> = key.split('_').collect();
                if parts.len() == 2
                    && let Ok(fragment_id) = parts[1].parse::<u32>()
                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
                {
                    channel_stats.actor_count = fragment_stats.actor_count;
                }
            }

            all.channel_stats = channel_data;

            Ok(Json(all))
        } else {
            Err(err(anyhow!("Prometheus endpoint is not set")))
        }
    }
932
933 pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
934 Ok(Json(risingwave_common::current_cluster_version()))
935 }
936}
937
impl DashboardService {
    /// Starts the dashboard HTTP server on `dashboard_addr` and blocks until
    /// the server exits or fails.
    ///
    /// Routing layout:
    /// - `/api/...`   — the JSON meta/monitoring API (see [`handlers`]),
    /// - `/trace/...` — the embedded OTLP trace viewer,
    /// - anything else falls back to the bundled dashboard front-end assets.
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        // The API is read-only from the browser's perspective: allow only GET
        // requests, from any origin.
        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/{ty}", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/{job_id}", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/functions", get(list_functions))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            // Note: `/indexes` serves the index *tables* with stats, while
            // `/index_items` serves the index catalog entries.
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            .route(
                "/metrics/streaming_stats_prometheus",
                get(get_streaming_stats_from_prometheus),
            )
            .route("/monitor/await_tree/{worker_id}", get(dump_await_tree))
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/{worker_id}", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/{worker_id}",
                get(list_heap_profile),
            )
            .route("/monitor/analyze/{worker_id}/{*path}", get(analyze_heap))
            .route("/monitor/diagnose/", get(diagnose))
            // Make the shared service state available to every handler as
            // `Extension<Service>`.
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            // Compress all responses; API payloads can be large.
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}