risingwave_meta/dashboard/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_common::util::StackTraceResponseExt;
28use risingwave_rpc_client::ComputeClientPool;
29use tokio::net::TcpListener;
30use tower::ServiceBuilder;
31use tower_http::add_extension::AddExtensionLayer;
32use tower_http::compression::CompressionLayer;
33use tower_http::cors::{self, CorsLayer};
34
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// Shared state of the meta node's dashboard HTTP server.
///
/// [`DashboardService::serve`] wraps an instance in an [`Arc`] and hands it to every request
/// handler through an axum `Extension`.
#[derive(Clone)]
pub struct DashboardService {
    /// Socket address that `serve` binds the HTTP listener to.
    pub dashboard_addr: SocketAddr,
    /// Prometheus query client. Not used in this file; presumably consumed by the `prometheus`
    /// submodule and `None` when no endpoint is configured (TODO confirm).
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// Not used in this file; presumably a label selector applied to Prometheus queries
    /// (TODO confirm against the `prometheus` submodule).
    pub prometheus_selector: String,
    /// Source of the catalog, fragment and worker-node metadata returned by the handlers.
    pub metadata_manager: MetadataManager,
    /// Client pool used to reach worker nodes for stack traces, heap profiles and streaming stats.
    pub compute_clients: ComputeClientPool,
    /// Produces the report returned by the `diagnose` handler.
    pub diagnose_command: DiagnoseCommandRef,
    /// State handed to the embedded OTLP trace UI mounted under `/trace`.
    pub trace_state: otlp_embedded::StateRef,
}
48
/// Reference-counted handle to the [`DashboardService`], as extracted by the request handlers.
pub type Service = Arc<DashboardService>;
50
51pub(super) mod handlers {
52    use std::cmp::min;
53    use std::collections::HashMap;
54
55    use anyhow::Context;
56    use axum::Json;
57    use futures::future::join_all;
58    use itertools::Itertools;
59    use risingwave_common::catalog::TableId;
60    use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
61    use risingwave_meta_model::WorkerId;
62    use risingwave_pb::catalog::table::TableType;
63    use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View};
64    use risingwave_pb::common::{WorkerNode, WorkerType};
65    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
66    use risingwave_pb::meta::{
67        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
68    };
69    use risingwave_pb::monitor_service::{
70        GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
71        StackTraceResponse,
72    };
73    use risingwave_pb::stream_plan::FragmentTypeFlag;
74    use risingwave_pb::user::PbUserInfo;
75    use serde_json::json;
76    use thiserror_ext::AsReport;
77
78    use super::*;
79    use crate::controller::fragment::StreamingJobInfo;
80
    /// Error type returned by the dashboard handlers.
    ///
    /// Wraps an [`anyhow::Error`]; its `IntoResponse` impl renders it as a JSON body with
    /// HTTP status 500.
    pub struct DashboardError(anyhow::Error);
    /// Result alias used by the dashboard handlers, with [`DashboardError`] as the error type.
    pub type Result<T> = std::result::Result<T, DashboardError>;
