risingwave_meta/dashboard/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_common::util::StackTraceResponseExt;
28use risingwave_rpc_client::ComputeClientPool;
29use tokio::net::TcpListener;
30use tower::ServiceBuilder;
31use tower_http::add_extension::AddExtensionLayer;
32use tower_http::compression::CompressionLayer;
33use tower_http::cors::{self, CorsLayer};
34
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// Shared state behind the dashboard HTTP server.
///
/// [`DashboardService::serve`] wraps an instance in an [`Arc`] and attaches it to the API
/// router, so handlers receive it through `Extension<Service>`.
#[derive(Clone)]
pub struct DashboardService {
    /// Address that `serve` binds its TCP listener to.
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus query client. Not read in this file; presumably consumed by the
    /// `prometheus` submodule (confirm there).
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// Not read in this file; presumably a selector applied to Prometheus queries by the
    /// `prometheus` submodule (confirm there).
    pub prometheus_selector: String,
    /// Source of catalog, fragment and worker-node metadata for the listing handlers.
    pub metadata_manager: MetadataManager,
    /// Pool from which handlers obtain RPC clients for individual worker nodes.
    pub compute_clients: ComputeClientPool,
    /// Produces the report returned by the `diagnose` handler.
    pub diagnose_command: DiagnoseCommandRef,
    /// State handed to the embedded OTLP trace UI mounted under `/trace`.
    pub trace_state: otlp_embedded::StateRef,
}
48
/// Reference-counted handle to the [`DashboardService`]; handlers extract it via
/// `Extension<Service>`.
pub type Service = Arc<DashboardService>;
50
51pub(super) mod handlers {
52    use std::cmp::min;
53    use std::collections::HashMap;
54
55    use anyhow::Context;
56    use axum::Json;
57    use axum::extract::Query;
58    use futures::future::join_all;
59    use itertools::Itertools;
60    use risingwave_common::catalog::TableId;
61    use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
62    use risingwave_meta_model::WorkerId;
63    use risingwave_pb::catalog::table::TableType;
64    use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View};
65    use risingwave_pb::common::{WorkerNode, WorkerType};
66    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
67    use risingwave_pb::meta::{
68        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
69    };
70    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
71    use risingwave_pb::monitor_service::{
72        GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
73        StackTraceRequest, StackTraceResponse,
74    };
75    use risingwave_pb::stream_plan::FragmentTypeFlag;
76    use risingwave_pb::user::PbUserInfo;
77    use serde::Deserialize;
78    use serde_json::json;
79    use thiserror_ext::AsReport;
80
81    use super::*;
82    use crate::controller::fragment::StreamingJobInfo;
83
    /// Error type returned by the dashboard handlers; a thin wrapper around [`anyhow::Error`].
    pub struct DashboardError(anyhow::Error);
    /// Result alias used by the dashboard handlers, with [`DashboardError`] as the error type.
    pub type Result<T> = std::result::Result<T, DashboardError>;
