risingwave_meta/dashboard/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_rpc_client::ComputeClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::manager::MetadataManager;
35use crate::manager::diagnose::DiagnoseCommandRef;
36
/// State shared by all handlers of the meta dashboard HTTP service.
///
/// [`DashboardService::serve`] wraps the value in an [`Arc`] and installs it as an axum
/// extension so that every API handler can extract it.
#[derive(Clone)]
pub struct DashboardService {
    /// Await-tree registry handed to `dump_cluster_await_tree` when dumping the whole cluster.
    pub await_tree_reg: await_tree::Registry,
    /// Address the dashboard HTTP server binds its TCP listener to.
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus query client.
    /// NOTE(review): presumably consumed by the `prometheus` submodule — confirm there.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// NOTE(review): presumably a label selector applied to Prometheus queries — confirm in the
    /// `prometheus` submodule.
    pub prometheus_selector: String,
    /// Entry point for catalog, fragment and worker-node lookups made by the handlers.
    pub metadata_manager: MetadataManager,
    /// Pool used to obtain RPC clients to individual worker nodes.
    pub compute_clients: ComputeClientPool,
    /// Produces the report returned by the `diagnose` handler.
    pub diagnose_command: DiagnoseCommandRef,
    /// State passed to `otlp_embedded::ui_app` to serve the trace UI.
    pub trace_state: otlp_embedded::StateRef,
}
48
/// Reference-counted handle to the [`DashboardService`]; the type handlers extract via `Extension`.
pub type Service = Arc<DashboardService>;
50
51pub(super) mod handlers {
52    use std::cmp::min;
53    use std::collections::HashMap;
54
55    use anyhow::Context;
56    use axum::Json;
57    use axum::extract::Query;
58    use futures::future::join_all;
59    use itertools::Itertools;
60    use risingwave_common::catalog::TableId;
61    use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
62    use risingwave_meta_model::WorkerId;
63    use risingwave_pb::catalog::table::TableType;
64    use risingwave_pb::catalog::{
65        Index, PbDatabase, PbSchema, Sink, Source, Subscription, Table, View,
66    };
67    use risingwave_pb::common::{WorkerNode, WorkerType};
68    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
69    use risingwave_pb::meta::{
70        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
71    };
72    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
73    use risingwave_pb::monitor_service::{
74        GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
75        StackTraceResponse,
76    };
77    use risingwave_pb::stream_plan::FragmentTypeFlag;
78    use risingwave_pb::user::PbUserInfo;
79    use serde::Deserialize;
80    use serde_json::json;
81    use thiserror_ext::AsReport;
82
83    use super::*;
84    use crate::controller::fragment::StreamingJobInfo;
85    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
86
    /// Error type returned by dashboard handlers; a thin wrapper around [`anyhow::Error`].
    pub struct DashboardError(anyhow::Error);
    /// Result alias for dashboard handlers, with [`DashboardError`] as the error type.
    pub type Result<T> = std::result::Result<T, DashboardError>;
89
90    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
91        DashboardError(err.into())
92    }
93
94    impl From<anyhow::Error> for DashboardError {
95        fn from(value: anyhow::Error) -> Self {
96            DashboardError(value)
97        }
98    }
99
100    impl IntoResponse for DashboardError {
101        fn into_response(self) -> axum::response::Response {
102            let mut resp = Json(json!({
103                "error": self.0.to_report_string(),
104            }))
105            .into_response();
106            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
107            resp
108        }
109    }
110
111    pub async fn list_clusters(
112        Path(ty): Path<i32>,
113        Extension(srv): Extension<Service>,
114    ) -> Result<Json<Vec<WorkerNode>>> {
115        let worker_type = WorkerType::try_from(ty)
116            .map_err(|_| anyhow!("invalid worker type"))
117            .map_err(err)?;
118        let mut result = srv
119            .metadata_manager
120            .list_worker_node(Some(worker_type), None)
121            .await
122            .map_err(err)?;
123        result.sort_unstable_by_key(|n| n.id);
124        Ok(result.into())
125    }
126
127    async fn list_table_catalogs_inner(
128        metadata_manager: &MetadataManager,
129        table_type: TableType,
130    ) -> Result<Json<Vec<Table>>> {
131        let tables = metadata_manager
132            .catalog_controller
133            .list_tables_by_type(table_type.into())
134            .await
135            .map_err(err)?;
136
137        Ok(Json(tables))
138    }
139
140    pub async fn list_materialized_views(
141        Extension(srv): Extension<Service>,
142    ) -> Result<Json<Vec<Table>>> {
143        list_table_catalogs_inner(&srv.metadata_manager, TableType::MaterializedView).await
144    }
145
146    pub async fn list_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
147        list_table_catalogs_inner(&srv.metadata_manager, TableType::Table).await
148    }
149
150    pub async fn list_index_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
151        list_table_catalogs_inner(&srv.metadata_manager, TableType::Index).await
152    }
153
154    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
155        let indexes = srv
156            .metadata_manager
157            .catalog_controller
158            .list_indexes()
159            .await
160            .map_err(err)?;
161
162        Ok(Json(indexes))
163    }
164
165    pub async fn list_subscription(
166        Extension(srv): Extension<Service>,
167    ) -> Result<Json<Vec<Subscription>>> {
168        let subscriptions = srv
169            .metadata_manager
170            .catalog_controller
171            .list_subscriptions()
172            .await
173            .map_err(err)?;
174
175        Ok(Json(subscriptions))
176    }
177
178    pub async fn list_internal_tables(
179        Extension(srv): Extension<Service>,
180    ) -> Result<Json<Vec<Table>>> {
181        list_table_catalogs_inner(&srv.metadata_manager, TableType::Internal).await
182    }
183
184    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
185        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
186
187        Ok(Json(sources))
188    }
189
190    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
191        let sinks = srv
192            .metadata_manager
193            .catalog_controller
194            .list_sinks()
195            .await
196            .map_err(err)?;
197
198        Ok(Json(sinks))
199    }
200
201    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
202        let views = srv
203            .metadata_manager
204            .catalog_controller
205            .list_views()
206            .await
207            .map_err(err)?;
208
209        Ok(Json(views))
210    }
211
212    pub async fn list_streaming_jobs(
213        Extension(srv): Extension<Service>,
214    ) -> Result<Json<Vec<StreamingJobInfo>>> {
215        let streaming_jobs = srv
216            .metadata_manager
217            .catalog_controller
218            .list_streaming_job_infos()
219            .await
220            .map_err(err)?;
221
222        Ok(Json(streaming_jobs))
223    }
224
225    /// In the ddl backpressure graph, we want to compute the backpressure between relations.
226    /// So we need to know which are the fragments which are connected to external relations.
227    /// These fragments form the vertices of the graph.
228    /// We can get collection of backpressure values, keyed by vertex_id-vertex_id.
229    /// This function will return a map of fragment vertex id to relation id.
230    /// We can convert `fragment_id-fragment_id` to `relation_id-relation_id` using that.
231    /// Finally, we have a map of `relation_id-relation_id` to backpressure values.
232    pub async fn get_fragment_to_relation_map(
233        Extension(srv): Extension<Service>,
234    ) -> Result<Json<FragmentToRelationMap>> {
235        let table_fragments = srv
236            .metadata_manager
237            .catalog_controller
238            .table_fragments()
239            .await
240            .map_err(err)?;
241        let mut in_map = HashMap::new();
242        let mut out_map = HashMap::new();
243        for (relation_id, tf) in table_fragments {
244            for (fragment_id, fragment) in &tf.fragments {
245                if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 {
246                    in_map.insert(*fragment_id, relation_id as u32);
247                }
248                if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
249                    out_map.insert(*fragment_id, relation_id as u32);
250                }
251            }
252        }
253        let map = FragmentToRelationMap { in_map, out_map };
254        Ok(Json(map))
255    }
256
257    /// Provides a hierarchy of relation ids to fragments to actors.
258    pub async fn get_relation_id_infos(
259        Extension(srv): Extension<Service>,
260    ) -> Result<Json<RelationIdInfos>> {
261        let table_fragments = srv
262            .metadata_manager
263            .catalog_controller
264            .table_fragments()
265            .await
266            .map_err(err)?;
267        let mut map = HashMap::new();
268        for (id, tf) in table_fragments {
269            let mut fragment_id_to_actor_ids = HashMap::new();
270            for (fragment_id, fragment) in &tf.fragments {
271                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
272                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
273            }
274            map.insert(
275                id as u32,
276                FragmentIdToActorIdMap {
277                    map: fragment_id_to_actor_ids,
278                },
279            );
280        }
281        let relation_id_infos = RelationIdInfos { map };
282
283        Ok(Json(relation_id_infos))
284    }
285
286    pub async fn list_fragments_by_job_id(
287        Extension(srv): Extension<Service>,
288        Path(job_id): Path<u32>,
289    ) -> Result<Json<PbTableFragments>> {
290        let table_id = TableId::new(job_id);
291        let table_fragments = srv
292            .metadata_manager
293            .get_job_fragments_by_id(&table_id)
294            .await
295            .map_err(err)?;
296        let upstream_fragments = srv
297            .metadata_manager
298            .catalog_controller
299            .upstream_fragments(table_fragments.fragment_ids())
300            .await
301            .map_err(err)?;
302        let dispatchers = srv
303            .metadata_manager
304            .catalog_controller
305            .get_fragment_actor_dispatchers(
306                table_fragments.fragment_ids().map(|id| id as _).collect(),
307            )
308            .await
309            .map_err(err)?;
310        Ok(Json(
311            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
312        ))
313    }
314
315    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
316        let users = srv
317            .metadata_manager
318            .catalog_controller
319            .list_users()
320            .await
321            .map_err(err)?;
322
323        Ok(Json(users))
324    }
325
326    pub async fn list_databases(
327        Extension(srv): Extension<Service>,
328    ) -> Result<Json<Vec<PbDatabase>>> {
329        let databases = srv
330            .metadata_manager
331            .catalog_controller
332            .list_databases()
333            .await
334            .map_err(err)?;
335
336        Ok(Json(databases))
337    }
338
339    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
340        let schemas = srv
341            .metadata_manager
342            .catalog_controller
343            .list_schemas()
344            .await
345            .map_err(err)?;
346
347        Ok(Json(schemas))
348    }
349
350    pub async fn list_object_dependencies(
351        Extension(srv): Extension<Service>,
352    ) -> Result<Json<Vec<PbObjectDependencies>>> {
353        let object_dependencies = srv
354            .metadata_manager
355            .catalog_controller
356            .list_all_object_dependencies()
357            .await
358            .map_err(err)?;
359
360        Ok(Json(object_dependencies))
361    }
362
    /// Query-string parameters accepted by the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        /// Requested output format of actor traces: `text` or `json`.
        /// Falls back to the value of `await_tree_default_format` when omitted.
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
368
369    impl AwaitTreeDumpParams {
370        /// Parse the `format` parameter to [`ActorTracesFormat`].
371        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
372            Ok(match self.format.as_str() {
373                "text" => ActorTracesFormat::Text,
374                "json" => ActorTracesFormat::Json,
375                _ => {
376                    return Err(err(anyhow!(
377                        "Unsupported format `{}`, only `text` and `json` are supported for now",
378                        self.format
379                    )));
380                }
381            })
382        }
383    }
384
385    fn await_tree_default_format() -> String {
386        // In dashboard, await tree is usually for engineer to debug, so we use human-readable text format by default here.
387        "text".to_owned()
388    }
389
390    pub async fn dump_await_tree_all(
391        Query(params): Query<AwaitTreeDumpParams>,
392        Extension(srv): Extension<Service>,
393    ) -> Result<Json<StackTraceResponse>> {
394        let actor_traces_format = params.actor_traces_format()?;
395
396        let res = dump_cluster_await_tree(
397            &srv.metadata_manager,
398            &srv.await_tree_reg,
399            actor_traces_format,
400        )
401        .await
402        .map_err(err)?;
403
404        Ok(res.into())
405    }
406
407    pub async fn dump_await_tree(
408        Path(worker_id): Path<WorkerId>,
409        Query(params): Query<AwaitTreeDumpParams>,
410        Extension(srv): Extension<Service>,
411    ) -> Result<Json<StackTraceResponse>> {
412        let actor_traces_format = params.actor_traces_format()?;
413
414        let worker_node = srv
415            .metadata_manager
416            .get_worker_by_id(worker_id)
417            .await
418            .map_err(err)?
419            .context("worker node not found")
420            .map_err(err)?;
421
422        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
423            .await
424            .map_err(err)?;
425
426        Ok(res.into())
427    }
428
429    pub async fn heap_profile(
430        Path(worker_id): Path<WorkerId>,
431        Extension(srv): Extension<Service>,
432    ) -> Result<Json<HeapProfilingResponse>> {
433        let worker_node = srv
434            .metadata_manager
435            .get_worker_by_id(worker_id)
436            .await
437            .map_err(err)?
438            .context("worker node not found")
439            .map_err(err)?;
440
441        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
442
443        let result = client.heap_profile("".to_owned()).await.map_err(err)?;
444
445        Ok(result.into())
446    }
447
448    pub async fn list_heap_profile(
449        Path(worker_id): Path<WorkerId>,
450        Extension(srv): Extension<Service>,
451    ) -> Result<Json<ListHeapProfilingResponse>> {
452        let worker_node = srv
453            .metadata_manager
454            .get_worker_by_id(worker_id)
455            .await
456            .map_err(err)?
457            .context("worker node not found")
458            .map_err(err)?;
459
460        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
461
462        let result = client.list_heap_profile().await.map_err(err)?;
463        Ok(result.into())
464    }
465
466    pub async fn analyze_heap(
467        Path((worker_id, file_path)): Path<(WorkerId, String)>,
468        Extension(srv): Extension<Service>,
469    ) -> Result<Response> {
470        let file_path =
471            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
472
473        let file_name = FilePath::new(&file_path)
474            .file_name()
475            .unwrap()
476            .to_string_lossy()
477            .to_string();
478
479        let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
480
481        let worker_node = srv
482            .metadata_manager
483            .get_worker_by_id(worker_id)
484            .await
485            .map_err(err)?
486            .context("worker node not found")
487            .map_err(err)?;
488
489        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
490
491        let collapsed_bin = client
492            .analyze_heap(file_path.clone())
493            .await
494            .map_err(err)?
495            .result;
496        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
497
498        let response = Response::builder()
499            .header("Content-Type", "application/octet-stream")
500            .header("Content-Disposition", collapsed_file_name)
501            .body(collapsed_str.into());
502
503        response.map_err(err)
504    }
505
    /// Query-string parameters accepted by the `diagnose` handler.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        /// Requested output format of actor traces in the report: `text` or `json`.
        /// Falls back to the value of `await_tree_default_format` when omitted.
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
511
512    pub async fn diagnose(
513        Query(params): Query<DiagnoseParams>,
514        Extension(srv): Extension<Service>,
515    ) -> Result<String> {
516        let actor_traces_format = match params.actor_traces_format.as_str() {
517            "text" => ActorTracesFormat::Text,
518            "json" => ActorTracesFormat::Json,
519            _ => {
520                return Err(err(anyhow!(
521                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
522                    params.actor_traces_format
523                )));
524            }
525        };
526        Ok(srv.diagnose_command.report(actor_traces_format).await)
527    }
528
529    /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API,
530    /// the workload should be reasonable.
531    /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join).
532    /// In such a scenario, the number of edges is linear to the number of nodes.
533    /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
534    pub async fn get_streaming_stats(
535        Extension(srv): Extension<Service>,
536    ) -> Result<Json<GetStreamingStatsResponse>> {
537        let worker_nodes = srv
538            .metadata_manager
539            .list_active_streaming_compute_nodes()
540            .await
541            .map_err(err)?;
542
543        let mut futures = Vec::new();
544
545        for worker_node in worker_nodes {
546            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
547            let client = Arc::new(client);
548            let fut = async move {
549                let result = client.get_streaming_stats().await.map_err(err)?;
550                Ok::<_, DashboardError>(result)
551            };
552            futures.push(fut);
553        }
554        let results = join_all(futures).await;
555
556        let mut all = GetStreamingStatsResponse::default();
557
558        for result in results {
559            let result = result
560                .map_err(|_| anyhow!("Failed to get back pressure"))
561                .map_err(err)?;
562
563            // Aggregate fragment_stats
564            for (fragment_id, fragment_stats) in result.fragment_stats {
565                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
566                    s.actor_count += fragment_stats.actor_count;
567                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
568                } else {
569                    all.fragment_stats.insert(fragment_id, fragment_stats);
570                }
571            }
572
573            // Aggregate relation_stats
574            for (relation_id, relation_stats) in result.relation_stats {
575                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
576                    s.actor_count += relation_stats.actor_count;
577                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
578                } else {
579                    all.relation_stats.insert(relation_id, relation_stats);
580                }
581            }
582
583            // Aggregate channel_stats
584            for (key, channel_stats) in result.channel_stats {
585                if let Some(s) = all.channel_stats.get_mut(&key) {
586                    s.actor_count += channel_stats.actor_count;
587                    s.output_blocking_duration += channel_stats.output_blocking_duration;
588                    s.recv_row_count += channel_stats.recv_row_count;
589                    s.send_row_count += channel_stats.send_row_count;
590                } else {
591                    all.channel_stats.insert(key, channel_stats);
592                }
593            }
594        }
595
596        Ok(all.into())
597    }
598
599    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
600        Ok(Json(risingwave_common::current_cluster_version()))
601    }
602}
603
impl DashboardService {
    /// Builds the dashboard HTTP application and serves it on `dashboard_addr`.
    ///
    /// The application consists of:
    /// * the JSON API nested under `/api`, with this service installed as an extension and a
    ///   CORS layer that allows `GET` from any origin;
    /// * the embedded trace UI nested under `/trace`;
    /// * the router from `risingwave_meta_dashboard` as the fallback for every other path.
    ///
    /// Responses of the whole application go through a compression layer.
    ///
    /// # Errors
    /// Fails when the address cannot be bound or when serving returns an error.
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/:ty", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            // `/indexes` serves the index *tables*; the index catalogs are under `/index_items`.
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            // /monitor/await_tree/{worker_id}/?format={text or json}
            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
            // /monitor/await_tree/?format={text or json}
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/:worker_id",
                get(list_heap_profile),
            )
            // `*path` carries the URL-safe base64 encoded profile path decoded by `analyze_heap`.
            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
            // /monitor/diagnose/?actor_traces_format={text or json}
            .route("/monitor/diagnose/", get(diagnose))
            // Make the shared service available to every handler via `Extension<Service>`.
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}