risingwave_meta/dashboard/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_rpc_client::ComputeClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::hummock::HummockManagerRef;
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// Shared state of the meta dashboard HTTP server.
///
/// [`DashboardService::serve`] wraps an instance in an [`Arc`] and hands it to every API
/// handler as an axum extension.
#[derive(Clone)]
pub struct DashboardService {
    /// Await-tree registry passed to the cluster-wide await-tree dump.
    pub await_tree_reg: await_tree::Registry,
    /// Socket address the dashboard HTTP listener binds to.
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus query client.
    /// NOTE(review): not read in this file; presumably consumed by the `prometheus` submodule.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// NOTE(review): not read in this file; presumably a label selector used by the
    /// `prometheus` submodule — confirm there.
    pub prometheus_selector: String,
    /// Access to cluster metadata (catalogs, worker nodes, streaming job fragments).
    pub metadata_manager: MetadataManager,
    /// Source of the per-table version statistics attached to table listings.
    pub hummock_manager: HummockManagerRef,
    /// Client pool used to reach worker nodes for heap profiling and streaming stats.
    pub compute_clients: ComputeClientPool,
    /// Produces the textual report returned by the diagnose endpoint.
    pub diagnose_command: DiagnoseCommandRef,
    /// State backing the embedded trace UI mounted under `/trace`.
    pub trace_state: otlp_embedded::StateRef,
}
50
/// Reference-counted handle to the dashboard state, as extracted by handlers via `Extension`.
pub type Service = Arc<DashboardService>;
52
53pub(super) mod handlers {
54    use std::cmp::min;
55    use std::collections::HashMap;
56
57    use anyhow::Context;
58    use axum::Json;
59    use axum::extract::Query;
60    use futures::future::join_all;
61    use itertools::Itertools;
62    use risingwave_common::catalog::{FragmentTypeFlag, TableId};
63    use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
64    use risingwave_meta_model::WorkerId;
65    use risingwave_pb::catalog::table::TableType;
66    use risingwave_pb::catalog::{
67        Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
68    };
69    use risingwave_pb::common::{WorkerNode, WorkerType};
70    use risingwave_pb::hummock::TableStats;
71    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
72    use risingwave_pb::meta::{
73        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
74    };
75    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
76    use risingwave_pb::monitor_service::{
77        GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
78        StackTraceResponse,
79    };
80    use risingwave_pb::user::PbUserInfo;
81    use serde::{Deserialize, Serialize};
82    use serde_json::json;
83    use thiserror_ext::AsReport;
84
85    use super::*;
86    use crate::controller::fragment::StreamingJobInfo;
87    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
88
    /// A table catalog entry combined with its storage statistics, as returned by the
    /// table-listing endpoints.
    #[derive(Serialize)]
    pub struct TableWithStats {
        /// The table catalog; its fields are flattened into the enclosing serialized object.
        #[serde(flatten)]
        pub table: Table,
        /// Sum of `total_key_size` and `total_value_size`.
        pub total_size_bytes: i64,
        /// Copied from `TableStats::total_key_count`.
        pub total_key_count: i64,
        /// Copied from `TableStats::total_key_size`.
        pub total_key_size: i64,
        /// Copied from `TableStats::total_value_size`.
        pub total_value_size: i64,
        /// Copied from `TableStats::total_compressed_size`.
        pub compressed_size: u64,
    }
99
100    impl TableWithStats {
101        pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
102            match stats {
103                Some(stats) => Self {
104                    total_size_bytes: stats.total_key_size + stats.total_value_size,
105                    total_key_count: stats.total_key_count,
106                    total_key_size: stats.total_key_size,
107                    total_value_size: stats.total_value_size,
108                    compressed_size: stats.total_compressed_size,
109                    table,
110                },
111                None => Self {
112                    total_size_bytes: 0,
113                    total_key_count: 0,
114                    total_key_size: 0,
115                    total_value_size: 0,
116                    compressed_size: 0,
117                    table,
118                },
119            }
120        }
121    }
122
    /// Error type of all dashboard handlers; wraps an [`anyhow::Error`] and is rendered as a
    /// JSON body with HTTP status 500 by its `IntoResponse` implementation.
    pub struct DashboardError(anyhow::Error);
    /// Result alias used by the handlers in this module.
    pub type Result<T> = std::result::Result<T, DashboardError>;