86
87    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
88        DashboardError(err.into())
89    }
90
91    impl From<anyhow::Error> for DashboardError {
92        fn from(value: anyhow::Error) -> Self {
93            DashboardError(value)
94        }
95    }
96
97    impl IntoResponse for DashboardError {
98        fn into_response(self) -> axum::response::Response {
99            let mut resp = Json(json!({
100                "error": self.0.to_report_string(),
101            }))
102            .into_response();
103            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
104            resp
105        }
106    }
107
108    pub async fn list_clusters(
109        Path(ty): Path<i32>,
110        Extension(srv): Extension<Service>,
111    ) -> Result<Json<Vec<WorkerNode>>> {
112        let worker_type = WorkerType::try_from(ty)
113            .map_err(|_| anyhow!("invalid worker type"))
114            .map_err(err)?;
115        let mut result = srv
116            .metadata_manager
117            .list_worker_node(Some(worker_type), None)
118            .await
119            .map_err(err)?;
120        result.sort_unstable_by_key(|n| n.id);
121        Ok(result.into())
122    }
123
124    async fn list_table_catalogs_inner(
125        metadata_manager: &MetadataManager,
126        table_type: TableType,
127    ) -> Result<Json<Vec<Table>>> {
128        let tables = metadata_manager
129            .catalog_controller
130            .list_tables_by_type(table_type.into())
131            .await
132            .map_err(err)?;
133
134        Ok(Json(tables))
135    }
136
137    pub async fn list_materialized_views(
138        Extension(srv): Extension<Service>,
139    ) -> Result<Json<Vec<Table>>> {
140        list_table_catalogs_inner(&srv.metadata_manager, TableType::MaterializedView).await
141    }
142
143    pub async fn list_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
144        list_table_catalogs_inner(&srv.metadata_manager, TableType::Table).await
145    }
146
147    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
148        list_table_catalogs_inner(&srv.metadata_manager, TableType::Index).await
149    }
150
151    pub async fn list_subscription(
152        Extension(srv): Extension<Service>,
153    ) -> Result<Json<Vec<Subscription>>> {
154        let subscriptions = srv
155            .metadata_manager
156            .catalog_controller
157            .list_subscriptions()
158            .await
159            .map_err(err)?;
160
161        Ok(Json(subscriptions))
162    }
163
164    pub async fn list_internal_tables(
165        Extension(srv): Extension<Service>,
166    ) -> Result<Json<Vec<Table>>> {
167        list_table_catalogs_inner(&srv.metadata_manager, TableType::Internal).await
168    }
169
170    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
171        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
172
173        Ok(Json(sources))
174    }
175
176    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
177        let sinks = srv
178            .metadata_manager
179            .catalog_controller
180            .list_sinks()
181            .await
182            .map_err(err)?;
183
184        Ok(Json(sinks))
185    }
186
187    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
188        let views = srv
189            .metadata_manager
190            .catalog_controller
191            .list_views()
192            .await
193            .map_err(err)?;
194
195        Ok(Json(views))
196    }
197
198    pub async fn list_streaming_jobs(
199        Extension(srv): Extension<Service>,
200    ) -> Result<Json<Vec<StreamingJobInfo>>> {
201        let streaming_jobs = srv
202            .metadata_manager
203            .catalog_controller
204            .list_streaming_job_infos()
205            .await
206            .map_err(err)?;
207
208        Ok(Json(streaming_jobs))
209    }
210
211    /// In the ddl backpressure graph, we want to compute the backpressure between relations.
212    /// So we need to know which are the fragments which are connected to external relations.
213    /// These fragments form the vertices of the graph.
214    /// We can get collection of backpressure values, keyed by vertex_id-vertex_id.
215    /// This function will return a map of fragment vertex id to relation id.
216    /// We can convert `fragment_id-fragment_id` to `relation_id-relation_id` using that.
217    /// Finally, we have a map of `relation_id-relation_id` to backpressure values.
218    pub async fn get_fragment_to_relation_map(
219        Extension(srv): Extension<Service>,
220    ) -> Result<Json<FragmentToRelationMap>> {
221        let table_fragments = srv
222            .metadata_manager
223            .catalog_controller
224            .table_fragments()
225            .await
226            .map_err(err)?;
227        let mut in_map = HashMap::new();
228        let mut out_map = HashMap::new();
229        for (relation_id, tf) in table_fragments {
230            for (fragment_id, fragment) in &tf.fragments {
231                if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 {
232                    in_map.insert(*fragment_id, relation_id as u32);
233                }
234                if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
235                    out_map.insert(*fragment_id, relation_id as u32);
236                }
237            }
238        }
239        let map = FragmentToRelationMap { in_map, out_map };
240        Ok(Json(map))
241    }
242
243    /// Provides a hierarchy of relation ids to fragments to actors.
244    pub async fn get_relation_id_infos(
245        Extension(srv): Extension<Service>,
246    ) -> Result<Json<RelationIdInfos>> {
247        let table_fragments = srv
248            .metadata_manager
249            .catalog_controller
250            .table_fragments()
251            .await
252            .map_err(err)?;
253        let mut map = HashMap::new();
254        for (id, tf) in table_fragments {
255            let mut fragment_id_to_actor_ids = HashMap::new();
256            for (fragment_id, fragment) in &tf.fragments {
257                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
258                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
259            }
260            map.insert(
261                id as u32,
262                FragmentIdToActorIdMap {
263                    map: fragment_id_to_actor_ids,
264                },
265            );
266        }
267        let relation_id_infos = RelationIdInfos { map };
268
269        Ok(Json(relation_id_infos))
270    }
271
272    pub async fn list_fragments_by_job_id(
273        Extension(srv): Extension<Service>,
274        Path(job_id): Path<u32>,
275    ) -> Result<Json<PbTableFragments>> {
276        let table_id = TableId::new(job_id);
277        let table_fragments = srv
278            .metadata_manager
279            .get_job_fragments_by_id(&table_id)
280            .await
281            .map_err(err)?;
282        let upstream_fragments = srv
283            .metadata_manager
284            .catalog_controller
285            .upstream_fragments(table_fragments.fragment_ids())
286            .await
287            .map_err(err)?;
288        let dispatchers = srv
289            .metadata_manager
290            .catalog_controller
291            .get_fragment_actor_dispatchers(
292                table_fragments.fragment_ids().map(|id| id as _).collect(),
293            )
294            .await
295            .map_err(err)?;
296        Ok(Json(
297            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
298        ))
299    }
300
301    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
302        let users = srv
303            .metadata_manager
304            .catalog_controller
305            .list_users()
306            .await
307            .map_err(err)?;
308
309        Ok(Json(users))
310    }
311
312    pub async fn list_databases(
313        Extension(srv): Extension<Service>,
314    ) -> Result<Json<Vec<PbDatabase>>> {
315        let databases = srv
316            .metadata_manager
317            .catalog_controller
318            .list_databases()
319            .await
320            .map_err(err)?;
321
322        Ok(Json(databases))
323    }
324
325    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
326        let schemas = srv
327            .metadata_manager
328            .catalog_controller
329            .list_schemas()
330            .await
331            .map_err(err)?;
332
333        Ok(Json(schemas))
334    }
335
336    pub async fn list_object_dependencies(
337        Extension(srv): Extension<Service>,
338    ) -> Result<Json<Vec<PbObjectDependencies>>> {
339        let object_dependencies = srv
340            .metadata_manager
341            .catalog_controller
342            .list_all_object_dependencies()
343            .await
344            .map_err(err)?;
345
346        Ok(Json(object_dependencies))
347    }
348
349    async fn dump_await_tree_inner(
350        worker_nodes: impl IntoIterator<Item = &WorkerNode>,
351        compute_clients: &ComputeClientPool,
352        params: AwaitTreeDumpParams,
353    ) -> Result<Json<StackTraceResponse>> {
354        let mut all = StackTraceResponse::default();
355
356        let req = StackTraceRequest {
357            actor_traces_format: match params.format.as_str() {
358                "text" => ActorTracesFormat::Text as i32,
359                "json" => ActorTracesFormat::Json as i32,
360                _ => {
361                    return Err(err(anyhow!(
362                        "Unsupported format `{}`, only `text` and `json` are supported for now",
363                        params.format
364                    )));
365                }
366            },
367        };
368
369        for worker_node in worker_nodes {
370            let client = compute_clients.get(worker_node).await.map_err(err)?;
371            let result = client.stack_trace(req).await.map_err(err)?;
372
373            all.merge_other(result);
374        }
375
376        Ok(all.into())
377    }
378
    /// Query-string parameters accepted by the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        /// Requested actor-trace format; the handlers accept `text` or `json`.
        /// Falls back to `await_tree_default_format()` when the parameter is absent.
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
384
385    fn await_tree_default_format() -> String {
386        // In dashboard, await tree is usually for engineer to debug, so we use human-readable text format by default here.
387        "text".to_owned()
388    }
389
390    pub async fn dump_await_tree_all(
391        Query(params): Query<AwaitTreeDumpParams>,
392        Extension(srv): Extension<Service>,
393    ) -> Result<Json<StackTraceResponse>> {
394        let worker_nodes = srv
395            .metadata_manager
396            .list_worker_node(Some(WorkerType::ComputeNode), None)
397            .await
398            .map_err(err)?;
399
400        dump_await_tree_inner(&worker_nodes, &srv.compute_clients, params).await
401    }
402
403    pub async fn dump_await_tree(
404        Path(worker_id): Path<WorkerId>,
405        Query(params): Query<AwaitTreeDumpParams>,
406        Extension(srv): Extension<Service>,
407    ) -> Result<Json<StackTraceResponse>> {
408        let worker_node = srv
409            .metadata_manager
410            .get_worker_by_id(worker_id)
411            .await
412            .map_err(err)?
413            .context("worker node not found")
414            .map_err(err)?;
415
416        dump_await_tree_inner(std::iter::once(&worker_node), &srv.compute_clients, params).await
417    }
418
419    pub async fn heap_profile(
420        Path(worker_id): Path<WorkerId>,
421        Extension(srv): Extension<Service>,
422    ) -> Result<Json<HeapProfilingResponse>> {
423        let worker_node = srv
424            .metadata_manager
425            .get_worker_by_id(worker_id)
426            .await
427            .map_err(err)?
428            .context("worker node not found")
429            .map_err(err)?;
430
431        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
432
433        let result = client.heap_profile("".to_owned()).await.map_err(err)?;
434
435        Ok(result.into())
436    }
437
438    pub async fn list_heap_profile(
439        Path(worker_id): Path<WorkerId>,
440        Extension(srv): Extension<Service>,
441    ) -> Result<Json<ListHeapProfilingResponse>> {
442        let worker_node = srv
443            .metadata_manager
444            .get_worker_by_id(worker_id)
445            .await
446            .map_err(err)?
447            .context("worker node not found")
448            .map_err(err)?;
449
450        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
451
452        let result = client.list_heap_profile().await.map_err(err)?;
453        Ok(result.into())
454    }
455
456    pub async fn analyze_heap(
457        Path((worker_id, file_path)): Path<(WorkerId, String)>,
458        Extension(srv): Extension<Service>,
459    ) -> Result<Response> {
460        let file_path =
461            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
462
463        let file_name = FilePath::new(&file_path)
464            .file_name()
465            .unwrap()
466            .to_string_lossy()
467            .to_string();
468
469        let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
470
471        let worker_node = srv
472            .metadata_manager
473            .get_worker_by_id(worker_id)
474            .await
475            .map_err(err)?
476            .context("worker node not found")
477            .map_err(err)?;
478
479        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
480
481        let collapsed_bin = client
482            .analyze_heap(file_path.clone())
483            .await
484            .map_err(err)?
485            .result;
486        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
487
488        let response = Response::builder()
489            .header("Content-Type", "application/octet-stream")
490            .header("Content-Disposition", collapsed_file_name)
491            .body(collapsed_str.into());
492
493        response.map_err(err)
494    }
495
    /// Query-string parameters accepted by the `diagnose` handler.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        /// Requested actor-trace format; `diagnose` accepts `text` or `json`.
        /// Falls back to `await_tree_default_format()` when the parameter is absent.
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
501
502    pub async fn diagnose(
503        Query(params): Query<DiagnoseParams>,
504        Extension(srv): Extension<Service>,
505    ) -> Result<String> {
506        let actor_traces_format = match params.actor_traces_format.as_str() {
507            "text" => ActorTracesFormat::Text,
508            "json" => ActorTracesFormat::Json,
509            _ => {
510                return Err(err(anyhow!(
511                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
512                    params.actor_traces_format
513                )));
514            }
515        };
516        Ok(srv.diagnose_command.report(actor_traces_format).await)
517    }
518
519    /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API,
520    /// the workload should be reasonable.
521    /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join).
522    /// In such a scenario, the number of edges is linear to the number of nodes.
523    /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
524    pub async fn get_streaming_stats(
525        Extension(srv): Extension<Service>,
526    ) -> Result<Json<GetStreamingStatsResponse>> {
527        let worker_nodes = srv
528            .metadata_manager
529            .list_active_streaming_compute_nodes()
530            .await
531            .map_err(err)?;
532
533        let mut futures = Vec::new();
534
535        for worker_node in worker_nodes {
536            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
537            let client = Arc::new(client);
538            let fut = async move {
539                let result = client.get_streaming_stats().await.map_err(err)?;
540                Ok::<_, DashboardError>(result)
541            };
542            futures.push(fut);
543        }
544        let results = join_all(futures).await;
545
546        let mut all = GetStreamingStatsResponse::default();
547
548        for result in results {
549            let result = result
550                .map_err(|_| anyhow!("Failed to get back pressure"))
551                .map_err(err)?;
552
553            // Aggregate fragment_stats
554            for (fragment_id, fragment_stats) in result.fragment_stats {
555                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
556                    s.actor_count += fragment_stats.actor_count;
557                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
558                } else {
559                    all.fragment_stats.insert(fragment_id, fragment_stats);
560                }
561            }
562
563            // Aggregate relation_stats
564            for (relation_id, relation_stats) in result.relation_stats {
565                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
566                    s.actor_count += relation_stats.actor_count;
567                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
568                } else {
569                    all.relation_stats.insert(relation_id, relation_stats);
570                }
571            }
572
573            // Aggregate channel_stats
574            for (key, channel_stats) in result.channel_stats {
575                if let Some(s) = all.channel_stats.get_mut(&key) {
576                    s.actor_count += channel_stats.actor_count;
577                    s.output_blocking_duration += channel_stats.output_blocking_duration;
578                    s.recv_row_count += channel_stats.recv_row_count;
579                    s.send_row_count += channel_stats.send_row_count;
580                } else {
581                    all.channel_stats.insert(key, channel_stats);
582                }
583            }
584        }
585
586        Ok(all.into())
587    }
588
589    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
590        Ok(Json(risingwave_common::current_cluster_version()))
591    }
592}
593
impl DashboardService {
    /// Builds the dashboard HTTP application and serves it on `dashboard_addr`.
    ///
    /// The application is composed of:
    /// * `/api/...`: the JSON handlers from [`handlers`] plus the Prometheus cluster metrics
    ///   handler, with the service state attached as an extension and a CORS layer that
    ///   allows `GET` from any origin;
    /// * `/trace/...`: the embedded OTLP trace UI;
    /// * everything else: the router from `risingwave_meta_dashboard` as fallback.
    ///
    /// Response compression is applied to the whole application.
    ///
    /// # Errors
    ///
    /// Returns an error if the listener cannot be bound to `dashboard_addr` or if serving fails.
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/:ty", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            .route("/indexes", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            // /monitor/await_tree/{worker_id}?format={text or json}
            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
            // /monitor/await_tree/?format={text or json}
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/:worker_id",
                get(list_heap_profile),
            )
            // `path` is the URL-safe base64 encoding of the profile file path.
            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
            // /monitor/diagnose/?actor_traces_format={text or json}
            .route("/monitor/diagnose/", get(diagnose))
            // Layers are added after all routes so that they apply to every route above.
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}