risingwave_meta/dashboard/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_rpc_client::ComputeClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::hummock::HummockManagerRef;
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// State shared by all dashboard HTTP handlers.
#[derive(Clone)]
pub struct DashboardService {
    /// Await-tree registry passed to cluster-wide await-tree dumps.
    pub await_tree_reg: await_tree::Registry,
    /// Presumably the address the dashboard HTTP server listens on — TODO confirm.
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus client; Prometheus-backed stats are only queried when set.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// Label-matcher text inserted inside the `{...}` of the PromQL queries.
    pub prometheus_selector: String,
    /// Access to catalog and cluster (worker node) metadata.
    pub metadata_manager: MetadataManager,
    /// Hummock manager, used here to read per-table storage statistics.
    pub hummock_manager: HummockManagerRef,
    /// Pool of RPC clients used to talk to individual worker nodes.
    pub compute_clients: ComputeClientPool,
    /// Produces the text report served by the diagnose endpoint.
    pub diagnose_command: DiagnoseCommandRef,
    /// Not used in this part of the file; presumably the embedded OTLP trace state — TODO confirm.
    pub trace_state: otlp_embedded::StateRef,
}
50
/// Shared handle to the [`DashboardService`]; handlers receive it through the `Extension` extractor.
pub type Service = Arc<DashboardService>;
52
53pub(super) mod handlers {
54    use std::cmp::min;
55    use std::collections::HashMap;
56
57    use anyhow::Context;
58    use axum::Json;
59    use axum::extract::Query;
60    use futures::future::join_all;
61    use itertools::Itertools;
62    use risingwave_common::catalog::TableId;
63    use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
64    use risingwave_meta_model::WorkerId;
65    use risingwave_pb::catalog::table::TableType;
66    use risingwave_pb::catalog::{
67        Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
68    };
69    use risingwave_pb::common::{WorkerNode, WorkerType};
70    use risingwave_pb::hummock::TableStats;
71    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
72    use risingwave_pb::meta::{
73        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
74    };
75    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
76    use risingwave_pb::monitor_service::{
77        ChannelDeltaStats, GetStreamingPrometheusStatsResponse, GetStreamingStatsResponse,
78        HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse,
79    };
80    use risingwave_pb::user::PbUserInfo;
81    use serde::{Deserialize, Serialize};
82    use serde_json::json;
83    use thiserror_ext::AsReport;
84
85    use super::*;
86    use crate::controller::fragment::StreamingJobInfo;
87    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
88
    /// A table catalog entry together with its Hummock storage statistics.
    ///
    /// The catalog fields are flattened into the same JSON object as the statistics.
    /// All statistics are zero when no stats are recorded for the table
    /// (see `from_table_and_stats`).
    #[derive(Serialize)]
    pub struct TableWithStats {
        #[serde(flatten)]
        pub table: Table,
        /// `total_key_size + total_value_size`.
        pub total_size_bytes: i64,
        pub total_key_count: i64,
        pub total_key_size: i64,
        pub total_value_size: i64,
        /// Copied from the stats' `total_compressed_size`.
        pub compressed_size: u64,
    }
99
100    impl TableWithStats {
101        pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
102            match stats {
103                Some(stats) => Self {
104                    total_size_bytes: stats.total_key_size + stats.total_value_size,
105                    total_key_count: stats.total_key_count,
106                    total_key_size: stats.total_key_size,
107                    total_value_size: stats.total_value_size,
108                    compressed_size: stats.total_compressed_size,
109                    table,
110                },
111                None => Self {
112                    total_size_bytes: 0,
113                    total_key_count: 0,
114                    total_key_size: 0,
115                    total_value_size: 0,
116                    compressed_size: 0,
117                    table,
118                },
119            }
120        }
121    }
122
    /// Error type returned by dashboard handlers; rendered as a JSON body with HTTP 500.
    pub struct DashboardError(anyhow::Error);
    /// Result alias used by the dashboard handlers.
    pub type Result<T> = std::result::Result<T, DashboardError>;