83
84    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
85        DashboardError(err.into())
86    }
87
88    impl From<anyhow::Error> for DashboardError {
89        fn from(value: anyhow::Error) -> Self {
90            DashboardError(value)
91        }
92    }
93
94    impl IntoResponse for DashboardError {
95        fn into_response(self) -> axum::response::Response {
96            let mut resp = Json(json!({
97                "error": self.0.to_report_string(),
98            }))
99            .into_response();
100            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
101            resp
102        }
103    }
104
105    pub async fn list_clusters(
106        Path(ty): Path<i32>,
107        Extension(srv): Extension<Service>,
108    ) -> Result<Json<Vec<WorkerNode>>> {
109        let worker_type = WorkerType::try_from(ty)
110            .map_err(|_| anyhow!("invalid worker type"))
111            .map_err(err)?;
112        let mut result = srv
113            .metadata_manager
114            .list_worker_node(Some(worker_type), None)
115            .await
116            .map_err(err)?;
117        result.sort_unstable_by_key(|n| n.id);
118        Ok(result.into())
119    }
120
121    async fn list_table_catalogs_inner(
122        metadata_manager: &MetadataManager,
123        table_type: TableType,
124    ) -> Result<Json<Vec<Table>>> {
125        let tables = metadata_manager
126            .catalog_controller
127            .list_tables_by_type(table_type.into())
128            .await
129            .map_err(err)?;
130
131        Ok(Json(tables))
132    }
133
134    pub async fn list_materialized_views(
135        Extension(srv): Extension<Service>,
136    ) -> Result<Json<Vec<Table>>> {
137        list_table_catalogs_inner(&srv.metadata_manager, TableType::MaterializedView).await
138    }
139
140    pub async fn list_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
141        list_table_catalogs_inner(&srv.metadata_manager, TableType::Table).await
142    }
143
144    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
145        list_table_catalogs_inner(&srv.metadata_manager, TableType::Index).await
146    }
147
148    pub async fn list_subscription(
149        Extension(srv): Extension<Service>,
150    ) -> Result<Json<Vec<Subscription>>> {
151        let subscriptions = srv
152            .metadata_manager
153            .catalog_controller
154            .list_subscriptions()
155            .await
156            .map_err(err)?;
157
158        Ok(Json(subscriptions))
159    }
160
161    pub async fn list_internal_tables(
162        Extension(srv): Extension<Service>,
163    ) -> Result<Json<Vec<Table>>> {
164        list_table_catalogs_inner(&srv.metadata_manager, TableType::Internal).await
165    }
166
167    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
168        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
169
170        Ok(Json(sources))
171    }
172
173    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
174        let sinks = srv
175            .metadata_manager
176            .catalog_controller
177            .list_sinks()
178            .await
179            .map_err(err)?;
180
181        Ok(Json(sinks))
182    }
183
184    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
185        let views = srv
186            .metadata_manager
187            .catalog_controller
188            .list_views()
189            .await
190            .map_err(err)?;
191
192        Ok(Json(views))
193    }
194
195    pub async fn list_streaming_jobs(
196        Extension(srv): Extension<Service>,
197    ) -> Result<Json<Vec<StreamingJobInfo>>> {
198        let streaming_jobs = srv
199            .metadata_manager
200            .catalog_controller
201            .list_streaming_job_infos()
202            .await
203            .map_err(err)?;
204
205        Ok(Json(streaming_jobs))
206    }
207
208    /// In the ddl backpressure graph, we want to compute the backpressure between relations.
209    /// So we need to know which are the fragments which are connected to external relations.
210    /// These fragments form the vertices of the graph.
211    /// We can get collection of backpressure values, keyed by vertex_id-vertex_id.
212    /// This function will return a map of fragment vertex id to relation id.
213    /// We can convert `fragment_id-fragment_id` to `relation_id-relation_id` using that.
214    /// Finally, we have a map of `relation_id-relation_id` to backpressure values.
215    pub async fn get_fragment_to_relation_map(
216        Extension(srv): Extension<Service>,
217    ) -> Result<Json<FragmentToRelationMap>> {
218        let table_fragments = srv
219            .metadata_manager
220            .catalog_controller
221            .table_fragments()
222            .await
223            .map_err(err)?;
224        let mut in_map = HashMap::new();
225        let mut out_map = HashMap::new();
226        for (relation_id, tf) in table_fragments {
227            for (fragment_id, fragment) in &tf.fragments {
228                if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 {
229                    in_map.insert(*fragment_id, relation_id as u32);
230                }
231                if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
232                    out_map.insert(*fragment_id, relation_id as u32);
233                }
234            }
235        }
236        let map = FragmentToRelationMap { in_map, out_map };
237        Ok(Json(map))
238    }
239
240    /// Provides a hierarchy of relation ids to fragments to actors.
241    pub async fn get_relation_id_infos(
242        Extension(srv): Extension<Service>,
243    ) -> Result<Json<RelationIdInfos>> {
244        let table_fragments = srv
245            .metadata_manager
246            .catalog_controller
247            .table_fragments()
248            .await
249            .map_err(err)?;
250        let mut map = HashMap::new();
251        for (id, tf) in table_fragments {
252            let mut fragment_id_to_actor_ids = HashMap::new();
253            for (fragment_id, fragment) in &tf.fragments {
254                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
255                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
256            }
257            map.insert(
258                id as u32,
259                FragmentIdToActorIdMap {
260                    map: fragment_id_to_actor_ids,
261                },
262            );
263        }
264        let relation_id_infos = RelationIdInfos { map };
265
266        Ok(Json(relation_id_infos))
267    }
268
269    pub async fn list_fragments_by_job_id(
270        Extension(srv): Extension<Service>,
271        Path(job_id): Path<u32>,
272    ) -> Result<Json<PbTableFragments>> {
273        let table_id = TableId::new(job_id);
274        let table_fragments = srv
275            .metadata_manager
276            .get_job_fragments_by_id(&table_id)
277            .await
278            .map_err(err)?;
279        let upstream_fragments = srv
280            .metadata_manager
281            .catalog_controller
282            .upstream_fragments(table_fragments.fragment_ids())
283            .await
284            .map_err(err)?;
285        let dispatchers = srv
286            .metadata_manager
287            .catalog_controller
288            .get_fragment_actor_dispatchers(
289                table_fragments.fragment_ids().map(|id| id as _).collect(),
290            )
291            .await
292            .map_err(err)?;
293        Ok(Json(
294            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
295        ))
296    }
297
298    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
299        let users = srv
300            .metadata_manager
301            .catalog_controller
302            .list_users()
303            .await
304            .map_err(err)?;
305
306        Ok(Json(users))
307    }
308
309    pub async fn list_databases(
310        Extension(srv): Extension<Service>,
311    ) -> Result<Json<Vec<PbDatabase>>> {
312        let databases = srv
313            .metadata_manager
314            .catalog_controller
315            .list_databases()
316            .await
317            .map_err(err)?;
318
319        Ok(Json(databases))
320    }
321
322    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
323        let schemas = srv
324            .metadata_manager
325            .catalog_controller
326            .list_schemas()
327            .await
328            .map_err(err)?;
329
330        Ok(Json(schemas))
331    }
332
333    pub async fn list_object_dependencies(
334        Extension(srv): Extension<Service>,
335    ) -> Result<Json<Vec<PbObjectDependencies>>> {
336        let object_dependencies = srv
337            .metadata_manager
338            .catalog_controller
339            .list_all_object_dependencies()
340            .await
341            .map_err(err)?;
342
343        Ok(Json(object_dependencies))
344    }
345
346    async fn dump_await_tree_inner(
347        worker_nodes: impl IntoIterator<Item = &WorkerNode>,
348        compute_clients: &ComputeClientPool,
349    ) -> Result<Json<StackTraceResponse>> {
350        let mut all = StackTraceResponse::default();
351
352        for worker_node in worker_nodes {
353            let client = compute_clients.get(worker_node).await.map_err(err)?;
354            let result = client.stack_trace().await.map_err(err)?;
355
356            all.merge_other(result);
357        }
358
359        Ok(all.into())
360    }
361
362    pub async fn dump_await_tree_all(
363        Extension(srv): Extension<Service>,
364    ) -> Result<Json<StackTraceResponse>> {
365        let worker_nodes = srv
366            .metadata_manager
367            .list_worker_node(Some(WorkerType::ComputeNode), None)
368            .await
369            .map_err(err)?;
370
371        dump_await_tree_inner(&worker_nodes, &srv.compute_clients).await
372    }
373
374    pub async fn dump_await_tree(
375        Path(worker_id): Path<WorkerId>,
376        Extension(srv): Extension<Service>,
377    ) -> Result<Json<StackTraceResponse>> {
378        let worker_node = srv
379            .metadata_manager
380            .get_worker_by_id(worker_id)
381            .await
382            .map_err(err)?
383            .context("worker node not found")
384            .map_err(err)?;
385
386        dump_await_tree_inner(std::iter::once(&worker_node), &srv.compute_clients).await
387    }
388
389    pub async fn heap_profile(
390        Path(worker_id): Path<WorkerId>,
391        Extension(srv): Extension<Service>,
392    ) -> Result<Json<HeapProfilingResponse>> {
393        let worker_node = srv
394            .metadata_manager
395            .get_worker_by_id(worker_id)
396            .await
397            .map_err(err)?
398            .context("worker node not found")
399            .map_err(err)?;
400
401        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
402
403        let result = client.heap_profile("".to_owned()).await.map_err(err)?;
404
405        Ok(result.into())
406    }
407
408    pub async fn list_heap_profile(
409        Path(worker_id): Path<WorkerId>,
410        Extension(srv): Extension<Service>,
411    ) -> Result<Json<ListHeapProfilingResponse>> {
412        let worker_node = srv
413            .metadata_manager
414            .get_worker_by_id(worker_id)
415            .await
416            .map_err(err)?
417            .context("worker node not found")
418            .map_err(err)?;
419
420        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
421
422        let result = client.list_heap_profile().await.map_err(err)?;
423        Ok(result.into())
424    }
425
426    pub async fn analyze_heap(
427        Path((worker_id, file_path)): Path<(WorkerId, String)>,
428        Extension(srv): Extension<Service>,
429    ) -> Result<Response> {
430        let file_path =
431            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
432
433        let file_name = FilePath::new(&file_path)
434            .file_name()
435            .unwrap()
436            .to_string_lossy()
437            .to_string();
438
439        let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
440
441        let worker_node = srv
442            .metadata_manager
443            .get_worker_by_id(worker_id)
444            .await
445            .map_err(err)?
446            .context("worker node not found")
447            .map_err(err)?;
448
449        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
450
451        let collapsed_bin = client
452            .analyze_heap(file_path.clone())
453            .await
454            .map_err(err)?
455            .result;
456        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
457
458        let response = Response::builder()
459            .header("Content-Type", "application/octet-stream")
460            .header("Content-Disposition", collapsed_file_name)
461            .body(collapsed_str.into());
462
463        response.map_err(err)
464    }
465
466    pub async fn diagnose(Extension(srv): Extension<Service>) -> Result<String> {
467        Ok(srv.diagnose_command.report().await)
468    }
469
470    /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API,
471    /// the workload should be reasonable.
472    /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join).
473    /// In such a scenario, the number of edges is linear to the number of nodes.
474    /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
475    pub async fn get_streaming_stats(
476        Extension(srv): Extension<Service>,
477    ) -> Result<Json<GetStreamingStatsResponse>> {
478        let worker_nodes = srv
479            .metadata_manager
480            .list_active_streaming_compute_nodes()
481            .await
482            .map_err(err)?;
483
484        let mut futures = Vec::new();
485
486        for worker_node in worker_nodes {
487            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
488            let client = Arc::new(client);
489            let fut = async move {
490                let result = client.get_streaming_stats().await.map_err(err)?;
491                Ok::<_, DashboardError>(result)
492            };
493            futures.push(fut);
494        }
495        let results = join_all(futures).await;
496
497        let mut all = GetStreamingStatsResponse::default();
498
499        for result in results {
500            let result = result
501                .map_err(|_| anyhow!("Failed to get back pressure"))
502                .map_err(err)?;
503
504            // Aggregate fragment_stats
505            for (fragment_id, fragment_stats) in result.fragment_stats {
506                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
507                    s.actor_count += fragment_stats.actor_count;
508                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
509                } else {
510                    all.fragment_stats.insert(fragment_id, fragment_stats);
511                }
512            }
513
514            // Aggregate relation_stats
515            for (relation_id, relation_stats) in result.relation_stats {
516                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
517                    s.actor_count += relation_stats.actor_count;
518                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
519                } else {
520                    all.relation_stats.insert(relation_id, relation_stats);
521                }
522            }
523
524            // Aggregate channel_stats
525            for (key, channel_stats) in result.channel_stats {
526                if let Some(s) = all.channel_stats.get_mut(&key) {
527                    s.actor_count += channel_stats.actor_count;
528                    s.output_blocking_duration += channel_stats.output_blocking_duration;
529                    s.recv_row_count += channel_stats.recv_row_count;
530                    s.send_row_count += channel_stats.send_row_count;
531                } else {
532                    all.channel_stats.insert(key, channel_stats);
533                }
534            }
535        }
536
537        Ok(all.into())
538    }
539
540    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
541        Ok(Json(risingwave_common::current_cluster_version()))
542    }
543}
544
545impl DashboardService {
546    pub async fn serve(self) -> Result<()> {
547        use handlers::*;
548        let srv = Arc::new(self);
549
550        let cors_layer = CorsLayer::new()
551            .allow_origin(cors::Any)
552            .allow_methods(vec![Method::GET]);
553
554        let api_router = Router::new()
555            .route("/version", get(get_version))
556            .route("/clusters/:ty", get(list_clusters))
557            .route("/streaming_jobs", get(list_streaming_jobs))
558            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
559            .route("/relation_id_infos", get(get_relation_id_infos))
560            .route(
561                "/fragment_to_relation_map",
562                get(get_fragment_to_relation_map),
563            )
564            .route("/views", get(list_views))
565            .route("/materialized_views", get(list_materialized_views))
566            .route("/tables", get(list_tables))
567            .route("/indexes", get(list_indexes))
568            .route("/subscriptions", get(list_subscription))
569            .route("/internal_tables", get(list_internal_tables))
570            .route("/sources", get(list_sources))
571            .route("/sinks", get(list_sinks))
572            .route("/users", get(list_users))
573            .route("/databases", get(list_databases))
574            .route("/schemas", get(list_schemas))
575            .route("/object_dependencies", get(list_object_dependencies))
576            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
577            .route("/metrics/streaming_stats", get(get_streaming_stats))
578            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
579            .route("/monitor/await_tree/", get(dump_await_tree_all))
580            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
581            .route(
582                "/monitor/list_heap_profile/:worker_id",
583                get(list_heap_profile),
584            )
585            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
586            .route("/monitor/diagnose/", get(diagnose))
587            .layer(
588                ServiceBuilder::new()
589                    .layer(AddExtensionLayer::new(srv.clone()))
590                    .into_inner(),
591            )
592            .layer(cors_layer);
593
594        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
595        let dashboard_router = risingwave_meta_dashboard::router();
596
597        let app = Router::new()
598            .fallback_service(dashboard_router)
599            .nest("/api", api_router)
600            .nest("/trace", trace_ui_router)
601            .layer(CompressionLayer::new());
602
603        let listener = TcpListener::bind(&srv.dashboard_addr)
604            .await
605            .context("failed to bind dashboard address")?;
606        axum::serve(listener, app)
607            .await
608            .context("failed to serve dashboard service")?;
609
610        Ok(())
611    }
612}