risingwave_meta/dashboard/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod prometheus;

use std::net::SocketAddr;
use std::path::Path as FilePath;
use std::sync::Arc;

use anyhow::{Context as _, Result, anyhow};
use axum::Router;
use axum::extract::{Extension, Path};
use axum::http::{Method, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use risingwave_rpc_client::ComputeClientPool;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::add_extension::AddExtensionLayer;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{self, CorsLayer};

use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::diagnose::DiagnoseCommandRef;

#[derive(Clone)]
pub struct DashboardService {
    pub await_tree_reg: await_tree::Registry,
    pub dashboard_addr: SocketAddr,
    pub prometheus_client: Option<prometheus_http_query::Client>,
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub hummock_manager: HummockManagerRef,
    pub compute_clients: ComputeClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    pub trace_state: otlp_embedded::StateRef,
}

pub type Service = Arc<DashboardService>;

pub(super) mod handlers {
    use std::cmp::min;
    use std::collections::HashMap;

    use anyhow::Context;
    use axum::Json;
    use axum::extract::Query;
    use futures::future::join_all;
    use itertools::Itertools;
    use risingwave_common::catalog::{FragmentTypeFlag, TableId};
    use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
    use risingwave_meta_model::WorkerId;
    use risingwave_pb::catalog::table::TableType;
    use risingwave_pb::catalog::{
        Index, PbDatabase, PbSchema, Sink, Source, Subscription, Table, View,
    };
    use risingwave_pb::common::{WorkerNode, WorkerType};
    use risingwave_pb::hummock::TableStats;
    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
    use risingwave_pb::meta::{
        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
    };
    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
    use risingwave_pb::monitor_service::{
        GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
        StackTraceResponse,
    };
    use risingwave_pb::user::PbUserInfo;
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use thiserror_ext::AsReport;

    use super::*;
    use crate::controller::fragment::StreamingJobInfo;
    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};

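    /// A catalog [`Table`] combined with its storage statistics from the Hummock version stats.
    /// With `#[serde(flatten)]`, the table fields and the stats fields are serialized into a
    /// single flat JSON object; tables without stats report zeroed statistics.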
    #[derive(Serialize)]
    pub struct TableWithStats {
        #[serde(flatten)]
        pub table: Table,
        pub total_size_bytes: i64,
        pub total_key_count: i64,
        pub total_key_size: i64,
        pub total_value_size: i64,
        pub compressed_size: u64,
    }

    impl TableWithStats {
        pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
            match stats {
                Some(stats) => Self {
                    total_size_bytes: stats.total_key_size + stats.total_value_size,
                    total_key_count: stats.total_key_count,
                    total_key_size: stats.total_key_size,
                    total_value_size: stats.total_value_size,
                    compressed_size: stats.total_compressed_size,
                    table,
                },
                None => Self {
                    total_size_bytes: 0,
                    total_key_count: 0,
                    total_key_size: 0,
                    total_value_size: 0,
                    compressed_size: 0,
                    table,
                },
            }
        }
    }

    pub struct DashboardError(anyhow::Error);
    pub type Result<T> = std::result::Result<T, DashboardError>;

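    /// Wraps any error into a [`DashboardError`]. Handlers below typically use it as
    /// `.map_err(err)?` on fallible calls, so failures surface as HTTP 500 responses
    /// with a JSON `error` field.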
    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
        DashboardError(err.into())
    }

    impl From<anyhow::Error> for DashboardError {
        fn from(value: anyhow::Error) -> Self {
            DashboardError(value)
        }
    }

    impl IntoResponse for DashboardError {
        fn into_response(self) -> axum::response::Response {
            let mut resp = Json(json!({
                "error": self.0.to_report_string(),
            }))
            .into_response();
            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
            resp
        }
    }

    pub async fn list_clusters(
        Path(ty): Path<i32>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<WorkerNode>>> {
        let worker_type = WorkerType::try_from(ty)
            .map_err(|_| anyhow!("invalid worker type"))
            .map_err(err)?;
        let mut result = srv
            .metadata_manager
            .list_worker_node(Some(worker_type), None)
            .await
            .map_err(err)?;
        result.sort_unstable_by_key(|n| n.id);
        Ok(result.into())
    }

    async fn list_table_catalogs_inner(
        metadata_manager: &MetadataManager,
        hummock_manager: &HummockManagerRef,
        table_type: TableType,
    ) -> Result<Json<Vec<TableWithStats>>> {
        let tables = metadata_manager
            .catalog_controller
            .list_tables_by_type(table_type.into())
            .await
            .map_err(err)?;

        // Get table statistics from hummock manager
        let version_stats = hummock_manager.get_version_stats().await;

        let tables_with_stats = tables
            .into_iter()
            .map(|table| {
                let stats = version_stats.table_stats.get(&table.id);
                TableWithStats::from_table_and_stats(table, stats)
            })
            .collect();

        Ok(Json(tables_with_stats))
    }

    pub async fn list_materialized_views(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::MaterializedView,
        )
        .await
    }

    pub async fn list_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Table,
        )
        .await
    }

    pub async fn list_index_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Index,
        )
        .await
    }

    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
        let indexes = srv
            .metadata_manager
            .catalog_controller
            .list_indexes()
            .await
            .map_err(err)?;

        Ok(Json(indexes))
    }

    pub async fn list_subscription(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<Subscription>>> {
        let subscriptions = srv
            .metadata_manager
            .catalog_controller
            .list_subscriptions()
            .await
            .map_err(err)?;

        Ok(Json(subscriptions))
    }

    pub async fn list_internal_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Internal,
        )
        .await
    }

    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;

        Ok(Json(sources))
    }

    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
        let sinks = srv
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await
            .map_err(err)?;

        Ok(Json(sinks))
    }

    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
        let views = srv
            .metadata_manager
            .catalog_controller
            .list_views()
            .await
            .map_err(err)?;

        Ok(Json(views))
    }

    pub async fn list_streaming_jobs(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<StreamingJobInfo>>> {
        let streaming_jobs = srv
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await
            .map_err(err)?;

        Ok(Json(streaming_jobs))
    }

    /// In the DDL backpressure graph, we want to compute the backpressure between relations,
    /// so we need to know which fragments are connected to external relations.
    /// These fragments form the vertices of the graph.
    /// We can get a collection of backpressure values keyed by `vertex_id-vertex_id`.
    /// This function returns a map from fragment vertex id to relation id,
    /// which lets us convert `fragment_id-fragment_id` keys into `relation_id-relation_id` keys.
    /// Finally, we end up with a map of `relation_id-relation_id` to backpressure values.
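    ///
    /// Illustration with hypothetical ids: if materialized view `3` has an `Mview` fragment `9`
    /// (so `out_map` contains `9 -> 3`) and materialized view `4` scans it through a `StreamScan`
    /// fragment `7` (so `in_map` contains `7 -> 4`), then a backpressure value keyed by the
    /// fragment pair `9`/`7` can be relabeled as the relation pair `3`/`4`.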
    pub async fn get_fragment_to_relation_map(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<FragmentToRelationMap>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut in_map = HashMap::new();
        let mut out_map = HashMap::new();
        for (relation_id, tf) in table_fragments {
            for (fragment_id, fragment) in &tf.fragments {
                if fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamScan)
                {
                    in_map.insert(*fragment_id, relation_id as u32);
                }
                if fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
                {
                    out_map.insert(*fragment_id, relation_id as u32);
                }
            }
        }
        let map = FragmentToRelationMap { in_map, out_map };
        Ok(Json(map))
    }

    /// Provides a hierarchy of relation ids to fragments to actors.
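    ///
    /// Illustrative shape with hypothetical ids: relation `1` maps to `{ fragment 10: [actor 100, actor 101] }`.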
    pub async fn get_relation_id_infos(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<RelationIdInfos>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut map = HashMap::new();
        for (id, tf) in table_fragments {
            let mut fragment_id_to_actor_ids = HashMap::new();
            for (fragment_id, fragment) in &tf.fragments {
                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
            }
            map.insert(
                id as u32,
                FragmentIdToActorIdMap {
                    map: fragment_id_to_actor_ids,
                },
            );
        }
        let relation_id_infos = RelationIdInfos { map };

        Ok(Json(relation_id_infos))
    }

    pub async fn list_fragments_by_job_id(
        Extension(srv): Extension<Service>,
        Path(job_id): Path<u32>,
    ) -> Result<Json<PbTableFragments>> {
        let table_id = TableId::new(job_id);
        let table_fragments = srv
            .metadata_manager
            .get_job_fragments_by_id(&table_id)
            .await
            .map_err(err)?;
        let upstream_fragments = srv
            .metadata_manager
            .catalog_controller
            .upstream_fragments(table_fragments.fragment_ids())
            .await
            .map_err(err)?;
        let dispatchers = srv
            .metadata_manager
            .catalog_controller
            .get_fragment_actor_dispatchers(
                table_fragments.fragment_ids().map(|id| id as _).collect(),
            )
            .await
            .map_err(err)?;
        Ok(Json(
            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
        ))
    }

    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
        let users = srv
            .metadata_manager
            .catalog_controller
            .list_users()
            .await
            .map_err(err)?;

        Ok(Json(users))
    }

    pub async fn list_databases(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbDatabase>>> {
        let databases = srv
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await
            .map_err(err)?;

        Ok(Json(databases))
    }

    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
        let schemas = srv
            .metadata_manager
            .catalog_controller
            .list_schemas()
            .await
            .map_err(err)?;

        Ok(Json(schemas))
    }

    pub async fn list_object_dependencies(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbObjectDependencies>>> {
        let object_dependencies = srv
            .metadata_manager
            .catalog_controller
            .list_all_object_dependencies()
            .await
            .map_err(err)?;

        Ok(Json(object_dependencies))
    }

    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        #[serde(default = "await_tree_default_format")]
        format: String,
    }

    impl AwaitTreeDumpParams {
        /// Parse the `format` parameter to [`ActorTracesFormat`].
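        ///
        /// Accepts `"text"` (the default, see [`await_tree_default_format`]) or `"json"`;
        /// anything else is rejected with an error.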
        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
            Ok(match self.format.as_str() {
                "text" => ActorTracesFormat::Text,
                "json" => ActorTracesFormat::Json,
                _ => {
                    return Err(err(anyhow!(
                        "Unsupported format `{}`, only `text` and `json` are supported for now",
                        self.format
                    )));
                }
            })
        }
    }

    fn await_tree_default_format() -> String {
        // In the dashboard, the await tree is usually used by engineers for debugging,
        // so we default to the human-readable text format here.
        "text".to_owned()
    }

    pub async fn dump_await_tree_all(
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let actor_traces_format = params.actor_traces_format()?;

        let res = dump_cluster_await_tree(
            &srv.metadata_manager,
            &srv.await_tree_reg,
            actor_traces_format,
        )
        .await
        .map_err(err)?;

        Ok(res.into())
    }

    pub async fn dump_await_tree(
        Path(worker_id): Path<WorkerId>,
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let actor_traces_format = params.actor_traces_format()?;

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
            .await
            .map_err(err)?;

        Ok(res.into())
    }

    pub async fn heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<HeapProfilingResponse>> {
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let result = client.heap_profile("".to_owned()).await.map_err(err)?;

        Ok(result.into())
    }

    pub async fn list_heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<ListHeapProfilingResponse>> {
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let result = client.list_heap_profile().await.map_err(err)?;
        Ok(result.into())
    }

    pub async fn analyze_heap(
        Path((worker_id, file_path)): Path<(WorkerId, String)>,
        Extension(srv): Extension<Service>,
    ) -> Result<Response> {
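        // The trailing `*path` segment of the URL carries the heap profile's path on the worker,
        // base64-url-encoded; e.g. (illustrative) `L3RtcC9wcm9maWxl` decodes to `/tmp/profile`.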
        let file_path =
            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;

        let file_name = FilePath::new(&file_path)
            .file_name()
            .unwrap()
            .to_string_lossy()
            .to_string();

        let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let collapsed_bin = client
            .analyze_heap(file_path.clone())
            .await
            .map_err(err)?
            .result;
        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();

        let response = Response::builder()
            .header("Content-Type", "application/octet-stream")
            .header("Content-Disposition", collapsed_file_name)
            .body(collapsed_str.into());

        response.map_err(err)
    }

    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }

    pub async fn diagnose(
        Query(params): Query<DiagnoseParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<String> {
        let actor_traces_format = match params.actor_traces_format.as_str() {
            "text" => ActorTracesFormat::Text,
            "json" => ActorTracesFormat::Json,
            _ => {
                return Err(err(anyhow!(
                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
                    params.actor_traces_format
                )));
            }
        };
        Ok(srv.diagnose_command.report(actor_traces_format).await)
    }

    /// NOTE(kwannoel): Although we fetch the backpressure for the entire graph via this API,
    /// the workload should be reasonable.
    /// In most cases, we can safely assume each node has at most 2 outgoing edges (e.g. join),
    /// in which case the number of edges is linear in the number of nodes.
    /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
    pub async fn get_streaming_stats(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingStatsResponse>> {
        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        let mut all = GetStreamingStatsResponse::default();

        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get back pressure"))
                .map_err(err)?;

            // Aggregate fragment_stats
            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            // Aggregate relation_stats
            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }

            // Aggregate channel_stats
            for (key, channel_stats) in result.channel_stats {
                if let Some(s) = all.channel_stats.get_mut(&key) {
                    s.actor_count += channel_stats.actor_count;
                    s.output_blocking_duration += channel_stats.output_blocking_duration;
                    s.recv_row_count += channel_stats.recv_row_count;
                    s.send_row_count += channel_stats.send_row_count;
                } else {
                    all.channel_stats.insert(key, channel_stats);
                }
            }
        }

        Ok(all.into())
    }

    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
        Ok(Json(risingwave_common::current_cluster_version()))
    }
}

impl DashboardService {
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

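        // All routes below are exposed under the `/api` prefix (see `.nest("/api", api_router)`),
        // e.g. (illustrative) `GET /api/version` returns the cluster version via `get_version`.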
        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/:ty", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            // /monitor/await_tree/{worker_id}?format={text or json}
            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
            // /monitor/await_tree/?format={text or json}
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/:worker_id",
                get(list_heap_profile),
            )
            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
            // /monitor/diagnose/?format={text or json}
            .route("/monitor/diagnose/", get(diagnose))
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}