risingwave_meta/dashboard/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use anyhow::{Context as _, Result, anyhow};
21use axum::Router;
22use axum::extract::{Extension, Path};
23use axum::http::{Method, StatusCode};
24use axum::response::{IntoResponse, Response};
25use axum::routing::get;
26use risingwave_rpc_client::ComputeClientPool;
27use tokio::net::TcpListener;
28use tower::ServiceBuilder;
29use tower_http::add_extension::AddExtensionLayer;
30use tower_http::compression::CompressionLayer;
31use tower_http::cors::{self, CorsLayer};
32
33use crate::hummock::HummockManagerRef;
34use crate::manager::MetadataManager;
35use crate::manager::diagnose::DiagnoseCommandRef;
36
37#[derive(Clone)]
38pub struct DashboardService {
39    pub await_tree_reg: await_tree::Registry,
40    pub dashboard_addr: SocketAddr,
41    pub prometheus_client: Option<prometheus_http_query::Client>,
42    pub prometheus_selector: String,
43    pub metadata_manager: MetadataManager,
44    pub hummock_manager: HummockManagerRef,
45    pub compute_clients: ComputeClientPool,
46    pub diagnose_command: DiagnoseCommandRef,
47    pub trace_state: otlp_embedded::StateRef,
48}
49
50pub type Service = Arc<DashboardService>;
51
52pub(super) mod handlers {
53    use std::cmp::min;
54    use std::collections::HashMap;
55
56    use anyhow::Context;
57    use axum::Json;
58    use axum::extract::Query;
59    use futures::future::join_all;
60    use itertools::Itertools;
61    use risingwave_common::id::JobId;
62    use risingwave_meta_model::WorkerId;
63    use risingwave_pb::catalog::table::TableType;
64    use risingwave_pb::catalog::{
65        Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
66    };
67    use risingwave_pb::common::{WorkerNode, WorkerType};
68    use risingwave_pb::hummock::TableStats;
69    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
70    use risingwave_pb::meta::{
71        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
72    };
73    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
74    use risingwave_pb::monitor_service::{
75        ChannelDeltaStats, GetStreamingPrometheusStatsResponse, GetStreamingStatsResponse,
76        HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse,
77    };
78    use risingwave_pb::user::PbUserInfo;
79    use serde::{Deserialize, Serialize};
80    use serde_json::json;
81    use thiserror_ext::AsReport;
82
83    use super::*;
84    use crate::controller::fragment::StreamingJobInfo;
85    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
86
87    #[derive(Serialize)]
88    pub struct TableWithStats {
89        #[serde(flatten)]
90        pub table: Table,
91        pub total_size_bytes: i64,
92        pub total_key_count: i64,
93        pub total_key_size: i64,
94        pub total_value_size: i64,
95        pub compressed_size: u64,
96    }
97
98    impl TableWithStats {
99        pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
100            match stats {
101                Some(stats) => Self {
102                    total_size_bytes: stats.total_key_size + stats.total_value_size,
103                    total_key_count: stats.total_key_count,
104                    total_key_size: stats.total_key_size,
105                    total_value_size: stats.total_value_size,
106                    compressed_size: stats.total_compressed_size,
107                    table,
108                },
109                None => Self {
110                    total_size_bytes: 0,
111                    total_key_count: 0,
112                    total_key_size: 0,
113                    total_value_size: 0,
114                    compressed_size: 0,
115                    table,
116                },
117            }
118        }
119    }
120
121    pub struct DashboardError(anyhow::Error);
122    pub type Result<T> = std::result::Result<T, DashboardError>;
123
124    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
125        DashboardError(err.into())
126    }
127
128    impl From<anyhow::Error> for DashboardError {
129        fn from(value: anyhow::Error) -> Self {
130            DashboardError(value)
131        }
132    }
133
134    impl IntoResponse for DashboardError {
135        fn into_response(self) -> axum::response::Response {
136            let mut resp = Json(json!({
137                "error": self.0.to_report_string(),
138            }))
139            .into_response();
140            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
141            resp
142        }
143    }
144
145    pub async fn list_clusters(
146        Path(ty): Path<i32>,
147        Extension(srv): Extension<Service>,
148    ) -> Result<Json<Vec<WorkerNode>>> {
149        let worker_type = WorkerType::try_from(ty)
150            .map_err(|_| anyhow!("invalid worker type"))
151            .map_err(err)?;
152        let mut result = srv
153            .metadata_manager
154            .list_worker_node(Some(worker_type), None)
155            .await
156            .map_err(err)?;
157        result.sort_unstable_by_key(|n| n.id);
158        Ok(result.into())
159    }
160
161    async fn list_table_catalogs_inner(
162        metadata_manager: &MetadataManager,
163        hummock_manager: &HummockManagerRef,
164        table_type: TableType,
165    ) -> Result<Json<Vec<TableWithStats>>> {
166        let tables = metadata_manager
167            .catalog_controller
168            .list_tables_by_type(table_type.into())
169            .await
170            .map_err(err)?;
171
172        // Get table statistics from hummock manager
173        let version_stats = hummock_manager.get_version_stats().await;
174
175        let tables_with_stats = tables
176            .into_iter()
177            .map(|table| {
178                let stats = version_stats.table_stats.get(&table.id);
179                TableWithStats::from_table_and_stats(table, stats)
180            })
181            .collect();
182
183        Ok(Json(tables_with_stats))
184    }
185
186    pub async fn list_materialized_views(
187        Extension(srv): Extension<Service>,
188    ) -> Result<Json<Vec<TableWithStats>>> {
189        list_table_catalogs_inner(
190            &srv.metadata_manager,
191            &srv.hummock_manager,
192            TableType::MaterializedView,
193        )
194        .await
195    }
196
197    pub async fn list_tables(
198        Extension(srv): Extension<Service>,
199    ) -> Result<Json<Vec<TableWithStats>>> {
200        list_table_catalogs_inner(
201            &srv.metadata_manager,
202            &srv.hummock_manager,
203            TableType::Table,
204        )
205        .await
206    }
207
208    pub async fn list_index_tables(
209        Extension(srv): Extension<Service>,
210    ) -> Result<Json<Vec<TableWithStats>>> {
211        list_table_catalogs_inner(
212            &srv.metadata_manager,
213            &srv.hummock_manager,
214            TableType::Index,
215        )
216        .await
217    }
218
219    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
220        let indexes = srv
221            .metadata_manager
222            .catalog_controller
223            .list_indexes()
224            .await
225            .map_err(err)?;
226
227        Ok(Json(indexes))
228    }
229
230    pub async fn list_subscription(
231        Extension(srv): Extension<Service>,
232    ) -> Result<Json<Vec<Subscription>>> {
233        let subscriptions = srv
234            .metadata_manager
235            .catalog_controller
236            .list_subscriptions()
237            .await
238            .map_err(err)?;
239
240        Ok(Json(subscriptions))
241    }
242
243    pub async fn list_internal_tables(
244        Extension(srv): Extension<Service>,
245    ) -> Result<Json<Vec<TableWithStats>>> {
246        list_table_catalogs_inner(
247            &srv.metadata_manager,
248            &srv.hummock_manager,
249            TableType::Internal,
250        )
251        .await
252    }
253
254    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
255        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
256
257        Ok(Json(sources))
258    }
259
260    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
261        let sinks = srv
262            .metadata_manager
263            .catalog_controller
264            .list_sinks()
265            .await
266            .map_err(err)?;
267
268        Ok(Json(sinks))
269    }
270
271    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
272        let views = srv
273            .metadata_manager
274            .catalog_controller
275            .list_views()
276            .await
277            .map_err(err)?;
278
279        Ok(Json(views))
280    }
281
282    pub async fn list_functions(
283        Extension(srv): Extension<Service>,
284    ) -> Result<Json<Vec<PbFunction>>> {
285        let functions = srv
286            .metadata_manager
287            .catalog_controller
288            .list_functions()
289            .await
290            .map_err(err)?;
291
292        Ok(Json(functions))
293    }
294
295    pub async fn list_streaming_jobs(
296        Extension(srv): Extension<Service>,
297    ) -> Result<Json<Vec<StreamingJobInfo>>> {
298        let streaming_jobs = srv
299            .metadata_manager
300            .catalog_controller
301            .list_streaming_job_infos()
302            .await
303            .map_err(err)?;
304
305        Ok(Json(streaming_jobs))
306    }
307
308    /// In the ddl backpressure graph, we want to compute the backpressure between relations.
309    /// So we need to know which are the fragments which are connected to external relations.
310    /// These fragments form the vertices of the graph.
311    /// We can get collection of backpressure values, keyed by vertex_id-vertex_id.
312    /// This function will return a map of fragment vertex id to relation id.
313    /// We can convert `fragment_id-fragment_id` to `relation_id-relation_id` using that.
314    /// Finally, we have a map of `relation_id-relation_id` to backpressure values.
315    pub async fn get_fragment_to_relation_map(
316        Extension(srv): Extension<Service>,
317    ) -> Result<Json<FragmentToRelationMap>> {
318        let table_fragments = srv
319            .metadata_manager
320            .catalog_controller
321            .table_fragments()
322            .await
323            .map_err(err)?;
324        let mut fragment_to_relation_map = HashMap::new();
325        for (relation_id, tf) in table_fragments {
326            for fragment_id in tf.fragments.keys() {
327                fragment_to_relation_map.insert(*fragment_id, relation_id.as_raw_id());
328            }
329        }
330        let map = FragmentToRelationMap {
331            fragment_to_relation_map,
332        };
333        Ok(Json(map))
334    }
335
336    /// Provides a hierarchy of relation ids to fragments to actors.
337    pub async fn get_relation_id_infos(
338        Extension(srv): Extension<Service>,
339    ) -> Result<Json<RelationIdInfos>> {
340        let table_fragments = srv
341            .metadata_manager
342            .catalog_controller
343            .table_fragments()
344            .await
345            .map_err(err)?;
346        let mut map = HashMap::new();
347        for (id, tf) in table_fragments {
348            let mut fragment_id_to_actor_ids = HashMap::new();
349            for (fragment_id, fragment) in &tf.fragments {
350                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
351                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
352            }
353            map.insert(
354                id.as_raw_id(),
355                FragmentIdToActorIdMap {
356                    map: fragment_id_to_actor_ids,
357                },
358            );
359        }
360        let relation_id_infos = RelationIdInfos { map };
361
362        Ok(Json(relation_id_infos))
363    }
364
365    pub async fn list_fragments_by_job_id(
366        Extension(srv): Extension<Service>,
367        Path(job_id): Path<u32>,
368    ) -> Result<Json<PbTableFragments>> {
369        let job_id = JobId::new(job_id);
370        let table_fragments = srv
371            .metadata_manager
372            .get_job_fragments_by_id(job_id)
373            .await
374            .map_err(err)?;
375        let upstream_fragments = srv
376            .metadata_manager
377            .catalog_controller
378            .upstream_fragments(table_fragments.fragment_ids())
379            .await
380            .map_err(err)?;
381        let dispatchers = srv
382            .metadata_manager
383            .catalog_controller
384            .get_fragment_actor_dispatchers(
385                table_fragments.fragment_ids().map(|id| id as _).collect(),
386            )
387            .await
388            .map_err(err)?;
389        Ok(Json(
390            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
391        ))
392    }
393
394    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
395        let users = srv
396            .metadata_manager
397            .catalog_controller
398            .list_users()
399            .await
400            .map_err(err)?;
401
402        Ok(Json(users))
403    }
404
405    pub async fn list_databases(
406        Extension(srv): Extension<Service>,
407    ) -> Result<Json<Vec<PbDatabase>>> {
408        let databases = srv
409            .metadata_manager
410            .catalog_controller
411            .list_databases()
412            .await
413            .map_err(err)?;
414
415        Ok(Json(databases))
416    }
417
418    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
419        let schemas = srv
420            .metadata_manager
421            .catalog_controller
422            .list_schemas()
423            .await
424            .map_err(err)?;
425
426        Ok(Json(schemas))
427    }
428
429    pub async fn list_object_dependencies(
430        Extension(srv): Extension<Service>,
431    ) -> Result<Json<Vec<PbObjectDependencies>>> {
432        let object_dependencies = srv
433            .metadata_manager
434            .catalog_controller
435            .list_all_object_dependencies()
436            .await
437            .map_err(err)?;
438
439        Ok(Json(object_dependencies))
440    }
441
442    #[derive(Debug, Deserialize)]
443    pub struct AwaitTreeDumpParams {
444        #[serde(default = "await_tree_default_format")]
445        format: String,
446    }
447
448    impl AwaitTreeDumpParams {
449        /// Parse the `format` parameter to [`ActorTracesFormat`].
450        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
451            Ok(match self.format.as_str() {
452                "text" => ActorTracesFormat::Text,
453                "json" => ActorTracesFormat::Json,
454                _ => {
455                    return Err(err(anyhow!(
456                        "Unsupported format `{}`, only `text` and `json` are supported for now",
457                        self.format
458                    )));
459                }
460            })
461        }
462    }
463
464    fn await_tree_default_format() -> String {
465        // In dashboard, await tree is usually for engineer to debug, so we use human-readable text format by default here.
466        "text".to_owned()
467    }
468
469    pub async fn dump_await_tree_all(
470        Query(params): Query<AwaitTreeDumpParams>,
471        Extension(srv): Extension<Service>,
472    ) -> Result<Json<StackTraceResponse>> {
473        let actor_traces_format = params.actor_traces_format()?;
474
475        let res = dump_cluster_await_tree(
476            &srv.metadata_manager,
477            &srv.await_tree_reg,
478            actor_traces_format,
479        )
480        .await
481        .map_err(err)?;
482
483        Ok(res.into())
484    }
485
486    pub async fn dump_await_tree(
487        Path(worker_id): Path<WorkerId>,
488        Query(params): Query<AwaitTreeDumpParams>,
489        Extension(srv): Extension<Service>,
490    ) -> Result<Json<StackTraceResponse>> {
491        let actor_traces_format = params.actor_traces_format()?;
492
493        let worker_node = srv
494            .metadata_manager
495            .get_worker_by_id(worker_id)
496            .await
497            .map_err(err)?
498            .context("worker node not found")
499            .map_err(err)?;
500
501        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
502            .await
503            .map_err(err)?;
504
505        Ok(res.into())
506    }
507
508    pub async fn heap_profile(
509        Path(worker_id): Path<WorkerId>,
510        Extension(srv): Extension<Service>,
511    ) -> Result<Json<HeapProfilingResponse>> {
512        let worker_node = srv
513            .metadata_manager
514            .get_worker_by_id(worker_id)
515            .await
516            .map_err(err)?
517            .context("worker node not found")
518            .map_err(err)?;
519
520        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
521
522        let result = client.heap_profile("".to_owned()).await.map_err(err)?;
523
524        Ok(result.into())
525    }
526
527    pub async fn list_heap_profile(
528        Path(worker_id): Path<WorkerId>,
529        Extension(srv): Extension<Service>,
530    ) -> Result<Json<ListHeapProfilingResponse>> {
531        let worker_node = srv
532            .metadata_manager
533            .get_worker_by_id(worker_id)
534            .await
535            .map_err(err)?
536            .context("worker node not found")
537            .map_err(err)?;
538
539        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
540
541        let result = client.list_heap_profile().await.map_err(err)?;
542        Ok(result.into())
543    }
544
545    pub async fn analyze_heap(
546        Path((worker_id, file_path)): Path<(WorkerId, String)>,
547        Extension(srv): Extension<Service>,
548    ) -> Result<Response> {
549        let file_path =
550            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
551
552        let worker_node = srv
553            .metadata_manager
554            .get_worker_by_id(worker_id)
555            .await
556            .map_err(err)?
557            .context("worker node not found")
558            .map_err(err)?;
559
560        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
561
562        let collapsed_bin = client
563            .analyze_heap(file_path.clone())
564            .await
565            .map_err(err)?
566            .result;
567        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
568
569        let response = Response::builder()
570            .header("Content-Type", "application/octet-stream")
571            .body(collapsed_str.into());
572
573        response.map_err(err)
574    }
575
576    #[derive(Debug, Deserialize)]
577    pub struct DiagnoseParams {
578        #[serde(default = "await_tree_default_format")]
579        actor_traces_format: String,
580    }
581
582    pub async fn diagnose(
583        Query(params): Query<DiagnoseParams>,
584        Extension(srv): Extension<Service>,
585    ) -> Result<String> {
586        let actor_traces_format = match params.actor_traces_format.as_str() {
587            "text" => ActorTracesFormat::Text,
588            "json" => ActorTracesFormat::Json,
589            _ => {
590                return Err(err(anyhow!(
591                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
592                    params.actor_traces_format
593                )));
594            }
595        };
596        Ok(srv.diagnose_command.report(actor_traces_format).await)
597    }
598
599    /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API,
600    /// the workload should be reasonable.
601    /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join).
602    /// In such a scenario, the number of edges is linear to the number of nodes.
603    /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
604    pub async fn get_streaming_stats(
605        Extension(srv): Extension<Service>,
606    ) -> Result<Json<GetStreamingStatsResponse>> {
607        let worker_nodes = srv
608            .metadata_manager
609            .list_active_streaming_compute_nodes()
610            .await
611            .map_err(err)?;
612
613        let mut futures = Vec::new();
614
615        for worker_node in worker_nodes {
616            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
617            let client = Arc::new(client);
618            let fut = async move {
619                let result = client.get_streaming_stats().await.map_err(err)?;
620                Ok::<_, DashboardError>(result)
621            };
622            futures.push(fut);
623        }
624        let results = join_all(futures).await;
625
626        let mut all = GetStreamingStatsResponse::default();
627
628        for result in results {
629            let result = result
630                .map_err(|_| anyhow!("Failed to get back pressure"))
631                .map_err(err)?;
632
633            // Aggregate fragment_stats
634            for (fragment_id, fragment_stats) in result.fragment_stats {
635                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
636                    s.actor_count += fragment_stats.actor_count;
637                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
638                } else {
639                    all.fragment_stats.insert(fragment_id, fragment_stats);
640                }
641            }
642
643            // Aggregate relation_stats
644            for (relation_id, relation_stats) in result.relation_stats {
645                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
646                    s.actor_count += relation_stats.actor_count;
647                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
648                } else {
649                    all.relation_stats.insert(relation_id, relation_stats);
650                }
651            }
652
653            // Aggregate channel_stats
654            for (key, channel_stats) in result.channel_stats {
655                if let Some(s) = all.channel_stats.get_mut(&key) {
656                    s.actor_count += channel_stats.actor_count;
657                    s.output_blocking_duration += channel_stats.output_blocking_duration;
658                    s.recv_row_count += channel_stats.recv_row_count;
659                    s.send_row_count += channel_stats.send_row_count;
660                } else {
661                    all.channel_stats.insert(key, channel_stats);
662                }
663            }
664        }
665
666        Ok(all.into())
667    }
668
669    #[derive(Debug, Deserialize)]
670    pub struct StreamingStatsPrometheusParams {
671        /// Unix timestamp in seconds for the evaluation time. If not set, defaults to current Prometheus server time.
672        #[serde(default)]
673        at: Option<i64>,
674        /// Time offset for throughput and backpressure rate calculation in seconds. If not set, defaults to 60s.
675        #[serde(default = "streaming_stats_default_time_offset")]
676        time_offset: i64,
677    }
678
679    fn streaming_stats_default_time_offset() -> i64 {
680        60
681    }
682
683    pub async fn get_streaming_stats_from_prometheus(
684        Query(params): Query<StreamingStatsPrometheusParams>,
685        Extension(srv): Extension<Service>,
686    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
687        let mut all = GetStreamingPrometheusStatsResponse::default();
688
689        // Get fragment and relation stats from workers
690        let worker_nodes = srv
691            .metadata_manager
692            .list_active_streaming_compute_nodes()
693            .await
694            .map_err(err)?;
695
696        let mut futures = Vec::new();
697
698        for worker_node in worker_nodes {
699            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
700            let client = Arc::new(client);
701            let fut = async move {
702                let result = client.get_streaming_stats().await.map_err(err)?;
703                Ok::<_, DashboardError>(result)
704            };
705            futures.push(fut);
706        }
707        let results = join_all(futures).await;
708
709        for result in results {
710            let result = result
711                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
712                .map_err(err)?;
713
714            // Aggregate fragment_stats
715            for (fragment_id, fragment_stats) in result.fragment_stats {
716                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
717                    s.actor_count += fragment_stats.actor_count;
718                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
719                } else {
720                    all.fragment_stats.insert(fragment_id, fragment_stats);
721                }
722            }
723
724            // Aggregate relation_stats
725            for (relation_id, relation_stats) in result.relation_stats {
726                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
727                    s.actor_count += relation_stats.actor_count;
728                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
729                } else {
730                    all.relation_stats.insert(relation_id, relation_stats);
731                }
732            }
733        }
734
735        // Get channel delta stats from Prometheus
736        if let Some(ref client) = srv.prometheus_client {
737            // Query channel delta stats: throughput and backpressure rate
738            let channel_input_throughput_query = format!(
739                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
740                srv.prometheus_selector, params.time_offset
741            );
742            let channel_output_throughput_query = format!(
743                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
744                srv.prometheus_selector, params.time_offset
745            );
746            let channel_backpressure_query = format!(
747                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
748                 / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
749                srv.prometheus_selector, params.time_offset
750            );
751
752            // Execute all queries concurrently with optional time parameter
753            let (
754                channel_input_throughput_result,
755                channel_output_throughput_result,
756                channel_backpressure_result,
757            ) = {
758                let mut input_query = client.query(channel_input_throughput_query);
759                let mut output_query = client.query(channel_output_throughput_query);
760                let mut backpressure_query = client.query(channel_backpressure_query);
761
762                // Set the evaluation time if provided
763                if let Some(at_time) = params.at {
764                    input_query = input_query.at(at_time);
765                    output_query = output_query.at(at_time);
766                    backpressure_query = backpressure_query.at(at_time);
767                }
768
769                tokio::try_join!(
770                    input_query.get(),
771                    output_query.get(),
772                    backpressure_query.get(),
773                )
774                .map_err(err)?
775            };
776
777            // Process channel delta stats
778            let mut channel_data = HashMap::new();
779
780            // Collect input throughput
781            if let Some(channel_input_throughput_data) =
782                channel_input_throughput_result.data().as_vector()
783            {
784                for sample in channel_input_throughput_data {
785                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
786                        && let Some(upstream_fragment_id_str) =
787                            sample.metric().get("upstream_fragment_id")
788                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
789                            fragment_id_str.parse::<u32>(),
790                            upstream_fragment_id_str.parse::<u32>(),
791                        )
792                    {
793                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
794                        channel_data
795                            .entry(key)
796                            .or_insert_with(|| ChannelDeltaStats {
797                                actor_count: 0,
798                                backpressure_rate: 0.0,
799                                recv_throughput: 0.0,
800                                send_throughput: 0.0,
801                            })
802                            .recv_throughput = sample.sample().value();
803                    }
804                }
805            }
806
807            // Collect output throughput
808            if let Some(channel_output_throughput_data) =
809                channel_output_throughput_result.data().as_vector()
810            {
811                for sample in channel_output_throughput_data {
812                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
813                        && let Some(upstream_fragment_id_str) =
814                            sample.metric().get("upstream_fragment_id")
815                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
816                            fragment_id_str.parse::<u32>(),
817                            upstream_fragment_id_str.parse::<u32>(),
818                        )
819                    {
820                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
821                        channel_data
822                            .entry(key)
823                            .or_insert_with(|| ChannelDeltaStats {
824                                actor_count: 0,
825                                backpressure_rate: 0.0,
826                                recv_throughput: 0.0,
827                                send_throughput: 0.0,
828                            })
829                            .send_throughput = sample.sample().value();
830                    }
831                }
832            }
833
834            // Collect backpressure rate
835            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
836            {
837                for sample in channel_backpressure_data {
838                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
839                        && let Some(downstream_fragment_id_str) =
840                            sample.metric().get("downstream_fragment_id")
841                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
842                            fragment_id_str.parse::<u32>(),
843                            downstream_fragment_id_str.parse::<u32>(),
844                        )
845                    {
846                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
847                        channel_data
848                            .entry(key)
849                            .or_insert_with(|| ChannelDeltaStats {
850                                actor_count: 0,
851                                backpressure_rate: 0.0,
852                                recv_throughput: 0.0,
853                                send_throughput: 0.0,
854                            })
855                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0; // Convert ns to seconds
856                    }
857                }
858            }
859
860            // Set actor count for channels (using fragment actor count as approximation)
861            for (key, channel_stats) in &mut channel_data {
862                let parts: Vec<&str> = key.split('_').collect();
863                if parts.len() == 2
864                    && let Ok(fragment_id) = parts[1].parse::<u32>()
865                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
866                {
867                    channel_stats.actor_count = fragment_stats.actor_count;
868                }
869            }
870
871            all.channel_stats = channel_data;
872
873            Ok(Json(all))
874        } else {
875            Err(err(anyhow!("Prometheus endpoint is not set")))
876        }
877    }
878
879    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
880        Ok(Json(risingwave_common::current_cluster_version()))
881    }
882}
883
884impl DashboardService {
885    pub async fn serve(self) -> Result<()> {
886        use handlers::*;
887        let srv = Arc::new(self);
888
889        let cors_layer = CorsLayer::new()
890            .allow_origin(cors::Any)
891            .allow_methods(vec![Method::GET]);
892
893        let api_router = Router::new()
894            .route("/version", get(get_version))
895            .route("/clusters/:ty", get(list_clusters))
896            .route("/streaming_jobs", get(list_streaming_jobs))
897            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
898            .route("/relation_id_infos", get(get_relation_id_infos))
899            .route(
900                "/fragment_to_relation_map",
901                get(get_fragment_to_relation_map),
902            )
903            .route("/views", get(list_views))
904            .route("/functions", get(list_functions))
905            .route("/materialized_views", get(list_materialized_views))
906            .route("/tables", get(list_tables))
907            .route("/indexes", get(list_index_tables))
908            .route("/index_items", get(list_indexes))
909            .route("/subscriptions", get(list_subscription))
910            .route("/internal_tables", get(list_internal_tables))
911            .route("/sources", get(list_sources))
912            .route("/sinks", get(list_sinks))
913            .route("/users", get(list_users))
914            .route("/databases", get(list_databases))
915            .route("/schemas", get(list_schemas))
916            .route("/object_dependencies", get(list_object_dependencies))
917            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
918            .route("/metrics/streaming_stats", get(get_streaming_stats))
919            .route(
920                "/metrics/streaming_stats_prometheus",
921                get(get_streaming_stats_from_prometheus),
922            )
923            // /monitor/await_tree/{worker_id}/?format={text or json}
924            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
925            // /monitor/await_tree/?format={text or json}
926            .route("/monitor/await_tree/", get(dump_await_tree_all))
927            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
928            .route(
929                "/monitor/list_heap_profile/:worker_id",
930                get(list_heap_profile),
931            )
932            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
933            // /monitor/diagnose/?format={text or json}
934            .route("/monitor/diagnose/", get(diagnose))
935            .layer(
936                ServiceBuilder::new()
937                    .layer(AddExtensionLayer::new(srv.clone()))
938                    .into_inner(),
939            )
940            .layer(cors_layer);
941
942        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
943        let dashboard_router = risingwave_meta_dashboard::router();
944
945        let app = Router::new()
946            .fallback_service(dashboard_router)
947            .nest("/api", api_router)
948            .nest("/trace", trace_ui_router)
949            .layer(CompressionLayer::new());
950
951        let listener = TcpListener::bind(&srv.dashboard_addr)
952            .await
953            .context("failed to bind dashboard address")?;
954        axum::serve(listener, app)
955            .await
956            .context("failed to serve dashboard service")?;
957
958        Ok(())
959    }
960}