125
126    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
127        DashboardError(err.into())
128    }
129
130    impl From<anyhow::Error> for DashboardError {
131        fn from(value: anyhow::Error) -> Self {
132            DashboardError(value)
133        }
134    }
135
136    impl IntoResponse for DashboardError {
137        fn into_response(self) -> axum::response::Response {
138            let mut resp = Json(json!({
139                "error": self.0.to_report_string(),
140            }))
141            .into_response();
142            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
143            resp
144        }
145    }
146
147    pub async fn list_clusters(
148        Path(ty): Path<i32>,
149        Extension(srv): Extension<Service>,
150    ) -> Result<Json<Vec<WorkerNode>>> {
151        let worker_type = WorkerType::try_from(ty)
152            .map_err(|_| anyhow!("invalid worker type"))
153            .map_err(err)?;
154        let mut result = srv
155            .metadata_manager
156            .list_worker_node(Some(worker_type), None)
157            .await
158            .map_err(err)?;
159        result.sort_unstable_by_key(|n| n.id);
160        Ok(result.into())
161    }
162
163    async fn list_table_catalogs_inner(
164        metadata_manager: &MetadataManager,
165        hummock_manager: &HummockManagerRef,
166        table_type: TableType,
167    ) -> Result<Json<Vec<TableWithStats>>> {
168        let tables = metadata_manager
169            .catalog_controller
170            .list_tables_by_type(table_type.into())
171            .await
172            .map_err(err)?;
173
174        // Get table statistics from hummock manager
175        let version_stats = hummock_manager.get_version_stats().await;
176
177        let tables_with_stats = tables
178            .into_iter()
179            .map(|table| {
180                let stats = version_stats.table_stats.get(&table.id);
181                TableWithStats::from_table_and_stats(table, stats)
182            })
183            .collect();
184
185        Ok(Json(tables_with_stats))
186    }
187
188    pub async fn list_materialized_views(
189        Extension(srv): Extension<Service>,
190    ) -> Result<Json<Vec<TableWithStats>>> {
191        list_table_catalogs_inner(
192            &srv.metadata_manager,
193            &srv.hummock_manager,
194            TableType::MaterializedView,
195        )
196        .await
197    }
198
199    pub async fn list_tables(
200        Extension(srv): Extension<Service>,
201    ) -> Result<Json<Vec<TableWithStats>>> {
202        list_table_catalogs_inner(
203            &srv.metadata_manager,
204            &srv.hummock_manager,
205            TableType::Table,
206        )
207        .await
208    }
209
210    pub async fn list_index_tables(
211        Extension(srv): Extension<Service>,
212    ) -> Result<Json<Vec<TableWithStats>>> {
213        list_table_catalogs_inner(
214            &srv.metadata_manager,
215            &srv.hummock_manager,
216            TableType::Index,
217        )
218        .await
219    }
220
221    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
222        let indexes = srv
223            .metadata_manager
224            .catalog_controller
225            .list_indexes()
226            .await
227            .map_err(err)?;
228
229        Ok(Json(indexes))
230    }
231
232    pub async fn list_subscription(
233        Extension(srv): Extension<Service>,
234    ) -> Result<Json<Vec<Subscription>>> {
235        let subscriptions = srv
236            .metadata_manager
237            .catalog_controller
238            .list_subscriptions()
239            .await
240            .map_err(err)?;
241
242        Ok(Json(subscriptions))
243    }
244
245    pub async fn list_internal_tables(
246        Extension(srv): Extension<Service>,
247    ) -> Result<Json<Vec<TableWithStats>>> {
248        list_table_catalogs_inner(
249            &srv.metadata_manager,
250            &srv.hummock_manager,
251            TableType::Internal,
252        )
253        .await
254    }
255
256    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
257        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
258
259        Ok(Json(sources))
260    }
261
262    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
263        let sinks = srv
264            .metadata_manager
265            .catalog_controller
266            .list_sinks()
267            .await
268            .map_err(err)?;
269
270        Ok(Json(sinks))
271    }
272
273    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
274        let views = srv
275            .metadata_manager
276            .catalog_controller
277            .list_views()
278            .await
279            .map_err(err)?;
280
281        Ok(Json(views))
282    }
283
284    pub async fn list_functions(
285        Extension(srv): Extension<Service>,
286    ) -> Result<Json<Vec<PbFunction>>> {
287        let functions = srv
288            .metadata_manager
289            .catalog_controller
290            .list_functions()
291            .await
292            .map_err(err)?;
293
294        Ok(Json(functions))
295    }
296
297    pub async fn list_streaming_jobs(
298        Extension(srv): Extension<Service>,
299    ) -> Result<Json<Vec<StreamingJobInfo>>> {
300        let streaming_jobs = srv
301            .metadata_manager
302            .catalog_controller
303            .list_streaming_job_infos()
304            .await
305            .map_err(err)?;
306
307        Ok(Json(streaming_jobs))
308    }
309
310    /// In the ddl backpressure graph, we want to compute the backpressure between relations.
311    /// So we need to know which are the fragments which are connected to external relations.
312    /// These fragments form the vertices of the graph.
313    /// We can get collection of backpressure values, keyed by vertex_id-vertex_id.
314    /// This function will return a map of fragment vertex id to relation id.
315    /// We can convert `fragment_id-fragment_id` to `relation_id-relation_id` using that.
316    /// Finally, we have a map of `relation_id-relation_id` to backpressure values.
317    pub async fn get_fragment_to_relation_map(
318        Extension(srv): Extension<Service>,
319    ) -> Result<Json<FragmentToRelationMap>> {
320        let table_fragments = srv
321            .metadata_manager
322            .catalog_controller
323            .table_fragments()
324            .await
325            .map_err(err)?;
326        let mut fragment_to_relation_map = HashMap::new();
327        for (relation_id, tf) in table_fragments {
328            for fragment_id in tf.fragments.keys() {
329                fragment_to_relation_map.insert(*fragment_id, relation_id as u32);
330            }
331        }
332        let map = FragmentToRelationMap {
333            fragment_to_relation_map,
334        };
335        Ok(Json(map))
336    }
337
338    /// Provides a hierarchy of relation ids to fragments to actors.
339    pub async fn get_relation_id_infos(
340        Extension(srv): Extension<Service>,
341    ) -> Result<Json<RelationIdInfos>> {
342        let table_fragments = srv
343            .metadata_manager
344            .catalog_controller
345            .table_fragments()
346            .await
347            .map_err(err)?;
348        let mut map = HashMap::new();
349        for (id, tf) in table_fragments {
350            let mut fragment_id_to_actor_ids = HashMap::new();
351            for (fragment_id, fragment) in &tf.fragments {
352                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
353                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
354            }
355            map.insert(
356                id as u32,
357                FragmentIdToActorIdMap {
358                    map: fragment_id_to_actor_ids,
359                },
360            );
361        }
362        let relation_id_infos = RelationIdInfos { map };
363
364        Ok(Json(relation_id_infos))
365    }
366
367    pub async fn list_fragments_by_job_id(
368        Extension(srv): Extension<Service>,
369        Path(job_id): Path<u32>,
370    ) -> Result<Json<PbTableFragments>> {
371        let table_id = TableId::new(job_id);
372        let table_fragments = srv
373            .metadata_manager
374            .get_job_fragments_by_id(&table_id)
375            .await
376            .map_err(err)?;
377        let upstream_fragments = srv
378            .metadata_manager
379            .catalog_controller
380            .upstream_fragments(table_fragments.fragment_ids())
381            .await
382            .map_err(err)?;
383        let dispatchers = srv
384            .metadata_manager
385            .catalog_controller
386            .get_fragment_actor_dispatchers(
387                table_fragments.fragment_ids().map(|id| id as _).collect(),
388            )
389            .await
390            .map_err(err)?;
391        Ok(Json(
392            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
393        ))
394    }
395
396    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
397        let users = srv
398            .metadata_manager
399            .catalog_controller
400            .list_users()
401            .await
402            .map_err(err)?;
403
404        Ok(Json(users))
405    }
406
407    pub async fn list_databases(
408        Extension(srv): Extension<Service>,
409    ) -> Result<Json<Vec<PbDatabase>>> {
410        let databases = srv
411            .metadata_manager
412            .catalog_controller
413            .list_databases()
414            .await
415            .map_err(err)?;
416
417        Ok(Json(databases))
418    }
419
420    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
421        let schemas = srv
422            .metadata_manager
423            .catalog_controller
424            .list_schemas()
425            .await
426            .map_err(err)?;
427
428        Ok(Json(schemas))
429    }
430
431    pub async fn list_object_dependencies(
432        Extension(srv): Extension<Service>,
433    ) -> Result<Json<Vec<PbObjectDependencies>>> {
434        let object_dependencies = srv
435            .metadata_manager
436            .catalog_controller
437            .list_all_object_dependencies()
438            .await
439            .map_err(err)?;
440
441        Ok(Json(object_dependencies))
442    }
443
    /// Query parameters of the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        /// Output format: `text` (the default) or `json`.
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
449
450    impl AwaitTreeDumpParams {
451        /// Parse the `format` parameter to [`ActorTracesFormat`].
452        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
453            Ok(match self.format.as_str() {
454                "text" => ActorTracesFormat::Text,
455                "json" => ActorTracesFormat::Json,
456                _ => {
457                    return Err(err(anyhow!(
458                        "Unsupported format `{}`, only `text` and `json` are supported for now",
459                        self.format
460                    )));
461                }
462            })
463        }
464    }
465
466    fn await_tree_default_format() -> String {
467        // In dashboard, await tree is usually for engineer to debug, so we use human-readable text format by default here.
468        "text".to_owned()
469    }
470
471    pub async fn dump_await_tree_all(
472        Query(params): Query<AwaitTreeDumpParams>,
473        Extension(srv): Extension<Service>,
474    ) -> Result<Json<StackTraceResponse>> {
475        let actor_traces_format = params.actor_traces_format()?;
476
477        let res = dump_cluster_await_tree(
478            &srv.metadata_manager,
479            &srv.await_tree_reg,
480            actor_traces_format,
481        )
482        .await
483        .map_err(err)?;
484
485        Ok(res.into())
486    }
487
488    pub async fn dump_await_tree(
489        Path(worker_id): Path<WorkerId>,
490        Query(params): Query<AwaitTreeDumpParams>,
491        Extension(srv): Extension<Service>,
492    ) -> Result<Json<StackTraceResponse>> {
493        let actor_traces_format = params.actor_traces_format()?;
494
495        let worker_node = srv
496            .metadata_manager
497            .get_worker_by_id(worker_id)
498            .await
499            .map_err(err)?
500            .context("worker node not found")
501            .map_err(err)?;
502
503        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
504            .await
505            .map_err(err)?;
506
507        Ok(res.into())
508    }
509
510    pub async fn heap_profile(
511        Path(worker_id): Path<WorkerId>,
512        Extension(srv): Extension<Service>,
513    ) -> Result<Json<HeapProfilingResponse>> {
514        let worker_node = srv
515            .metadata_manager
516            .get_worker_by_id(worker_id)
517            .await
518            .map_err(err)?
519            .context("worker node not found")
520            .map_err(err)?;
521
522        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
523
524        let result = client.heap_profile("".to_owned()).await.map_err(err)?;
525
526        Ok(result.into())
527    }
528
529    pub async fn list_heap_profile(
530        Path(worker_id): Path<WorkerId>,
531        Extension(srv): Extension<Service>,
532    ) -> Result<Json<ListHeapProfilingResponse>> {
533        let worker_node = srv
534            .metadata_manager
535            .get_worker_by_id(worker_id)
536            .await
537            .map_err(err)?
538            .context("worker node not found")
539            .map_err(err)?;
540
541        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
542
543        let result = client.list_heap_profile().await.map_err(err)?;
544        Ok(result.into())
545    }
546
547    pub async fn analyze_heap(
548        Path((worker_id, file_path)): Path<(WorkerId, String)>,
549        Extension(srv): Extension<Service>,
550    ) -> Result<Response> {
551        let file_path =
552            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
553
554        let file_name = FilePath::new(&file_path)
555            .file_name()
556            .unwrap()
557            .to_string_lossy()
558            .to_string();
559
560        let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
561
562        let worker_node = srv
563            .metadata_manager
564            .get_worker_by_id(worker_id)
565            .await
566            .map_err(err)?
567            .context("worker node not found")
568            .map_err(err)?;
569
570        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
571
572        let collapsed_bin = client
573            .analyze_heap(file_path.clone())
574            .await
575            .map_err(err)?
576            .result;
577        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
578
579        let response = Response::builder()
580            .header("Content-Type", "application/octet-stream")
581            .header("Content-Disposition", collapsed_file_name)
582            .body(collapsed_str.into());
583
584        response.map_err(err)
585    }
586
    /// Query parameters of the diagnose endpoint.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        /// Actor-trace format passed to the diagnose report: `text` (the default) or `json`.
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
592
593    pub async fn diagnose(
594        Query(params): Query<DiagnoseParams>,
595        Extension(srv): Extension<Service>,
596    ) -> Result<String> {
597        let actor_traces_format = match params.actor_traces_format.as_str() {
598            "text" => ActorTracesFormat::Text,
599            "json" => ActorTracesFormat::Json,
600            _ => {
601                return Err(err(anyhow!(
602                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
603                    params.actor_traces_format
604                )));
605            }
606        };
607        Ok(srv.diagnose_command.report(actor_traces_format).await)
608    }
609
610    /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API,
611    /// the workload should be reasonable.
612    /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join).
613    /// In such a scenario, the number of edges is linear to the number of nodes.
614    /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
615    pub async fn get_streaming_stats(
616        Extension(srv): Extension<Service>,
617    ) -> Result<Json<GetStreamingStatsResponse>> {
618        let worker_nodes = srv
619            .metadata_manager
620            .list_active_streaming_compute_nodes()
621            .await
622            .map_err(err)?;
623
624        let mut futures = Vec::new();
625
626        for worker_node in worker_nodes {
627            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
628            let client = Arc::new(client);
629            let fut = async move {
630                let result = client.get_streaming_stats().await.map_err(err)?;
631                Ok::<_, DashboardError>(result)
632            };
633            futures.push(fut);
634        }
635        let results = join_all(futures).await;
636
637        let mut all = GetStreamingStatsResponse::default();
638
639        for result in results {
640            let result = result
641                .map_err(|_| anyhow!("Failed to get back pressure"))
642                .map_err(err)?;
643
644            // Aggregate fragment_stats
645            for (fragment_id, fragment_stats) in result.fragment_stats {
646                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
647                    s.actor_count += fragment_stats.actor_count;
648                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
649                } else {
650                    all.fragment_stats.insert(fragment_id, fragment_stats);
651                }
652            }
653
654            // Aggregate relation_stats
655            for (relation_id, relation_stats) in result.relation_stats {
656                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
657                    s.actor_count += relation_stats.actor_count;
658                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
659                } else {
660                    all.relation_stats.insert(relation_id, relation_stats);
661                }
662            }
663
664            // Aggregate channel_stats
665            for (key, channel_stats) in result.channel_stats {
666                if let Some(s) = all.channel_stats.get_mut(&key) {
667                    s.actor_count += channel_stats.actor_count;
668                    s.output_blocking_duration += channel_stats.output_blocking_duration;
669                    s.recv_row_count += channel_stats.recv_row_count;
670                    s.send_row_count += channel_stats.send_row_count;
671                } else {
672                    all.channel_stats.insert(key, channel_stats);
673                }
674            }
675        }
676
677        Ok(all.into())
678    }
679
    /// Query parameters of the Prometheus-backed streaming stats endpoint.
    #[derive(Debug, Deserialize)]
    pub struct StreamingStatsPrometheusParams {
        /// Evaluation time as a Unix timestamp in seconds, applied to every Prometheus query.
        /// If not set, the queries are not pinned, so Prometheus uses its current server time.
        #[serde(default)]
        at: Option<i64>,
        /// Range window, in seconds, of the `rate(...[Ns])` throughput and backpressure
        /// queries. Defaults to 60.
        #[serde(default = "streaming_stats_default_time_offset")]
        time_offset: i64,
    }
