risingwave_meta/dashboard/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod prometheus;

use std::net::SocketAddr;
use std::path::Path as FilePath;
use std::sync::Arc;

use anyhow::{Context as _, Result, anyhow};
use axum::Router;
use axum::extract::{Extension, Path};
use axum::http::{Method, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_rpc_client::ComputeClientPool;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::add_extension::AddExtensionLayer;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{self, CorsLayer};

use crate::manager::MetadataManager;
use crate::manager::diagnose::DiagnoseCommandRef;

#[derive(Clone)]
pub struct DashboardService {
    pub dashboard_addr: SocketAddr,
    pub prometheus_client: Option<prometheus_http_query::Client>,
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub compute_clients: ComputeClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    pub trace_state: otlp_embedded::StateRef,
}

pub type Service = Arc<DashboardService>;

pub(super) mod handlers {
    use std::cmp::min;
    use std::collections::HashMap;

    use anyhow::Context;
    use axum::Json;
    use axum::extract::Query;
    use futures::future::join_all;
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
    use risingwave_meta_model::WorkerId;
    use risingwave_pb::catalog::table::TableType;
    use risingwave_pb::catalog::{
        Index, PbDatabase, PbSchema, Sink, Source, Subscription, Table, View,
    };
    use risingwave_pb::common::{WorkerNode, WorkerType};
    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
    use risingwave_pb::meta::{
        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
    };
    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
    use risingwave_pb::monitor_service::{
        GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
        StackTraceRequest, StackTraceResponse,
    };
    use risingwave_pb::stream_plan::FragmentTypeFlag;
    use risingwave_pb::user::PbUserInfo;
    use serde::Deserialize;
    use serde_json::json;
    use thiserror_ext::AsReport;

    use super::*;
    use crate::controller::fragment::StreamingJobInfo;

    pub struct DashboardError(anyhow::Error);
    pub type Result<T> = std::result::Result<T, DashboardError>;

    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
        DashboardError(err.into())
    }

    impl From<anyhow::Error> for DashboardError {
        fn from(value: anyhow::Error) -> Self {
            DashboardError(value)
        }
    }

    impl IntoResponse for DashboardError {
        fn into_response(self) -> axum::response::Response {
            let mut resp = Json(json!({
                "error": self.0.to_report_string(),
            }))
            .into_response();
            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
            resp
        }
    }
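
    // With the conversion above, any handler error reaches the client as an HTTP 500
    // response whose JSON body has the shape `{"error": "<error report string>"}`
    // (shape shown for illustration; the message comes from `to_report_string`).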

    pub async fn list_clusters(
        Path(ty): Path<i32>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<WorkerNode>>> {
        let worker_type = WorkerType::try_from(ty)
            .map_err(|_| anyhow!("invalid worker type"))
            .map_err(err)?;
        let mut result = srv
            .metadata_manager
            .list_worker_node(Some(worker_type), None)
            .await
            .map_err(err)?;
        result.sort_unstable_by_key(|n| n.id);
        Ok(result.into())
    }

    async fn list_table_catalogs_inner(
        metadata_manager: &MetadataManager,
        table_type: TableType,
    ) -> Result<Json<Vec<Table>>> {
        let tables = metadata_manager
            .catalog_controller
            .list_tables_by_type(table_type.into())
            .await
            .map_err(err)?;

        Ok(Json(tables))
    }

    pub async fn list_materialized_views(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<Table>>> {
        list_table_catalogs_inner(&srv.metadata_manager, TableType::MaterializedView).await
    }

    pub async fn list_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
        list_table_catalogs_inner(&srv.metadata_manager, TableType::Table).await
    }

    pub async fn list_index_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
        list_table_catalogs_inner(&srv.metadata_manager, TableType::Index).await
    }

    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
        let indexes = srv
            .metadata_manager
            .catalog_controller
            .list_indexes()
            .await
            .map_err(err)?;

        Ok(Json(indexes))
    }

    pub async fn list_subscription(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<Subscription>>> {
        let subscriptions = srv
            .metadata_manager
            .catalog_controller
            .list_subscriptions()
            .await
            .map_err(err)?;

        Ok(Json(subscriptions))
    }

    pub async fn list_internal_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<Table>>> {
        list_table_catalogs_inner(&srv.metadata_manager, TableType::Internal).await
    }

    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;

        Ok(Json(sources))
    }

    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
        let sinks = srv
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await
            .map_err(err)?;

        Ok(Json(sinks))
    }

    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
        let views = srv
            .metadata_manager
            .catalog_controller
            .list_views()
            .await
            .map_err(err)?;

        Ok(Json(views))
    }

    pub async fn list_streaming_jobs(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<StreamingJobInfo>>> {
        let streaming_jobs = srv
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await
            .map_err(err)?;

        Ok(Json(streaming_jobs))
    }

    /// In the DDL backpressure graph, we want to compute the backpressure between relations.
    /// To do so, we need to know which fragments are connected to external relations;
    /// these fragments form the vertices of the graph.
    /// We can get a collection of backpressure values, keyed by `vertex_id-vertex_id`.
    /// This function returns a map from fragment vertex id to relation id,
    /// which lets us convert `fragment_id-fragment_id` keys to `relation_id-relation_id` keys.
    /// Finally, we obtain a map from `relation_id-relation_id` to backpressure values.
    pub async fn get_fragment_to_relation_map(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<FragmentToRelationMap>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut in_map = HashMap::new();
        let mut out_map = HashMap::new();
        for (relation_id, tf) in table_fragments {
            for (fragment_id, fragment) in &tf.fragments {
                if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 {
                    in_map.insert(*fragment_id, relation_id as u32);
                }
                if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
                    out_map.insert(*fragment_id, relation_id as u32);
                }
            }
        }
        let map = FragmentToRelationMap { in_map, out_map };
        Ok(Json(map))
    }
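
    // Illustrative sketch (not used by any handler): given the `in_map` / `out_map`
    // returned above and backpressure values keyed by fragment edges, a consumer
    // could translate the keys to relation edges roughly as follows. The
    // `(u32, u32)`-tuple key and `f64` value types are assumptions made only for
    // this illustration.
    #[allow(dead_code)]
    fn fragment_bp_to_relation_bp(
        map: &FragmentToRelationMap,
        fragment_bp: &HashMap<(u32, u32), f64>,
    ) -> HashMap<(u32, u32), f64> {
        let mut relation_bp = HashMap::new();
        for ((out_fragment, in_fragment), value) in fragment_bp {
            // An edge is kept only if both endpoints map back to a relation.
            if let (Some(out_relation), Some(in_relation)) =
                (map.out_map.get(out_fragment), map.in_map.get(in_fragment))
            {
                *relation_bp.entry((*out_relation, *in_relation)).or_insert(0.0) += *value;
            }
        }
        relation_bp
    }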

    /// Provides a hierarchy of relation ids to fragments to actors.
    pub async fn get_relation_id_infos(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<RelationIdInfos>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut map = HashMap::new();
        for (id, tf) in table_fragments {
            let mut fragment_id_to_actor_ids = HashMap::new();
            for (fragment_id, fragment) in &tf.fragments {
                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
            }
            map.insert(
                id as u32,
                FragmentIdToActorIdMap {
                    map: fragment_id_to_actor_ids,
                },
            );
        }
        let relation_id_infos = RelationIdInfos { map };

        Ok(Json(relation_id_infos))
    }
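
    // Illustrative sketch (not used by any handler): the relation -> fragment -> actor
    // hierarchy returned above can be flattened, e.g. to count actors per relation.
    // Field names follow the protobuf types constructed in `get_relation_id_infos`.
    #[allow(dead_code)]
    fn actor_count_per_relation(infos: &RelationIdInfos) -> HashMap<u32, usize> {
        infos
            .map
            .iter()
            .map(|(relation_id, fragments)| {
                // Sum the actor ids over all fragments of this relation.
                let count: usize = fragments.map.values().map(|actors| actors.ids.len()).sum();
                (*relation_id, count)
            })
            .collect()
    }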

    pub async fn list_fragments_by_job_id(
        Extension(srv): Extension<Service>,
        Path(job_id): Path<u32>,
    ) -> Result<Json<PbTableFragments>> {
        let table_id = TableId::new(job_id);
        let table_fragments = srv
            .metadata_manager
            .get_job_fragments_by_id(&table_id)
            .await
            .map_err(err)?;
        let upstream_fragments = srv
            .metadata_manager
            .catalog_controller
            .upstream_fragments(table_fragments.fragment_ids())
            .await
            .map_err(err)?;
        let dispatchers = srv
            .metadata_manager
            .catalog_controller
            .get_fragment_actor_dispatchers(
                table_fragments.fragment_ids().map(|id| id as _).collect(),
            )
            .await
            .map_err(err)?;
        Ok(Json(
            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
        ))
    }

    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
        let users = srv
            .metadata_manager
            .catalog_controller
            .list_users()
            .await
            .map_err(err)?;

        Ok(Json(users))
    }

    pub async fn list_databases(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbDatabase>>> {
        let databases = srv
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await
            .map_err(err)?;

        Ok(Json(databases))
    }

    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
        let schemas = srv
            .metadata_manager
            .catalog_controller
            .list_schemas()
            .await
            .map_err(err)?;

        Ok(Json(schemas))
    }

    pub async fn list_object_dependencies(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbObjectDependencies>>> {
        let object_dependencies = srv
            .metadata_manager
            .catalog_controller
            .list_all_object_dependencies()
            .await
            .map_err(err)?;

        Ok(Json(object_dependencies))
    }

    async fn dump_await_tree_inner(
        worker_nodes: impl IntoIterator<Item = &WorkerNode>,
        compute_clients: &ComputeClientPool,
        params: AwaitTreeDumpParams,
    ) -> Result<Json<StackTraceResponse>> {
        let mut all = StackTraceResponse::default();

        let req = StackTraceRequest {
            actor_traces_format: match params.format.as_str() {
                "text" => ActorTracesFormat::Text as i32,
                "json" => ActorTracesFormat::Json as i32,
                _ => {
                    return Err(err(anyhow!(
                        "Unsupported format `{}`, only `text` and `json` are supported for now",
                        params.format
                    )));
                }
            },
        };

        for worker_node in worker_nodes {
            let client = compute_clients.get(worker_node).await.map_err(err)?;
            let result = client.stack_trace(req).await.map_err(err)?;

            all.merge_other(result);
        }

        Ok(all.into())
    }

    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        #[serde(default = "await_tree_default_format")]
        format: String,
    }

    fn await_tree_default_format() -> String {
        // In the dashboard, the await tree is usually inspected by engineers for debugging,
        // so we default to the human-readable text format here.
        "text".to_owned()
    }
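
    // For illustration, both of the following requests reach `dump_await_tree_all`
    // (paths include the `/api` prefix added in `serve`; host and port depend on the
    // configured dashboard address):
    //   GET /api/monitor/await_tree/             -> format = "text" (the default above)
    //   GET /api/monitor/await_tree/?format=json -> format = "json"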

    pub async fn dump_await_tree_all(
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let worker_nodes = srv
            .metadata_manager
            .list_worker_node(Some(WorkerType::ComputeNode), None)
            .await
            .map_err(err)?;

        dump_await_tree_inner(&worker_nodes, &srv.compute_clients, params).await
    }

    pub async fn dump_await_tree(
        Path(worker_id): Path<WorkerId>,
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        dump_await_tree_inner(std::iter::once(&worker_node), &srv.compute_clients, params).await
    }

    pub async fn heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<HeapProfilingResponse>> {
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let result = client.heap_profile("".to_owned()).await.map_err(err)?;

        Ok(result.into())
    }

    pub async fn list_heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<ListHeapProfilingResponse>> {
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let result = client.list_heap_profile().await.map_err(err)?;
        Ok(result.into())
    }

    pub async fn analyze_heap(
        Path((worker_id, file_path)): Path<(WorkerId, String)>,
        Extension(srv): Extension<Service>,
    ) -> Result<Response> {
        let file_path =
            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;

        let file_name = FilePath::new(&file_path)
            .file_name()
            .unwrap()
            .to_string_lossy()
            .to_string();

        let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let collapsed_bin = client
            .analyze_heap(file_path.clone())
            .await
            .map_err(err)?
            .result;
        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();

        let response = Response::builder()
            .header("Content-Type", "application/octet-stream")
            .header("Content-Disposition", collapsed_file_name)
            .body(collapsed_str.into());

        response.map_err(err)
    }
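
    // Illustrative sketch: the `*path` segment handled by `analyze_heap` is expected
    // to be a base64-url-encoded file path, mirroring the `base64_url::decode` call
    // above. A caller could produce that segment as follows (the helper name here is
    // ours, not part of the dashboard API):
    #[allow(dead_code)]
    fn encode_analyze_heap_path(file_path: &str) -> String {
        base64_url::encode(file_path)
    }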

    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }

    pub async fn diagnose(
        Query(params): Query<DiagnoseParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<String> {
        let actor_traces_format = match params.actor_traces_format.as_str() {
            "text" => ActorTracesFormat::Text,
            "json" => ActorTracesFormat::Json,
            _ => {
                return Err(err(anyhow!(
                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
                    params.actor_traces_format
                )));
            }
        };
        Ok(srv.diagnose_command.report(actor_traces_format).await)
    }

    /// NOTE(kwannoel): Although we fetch the back pressure for the entire graph via this API,
    /// the workload should be reasonable.
    /// In most cases, we can safely assume each node has at most 2 outgoing edges (e.g. join),
    /// in which case the number of edges is linear in the number of nodes,
    /// so the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
    pub async fn get_streaming_stats(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingStatsResponse>> {
        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        let mut all = GetStreamingStatsResponse::default();

        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get back pressure"))
                .map_err(err)?;

            // Aggregate fragment_stats
            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            // Aggregate relation_stats
            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }

            // Aggregate channel_stats
            for (key, channel_stats) in result.channel_stats {
                if let Some(s) = all.channel_stats.get_mut(&key) {
                    s.actor_count += channel_stats.actor_count;
                    s.output_blocking_duration += channel_stats.output_blocking_duration;
                    s.recv_row_count += channel_stats.recv_row_count;
                    s.send_row_count += channel_stats.send_row_count;
                } else {
                    all.channel_stats.insert(key, channel_stats);
                }
            }
        }

        Ok(all.into())
    }

    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
        Ok(Json(risingwave_common::current_cluster_version()))
    }
}

impl DashboardService {
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/:ty", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            // /monitor/await_tree/{worker_id}/?format={text or json}
            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
            // /monitor/await_tree/?format={text or json}
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/:worker_id",
                get(list_heap_profile),
            )
            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
            // /monitor/diagnose/?actor_traces_format={text or json}
            .route("/monitor/diagnose/", get(diagnose))
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}