125
126    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
127        DashboardError(err.into())
128    }
129
130    impl From<anyhow::Error> for DashboardError {
131        fn from(value: anyhow::Error) -> Self {
132            DashboardError(value)
133        }
134    }
135
136    impl IntoResponse for DashboardError {
137        fn into_response(self) -> axum::response::Response {
138            let mut resp = Json(json!({
139                "error": self.0.to_report_string(),
140            }))
141            .into_response();
142            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
143            resp
144        }
145    }
146
147    pub async fn list_clusters(
148        Path(ty): Path<i32>,
149        Extension(srv): Extension<Service>,
150    ) -> Result<Json<Vec<WorkerNode>>> {
151        let worker_type = WorkerType::try_from(ty)
152            .map_err(|_| anyhow!("invalid worker type"))
153            .map_err(err)?;
154        let mut result = srv
155            .metadata_manager
156            .list_worker_node(Some(worker_type), None)
157            .await
158            .map_err(err)?;
159        result.sort_unstable_by_key(|n| n.id);
160        Ok(result.into())
161    }
162
163    async fn list_table_catalogs_inner(
164        metadata_manager: &MetadataManager,
165        hummock_manager: &HummockManagerRef,
166        table_type: TableType,
167    ) -> Result<Json<Vec<TableWithStats>>> {
168        let tables = metadata_manager
169            .catalog_controller
170            .list_tables_by_type(table_type.into())
171            .await
172            .map_err(err)?;
173
174        // Get table statistics from hummock manager
175        let version_stats = hummock_manager.get_version_stats().await;
176
177        let tables_with_stats = tables
178            .into_iter()
179            .map(|table| {
180                let stats = version_stats.table_stats.get(&table.id);
181                TableWithStats::from_table_and_stats(table, stats)
182            })
183            .collect();
184
185        Ok(Json(tables_with_stats))
186    }
187
188    pub async fn list_materialized_views(
189        Extension(srv): Extension<Service>,
190    ) -> Result<Json<Vec<TableWithStats>>> {
191        list_table_catalogs_inner(
192            &srv.metadata_manager,
193            &srv.hummock_manager,
194            TableType::MaterializedView,
195        )
196        .await
197    }
198
199    pub async fn list_tables(
200        Extension(srv): Extension<Service>,
201    ) -> Result<Json<Vec<TableWithStats>>> {
202        list_table_catalogs_inner(
203            &srv.metadata_manager,
204            &srv.hummock_manager,
205            TableType::Table,
206        )
207        .await
208    }
209
210    pub async fn list_index_tables(
211        Extension(srv): Extension<Service>,
212    ) -> Result<Json<Vec<TableWithStats>>> {
213        list_table_catalogs_inner(
214            &srv.metadata_manager,
215            &srv.hummock_manager,
216            TableType::Index,
217        )
218        .await
219    }
220
221    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
222        let indexes = srv
223            .metadata_manager
224            .catalog_controller
225            .list_indexes()
226            .await
227            .map_err(err)?;
228
229        Ok(Json(indexes))
230    }
231
232    pub async fn list_subscription(
233        Extension(srv): Extension<Service>,
234    ) -> Result<Json<Vec<Subscription>>> {
235        let subscriptions = srv
236            .metadata_manager
237            .catalog_controller
238            .list_subscriptions()
239            .await
240            .map_err(err)?;
241
242        Ok(Json(subscriptions))
243    }
244
245    pub async fn list_internal_tables(
246        Extension(srv): Extension<Service>,
247    ) -> Result<Json<Vec<TableWithStats>>> {
248        list_table_catalogs_inner(
249            &srv.metadata_manager,
250            &srv.hummock_manager,
251            TableType::Internal,
252        )
253        .await
254    }
255
256    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
257        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
258
259        Ok(Json(sources))
260    }
261
262    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
263        let sinks = srv
264            .metadata_manager
265            .catalog_controller
266            .list_sinks()
267            .await
268            .map_err(err)?;
269
270        Ok(Json(sinks))
271    }
272
273    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
274        let views = srv
275            .metadata_manager
276            .catalog_controller
277            .list_views()
278            .await
279            .map_err(err)?;
280
281        Ok(Json(views))
282    }
283
284    pub async fn list_functions(
285        Extension(srv): Extension<Service>,
286    ) -> Result<Json<Vec<PbFunction>>> {
287        let functions = srv
288            .metadata_manager
289            .catalog_controller
290            .list_functions()
291            .await
292            .map_err(err)?;
293
294        Ok(Json(functions))
295    }
296
297    pub async fn list_streaming_jobs(
298        Extension(srv): Extension<Service>,
299    ) -> Result<Json<Vec<StreamingJobInfo>>> {
300        let streaming_jobs = srv
301            .metadata_manager
302            .catalog_controller
303            .list_streaming_job_infos()
304            .await
305            .map_err(err)?;
306
307        Ok(Json(streaming_jobs))
308    }
309
310    /// In the ddl backpressure graph, we want to compute the backpressure between relations.
311    /// So we need to know which are the fragments which are connected to external relations.
312    /// These fragments form the vertices of the graph.
313    /// We can get collection of backpressure values, keyed by vertex_id-vertex_id.
314    /// This function will return a map of fragment vertex id to relation id.
315    /// We can convert `fragment_id-fragment_id` to `relation_id-relation_id` using that.
316    /// Finally, we have a map of `relation_id-relation_id` to backpressure values.
317    pub async fn get_fragment_to_relation_map(
318        Extension(srv): Extension<Service>,
319    ) -> Result<Json<FragmentToRelationMap>> {
320        let table_fragments = srv
321            .metadata_manager
322            .catalog_controller
323            .table_fragments()
324            .await
325            .map_err(err)?;
326        let mut in_map = HashMap::new();
327        let mut out_map = HashMap::new();
328        for (relation_id, tf) in table_fragments {
329            for (fragment_id, fragment) in &tf.fragments {
330                if fragment
331                    .fragment_type_mask
332                    .contains(FragmentTypeFlag::StreamScan)
333                {
334                    in_map.insert(*fragment_id, relation_id as u32);
335                }
336                if fragment
337                    .fragment_type_mask
338                    .contains(FragmentTypeFlag::Mview)
339                {
340                    out_map.insert(*fragment_id, relation_id as u32);
341                }
342            }
343        }
344        let map = FragmentToRelationMap { in_map, out_map };
345        Ok(Json(map))
346    }
347
348    /// Provides a hierarchy of relation ids to fragments to actors.
349    pub async fn get_relation_id_infos(
350        Extension(srv): Extension<Service>,
351    ) -> Result<Json<RelationIdInfos>> {
352        let table_fragments = srv
353            .metadata_manager
354            .catalog_controller
355            .table_fragments()
356            .await
357            .map_err(err)?;
358        let mut map = HashMap::new();
359        for (id, tf) in table_fragments {
360            let mut fragment_id_to_actor_ids = HashMap::new();
361            for (fragment_id, fragment) in &tf.fragments {
362                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
363                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
364            }
365            map.insert(
366                id as u32,
367                FragmentIdToActorIdMap {
368                    map: fragment_id_to_actor_ids,
369                },
370            );
371        }
372        let relation_id_infos = RelationIdInfos { map };
373
374        Ok(Json(relation_id_infos))
375    }
376
377    pub async fn list_fragments_by_job_id(
378        Extension(srv): Extension<Service>,
379        Path(job_id): Path<u32>,
380    ) -> Result<Json<PbTableFragments>> {
381        let table_id = TableId::new(job_id);
382        let table_fragments = srv
383            .metadata_manager
384            .get_job_fragments_by_id(&table_id)
385            .await
386            .map_err(err)?;
387        let upstream_fragments = srv
388            .metadata_manager
389            .catalog_controller
390            .upstream_fragments(table_fragments.fragment_ids())
391            .await
392            .map_err(err)?;
393        let dispatchers = srv
394            .metadata_manager
395            .catalog_controller
396            .get_fragment_actor_dispatchers(
397                table_fragments.fragment_ids().map(|id| id as _).collect(),
398            )
399            .await
400            .map_err(err)?;
401        Ok(Json(
402            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
403        ))
404    }
405
406    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
407        let users = srv
408            .metadata_manager
409            .catalog_controller
410            .list_users()
411            .await
412            .map_err(err)?;
413
414        Ok(Json(users))
415    }
416
417    pub async fn list_databases(
418        Extension(srv): Extension<Service>,
419    ) -> Result<Json<Vec<PbDatabase>>> {
420        let databases = srv
421            .metadata_manager
422            .catalog_controller
423            .list_databases()
424            .await
425            .map_err(err)?;
426
427        Ok(Json(databases))
428    }
429
430    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
431        let schemas = srv
432            .metadata_manager
433            .catalog_controller
434            .list_schemas()
435            .await
436            .map_err(err)?;
437
438        Ok(Json(schemas))
439    }
440
441    pub async fn list_object_dependencies(
442        Extension(srv): Extension<Service>,
443    ) -> Result<Json<Vec<PbObjectDependencies>>> {
444        let object_dependencies = srv
445            .metadata_manager
446            .catalog_controller
447            .list_all_object_dependencies()
448            .await
449            .map_err(err)?;
450
451        Ok(Json(object_dependencies))
452    }
453
    /// Query parameters accepted by the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        /// Requested output format; falls back to `await_tree_default_format()` when the
        /// parameter is absent. Validated by [`AwaitTreeDumpParams::actor_traces_format`].
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
459
460    impl AwaitTreeDumpParams {
461        /// Parse the `format` parameter to [`ActorTracesFormat`].
462        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
463            Ok(match self.format.as_str() {
464                "text" => ActorTracesFormat::Text,
465                "json" => ActorTracesFormat::Json,
466                _ => {
467                    return Err(err(anyhow!(
468                        "Unsupported format `{}`, only `text` and `json` are supported for now",
469                        self.format
470                    )));
471                }
472            })
473        }
474    }
475
476    fn await_tree_default_format() -> String {
477        // In dashboard, await tree is usually for engineer to debug, so we use human-readable text format by default here.
478        "text".to_owned()
479    }
480
481    pub async fn dump_await_tree_all(
482        Query(params): Query<AwaitTreeDumpParams>,
483        Extension(srv): Extension<Service>,
484    ) -> Result<Json<StackTraceResponse>> {
485        let actor_traces_format = params.actor_traces_format()?;
486
487        let res = dump_cluster_await_tree(
488            &srv.metadata_manager,
489            &srv.await_tree_reg,
490            actor_traces_format,
491        )
492        .await
493        .map_err(err)?;
494
495        Ok(res.into())
496    }
497
498    pub async fn dump_await_tree(
499        Path(worker_id): Path<WorkerId>,
500        Query(params): Query<AwaitTreeDumpParams>,
501        Extension(srv): Extension<Service>,
502    ) -> Result<Json<StackTraceResponse>> {
503        let actor_traces_format = params.actor_traces_format()?;
504
505        let worker_node = srv
506            .metadata_manager
507            .get_worker_by_id(worker_id)
508            .await
509            .map_err(err)?
510            .context("worker node not found")
511            .map_err(err)?;
512
513        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
514            .await
515            .map_err(err)?;
516
517        Ok(res.into())
518    }
519
520    pub async fn heap_profile(
521        Path(worker_id): Path<WorkerId>,
522        Extension(srv): Extension<Service>,
523    ) -> Result<Json<HeapProfilingResponse>> {
524        let worker_node = srv
525            .metadata_manager
526            .get_worker_by_id(worker_id)
527            .await
528            .map_err(err)?
529            .context("worker node not found")
530            .map_err(err)?;
531
532        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
533
534        let result = client.heap_profile("".to_owned()).await.map_err(err)?;
535
536        Ok(result.into())
537    }
538
539    pub async fn list_heap_profile(
540        Path(worker_id): Path<WorkerId>,
541        Extension(srv): Extension<Service>,
542    ) -> Result<Json<ListHeapProfilingResponse>> {
543        let worker_node = srv
544            .metadata_manager
545            .get_worker_by_id(worker_id)
546            .await
547            .map_err(err)?
548            .context("worker node not found")
549            .map_err(err)?;
550
551        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
552
553        let result = client.list_heap_profile().await.map_err(err)?;
554        Ok(result.into())
555    }
556
557    pub async fn analyze_heap(
558        Path((worker_id, file_path)): Path<(WorkerId, String)>,
559        Extension(srv): Extension<Service>,
560    ) -> Result<Response> {
561        let file_path =
562            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
563
564        let file_name = FilePath::new(&file_path)
565            .file_name()
566            .unwrap()
567            .to_string_lossy()
568            .to_string();
569
570        let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
571
572        let worker_node = srv
573            .metadata_manager
574            .get_worker_by_id(worker_id)
575            .await
576            .map_err(err)?
577            .context("worker node not found")
578            .map_err(err)?;
579
580        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
581
582        let collapsed_bin = client
583            .analyze_heap(file_path.clone())
584            .await
585            .map_err(err)?
586            .result;
587        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
588
589        let response = Response::builder()
590            .header("Content-Type", "application/octet-stream")
591            .header("Content-Disposition", collapsed_file_name)
592            .body(collapsed_str.into());
593
594        response.map_err(err)
595    }
596
    /// Query parameters accepted by the diagnose endpoint.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        /// Requested actor traces format (`text` or `json`); falls back to
        /// `await_tree_default_format()` when the parameter is absent.
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
602
603    pub async fn diagnose(
604        Query(params): Query<DiagnoseParams>,
605        Extension(srv): Extension<Service>,
606    ) -> Result<String> {
607        let actor_traces_format = match params.actor_traces_format.as_str() {
608            "text" => ActorTracesFormat::Text,
609            "json" => ActorTracesFormat::Json,
610            _ => {
611                return Err(err(anyhow!(
612                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
613                    params.actor_traces_format
614                )));
615            }
616        };
617        Ok(srv.diagnose_command.report(actor_traces_format).await)
618    }
619
620    /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API,
621    /// the workload should be reasonable.
622    /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join).
623    /// In such a scenario, the number of edges is linear to the number of nodes.
624    /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
625    pub async fn get_streaming_stats(
626        Extension(srv): Extension<Service>,
627    ) -> Result<Json<GetStreamingStatsResponse>> {
628        let worker_nodes = srv
629            .metadata_manager
630            .list_active_streaming_compute_nodes()
631            .await
632            .map_err(err)?;
633
634        let mut futures = Vec::new();
635
636        for worker_node in worker_nodes {
637            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
638            let client = Arc::new(client);
639            let fut = async move {
640                let result = client.get_streaming_stats().await.map_err(err)?;
641                Ok::<_, DashboardError>(result)
642            };
643            futures.push(fut);
644        }
645        let results = join_all(futures).await;
646
647        let mut all = GetStreamingStatsResponse::default();
648
649        for result in results {
650            let result = result
651                .map_err(|_| anyhow!("Failed to get back pressure"))
652                .map_err(err)?;
653
654            // Aggregate fragment_stats
655            for (fragment_id, fragment_stats) in result.fragment_stats {
656                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
657                    s.actor_count += fragment_stats.actor_count;
658                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
659                } else {
660                    all.fragment_stats.insert(fragment_id, fragment_stats);
661                }
662            }
663
664            // Aggregate relation_stats
665            for (relation_id, relation_stats) in result.relation_stats {
666                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
667                    s.actor_count += relation_stats.actor_count;
668                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
669                } else {
670                    all.relation_stats.insert(relation_id, relation_stats);
671                }
672            }
673
674            // Aggregate channel_stats
675            for (key, channel_stats) in result.channel_stats {
676                if let Some(s) = all.channel_stats.get_mut(&key) {
677                    s.actor_count += channel_stats.actor_count;
678                    s.output_blocking_duration += channel_stats.output_blocking_duration;
679                    s.recv_row_count += channel_stats.recv_row_count;
680                    s.send_row_count += channel_stats.send_row_count;
681                } else {
682                    all.channel_stats.insert(key, channel_stats);
683                }
684            }
685        }
686
687        Ok(all.into())
688    }
689
690    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
691        Ok(Json(risingwave_common::current_cluster_version()))
692    }
693}
694
impl DashboardService {
    /// Builds the dashboard HTTP application and serves it on `dashboard_addr`.
    ///
    /// Layout of the application:
    /// - `/api/...`: the JSON handlers from [`handlers`], with this service attached as an
    ///   extension and CORS allowing `GET` from any origin;
    /// - `/trace`: the embedded trace UI from `otlp_embedded`;
    /// - anything else: falls back to `risingwave_meta_dashboard::router()`.
    ///
    /// Responses of the whole application pass through a compression layer.
    ///
    /// # Errors
    ///
    /// Returns an error if the listener cannot be bound or if serving fails.
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        // Only GET is allowed cross-origin; all API routes below are GET routes.
        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/:ty", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/functions", get(list_functions))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            // `/indexes` lists the index *tables* (with stats); `/index_items` lists the index
            // catalogs themselves.
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            // /monitor/await_tree/{worker_id}/?format={text or json}
            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
            // /monitor/await_tree/?format={text or json}
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/:worker_id",
                get(list_heap_profile),
            )
            // `*path` is the URL-safe base64 encoding of the heap profile file path.
            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
            // /monitor/diagnose/?actor_traces_format={text or json}
            .route("/monitor/diagnose/", get(diagnose))
            // Make the shared service available to handlers through `Extension<Service>`.
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        // Requests that match neither `/api` nor `/trace` fall back to the dashboard router.
        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}