689
690    fn streaming_stats_default_time_offset() -> i64 {
691        60
692    }
693
694    pub async fn get_streaming_stats_from_prometheus(
695        Query(params): Query<StreamingStatsPrometheusParams>,
696        Extension(srv): Extension<Service>,
697    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
698        let mut all = GetStreamingPrometheusStatsResponse::default();
699
700        // Get fragment and relation stats from workers
701        let worker_nodes = srv
702            .metadata_manager
703            .list_active_streaming_compute_nodes()
704            .await
705            .map_err(err)?;
706
707        let mut futures = Vec::new();
708
709        for worker_node in worker_nodes {
710            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
711            let client = Arc::new(client);
712            let fut = async move {
713                let result = client.get_streaming_stats().await.map_err(err)?;
714                Ok::<_, DashboardError>(result)
715            };
716            futures.push(fut);
717        }
718        let results = join_all(futures).await;
719
720        for result in results {
721            let result = result
722                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
723                .map_err(err)?;
724
725            // Aggregate fragment_stats
726            for (fragment_id, fragment_stats) in result.fragment_stats {
727                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
728                    s.actor_count += fragment_stats.actor_count;
729                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
730                } else {
731                    all.fragment_stats.insert(fragment_id, fragment_stats);
732                }
733            }
734
735            // Aggregate relation_stats
736            for (relation_id, relation_stats) in result.relation_stats {
737                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
738                    s.actor_count += relation_stats.actor_count;
739                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
740                } else {
741                    all.relation_stats.insert(relation_id, relation_stats);
742                }
743            }
744        }
745
746        // Get channel delta stats from Prometheus
747        if let Some(ref client) = srv.prometheus_client {
748            // Query channel delta stats: throughput and backpressure rate
749            let channel_input_throughput_query = format!(
750                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
751                srv.prometheus_selector, params.time_offset
752            );
753            let channel_output_throughput_query = format!(
754                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
755                srv.prometheus_selector, params.time_offset
756            );
757            let channel_backpressure_query = format!(
758                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
759                 / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
760                srv.prometheus_selector, params.time_offset
761            );
762
763            // Execute all queries concurrently with optional time parameter
764            let (
765                channel_input_throughput_result,
766                channel_output_throughput_result,
767                channel_backpressure_result,
768            ) = {
769                let mut input_query = client.query(channel_input_throughput_query);
770                let mut output_query = client.query(channel_output_throughput_query);
771                let mut backpressure_query = client.query(channel_backpressure_query);
772
773                // Set the evaluation time if provided
774                if let Some(at_time) = params.at {
775                    input_query = input_query.at(at_time);
776                    output_query = output_query.at(at_time);
777                    backpressure_query = backpressure_query.at(at_time);
778                }
779
780                tokio::try_join!(
781                    input_query.get(),
782                    output_query.get(),
783                    backpressure_query.get(),
784                )
785                .map_err(err)?
786            };
787
788            // Process channel delta stats
789            let mut channel_data = HashMap::new();
790
791            // Collect input throughput
792            if let Some(channel_input_throughput_data) =
793                channel_input_throughput_result.data().as_vector()
794            {
795                for sample in channel_input_throughput_data {
796                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
797                        && let Some(upstream_fragment_id_str) =
798                            sample.metric().get("upstream_fragment_id")
799                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
800                            fragment_id_str.parse::<u32>(),
801                            upstream_fragment_id_str.parse::<u32>(),
802                        )
803                    {
804                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
805                        channel_data
806                            .entry(key)
807                            .or_insert_with(|| ChannelDeltaStats {
808                                actor_count: 0,
809                                backpressure_rate: 0.0,
810                                recv_throughput: 0.0,
811                                send_throughput: 0.0,
812                            })
813                            .recv_throughput = sample.sample().value();
814                    }
815                }
816            }
817
818            // Collect output throughput
819            if let Some(channel_output_throughput_data) =
820                channel_output_throughput_result.data().as_vector()
821            {
822                for sample in channel_output_throughput_data {
823                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
824                        && let Some(upstream_fragment_id_str) =
825                            sample.metric().get("upstream_fragment_id")
826                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
827                            fragment_id_str.parse::<u32>(),
828                            upstream_fragment_id_str.parse::<u32>(),
829                        )
830                    {
831                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
832                        channel_data
833                            .entry(key)
834                            .or_insert_with(|| ChannelDeltaStats {
835                                actor_count: 0,
836                                backpressure_rate: 0.0,
837                                recv_throughput: 0.0,
838                                send_throughput: 0.0,
839                            })
840                            .send_throughput = sample.sample().value();
841                    }
842                }
843            }
844
845            // Collect backpressure rate
846            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
847            {
848                for sample in channel_backpressure_data {
849                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
850                        && let Some(downstream_fragment_id_str) =
851                            sample.metric().get("downstream_fragment_id")
852                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
853                            fragment_id_str.parse::<u32>(),
854                            downstream_fragment_id_str.parse::<u32>(),
855                        )
856                    {
857                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
858                        channel_data
859                            .entry(key)
860                            .or_insert_with(|| ChannelDeltaStats {
861                                actor_count: 0,
862                                backpressure_rate: 0.0,
863                                recv_throughput: 0.0,
864                                send_throughput: 0.0,
865                            })
866                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0; // Convert ns to seconds
867                    }
868                }
869            }
870
871            // Set actor count for channels (using fragment actor count as approximation)
872            for (key, channel_stats) in &mut channel_data {
873                let parts: Vec<&str> = key.split('_').collect();
874                if parts.len() == 2
875                    && let Ok(fragment_id) = parts[1].parse::<u32>()
876                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
877                {
878                    channel_stats.actor_count = fragment_stats.actor_count;
879                }
880            }
881
882            all.channel_stats = channel_data;
883
884            Ok(Json(all))
885        } else {
886            Err(err(anyhow!("Prometheus endpoint is not set")))
887        }
888    }
889
890    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
891        Ok(Json(risingwave_common::current_cluster_version()))
892    }
893}
894
895impl DashboardService {
896    pub async fn serve(self) -> Result<()> {
897        use handlers::*;
898        let srv = Arc::new(self);
899
900        let cors_layer = CorsLayer::new()
901            .allow_origin(cors::Any)
902            .allow_methods(vec![Method::GET]);
903
904        let api_router = Router::new()
905            .route("/version", get(get_version))
906            .route("/clusters/:ty", get(list_clusters))
907            .route("/streaming_jobs", get(list_streaming_jobs))
908            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
909            .route("/relation_id_infos", get(get_relation_id_infos))
910            .route(
911                "/fragment_to_relation_map",
912                get(get_fragment_to_relation_map),
913            )
914            .route("/views", get(list_views))
915            .route("/functions", get(list_functions))
916            .route("/materialized_views", get(list_materialized_views))
917            .route("/tables", get(list_tables))
918            .route("/indexes", get(list_index_tables))
919            .route("/index_items", get(list_indexes))
920            .route("/subscriptions", get(list_subscription))
921            .route("/internal_tables", get(list_internal_tables))
922            .route("/sources", get(list_sources))
923            .route("/sinks", get(list_sinks))
924            .route("/users", get(list_users))
925            .route("/databases", get(list_databases))
926            .route("/schemas", get(list_schemas))
927            .route("/object_dependencies", get(list_object_dependencies))
928            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
929            .route("/metrics/streaming_stats", get(get_streaming_stats))
930            .route(
931                "/metrics/streaming_stats_prometheus",
932                get(get_streaming_stats_from_prometheus),
933            )
934            // /monitor/await_tree/{worker_id}/?format={text or json}
935            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
936            // /monitor/await_tree/?format={text or json}
937            .route("/monitor/await_tree/", get(dump_await_tree_all))
938            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
939            .route(
940                "/monitor/list_heap_profile/:worker_id",
941                get(list_heap_profile),
942            )
943            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
944            // /monitor/diagnose/?format={text or json}
945            .route("/monitor/diagnose/", get(diagnose))
946            .layer(
947                ServiceBuilder::new()
948                    .layer(AddExtensionLayer::new(srv.clone()))
949                    .into_inner(),
950            )
951            .layer(cors_layer);
952
953        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
954        let dashboard_router = risingwave_meta_dashboard::router();
955
956        let app = Router::new()
957            .fallback_service(dashboard_router)
958            .nest("/api", api_router)
959            .nest("/trace", trace_ui_router)
960            .layer(CompressionLayer::new());
961
962        let listener = TcpListener::bind(&srv.dashboard_addr)
963            .await
964            .context("failed to bind dashboard address")?;
965        axum::serve(listener, app)
966            .await
967            .context("failed to serve dashboard service")?;
968
969        Ok(())
970    }
971}