risingwave_meta/dashboard/
mod.rs1mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_rpc_client::ComputeClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::manager::MetadataManager;
35use crate::manager::diagnose::DiagnoseCommandRef;
36
37#[derive(Clone)]
38pub struct DashboardService {
39 pub await_tree_reg: await_tree::Registry,
40 pub dashboard_addr: SocketAddr,
41 pub prometheus_client: Option<prometheus_http_query::Client>,
42 pub prometheus_selector: String,
43 pub metadata_manager: MetadataManager,
44 pub compute_clients: ComputeClientPool,
45 pub diagnose_command: DiagnoseCommandRef,
46 pub trace_state: otlp_embedded::StateRef,
47}
48
49pub type Service = Arc<DashboardService>;
50
51pub(super) mod handlers {
52 use std::cmp::min;
53 use std::collections::HashMap;
54
55 use anyhow::Context;
56 use axum::Json;
57 use axum::extract::Query;
58 use futures::future::join_all;
59 use itertools::Itertools;
60 use risingwave_common::catalog::TableId;
61 use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
62 use risingwave_meta_model::WorkerId;
63 use risingwave_pb::catalog::table::TableType;
64 use risingwave_pb::catalog::{
65 Index, PbDatabase, PbSchema, Sink, Source, Subscription, Table, View,
66 };
67 use risingwave_pb::common::{WorkerNode, WorkerType};
68 use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
69 use risingwave_pb::meta::{
70 ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
71 };
72 use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
73 use risingwave_pb::monitor_service::{
74 GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
75 StackTraceResponse,
76 };
77 use risingwave_pb::stream_plan::FragmentTypeFlag;
78 use risingwave_pb::user::PbUserInfo;
79 use serde::Deserialize;
80 use serde_json::json;
81 use thiserror_ext::AsReport;
82
83 use super::*;
84 use crate::controller::fragment::StreamingJobInfo;
85 use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
86
87 pub struct DashboardError(anyhow::Error);
88 pub type Result<T> = std::result::Result<T, DashboardError>;
89
90 pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
91 DashboardError(err.into())
92 }
93
94 impl From<anyhow::Error> for DashboardError {
95 fn from(value: anyhow::Error) -> Self {
96 DashboardError(value)
97 }
98 }
99
100 impl IntoResponse for DashboardError {
101 fn into_response(self) -> axum::response::Response {
102 let mut resp = Json(json!({
103 "error": self.0.to_report_string(),
104 }))
105 .into_response();
106 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
107 resp
108 }
109 }
110
111 pub async fn list_clusters(
112 Path(ty): Path<i32>,
113 Extension(srv): Extension<Service>,
114 ) -> Result<Json<Vec<WorkerNode>>> {
115 let worker_type = WorkerType::try_from(ty)
116 .map_err(|_| anyhow!("invalid worker type"))
117 .map_err(err)?;
118 let mut result = srv
119 .metadata_manager
120 .list_worker_node(Some(worker_type), None)
121 .await
122 .map_err(err)?;
123 result.sort_unstable_by_key(|n| n.id);
124 Ok(result.into())
125 }
126
127 async fn list_table_catalogs_inner(
128 metadata_manager: &MetadataManager,
129 table_type: TableType,
130 ) -> Result<Json<Vec<Table>>> {
131 let tables = metadata_manager
132 .catalog_controller
133 .list_tables_by_type(table_type.into())
134 .await
135 .map_err(err)?;
136
137 Ok(Json(tables))
138 }
139
140 pub async fn list_materialized_views(
141 Extension(srv): Extension<Service>,
142 ) -> Result<Json<Vec<Table>>> {
143 list_table_catalogs_inner(&srv.metadata_manager, TableType::MaterializedView).await
144 }
145
146 pub async fn list_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
147 list_table_catalogs_inner(&srv.metadata_manager, TableType::Table).await
148 }
149
150 pub async fn list_index_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
151 list_table_catalogs_inner(&srv.metadata_manager, TableType::Index).await
152 }
153
154 pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
155 let indexes = srv
156 .metadata_manager
157 .catalog_controller
158 .list_indexes()
159 .await
160 .map_err(err)?;
161
162 Ok(Json(indexes))
163 }
164
165 pub async fn list_subscription(
166 Extension(srv): Extension<Service>,
167 ) -> Result<Json<Vec<Subscription>>> {
168 let subscriptions = srv
169 .metadata_manager
170 .catalog_controller
171 .list_subscriptions()
172 .await
173 .map_err(err)?;
174
175 Ok(Json(subscriptions))
176 }
177
178 pub async fn list_internal_tables(
179 Extension(srv): Extension<Service>,
180 ) -> Result<Json<Vec<Table>>> {
181 list_table_catalogs_inner(&srv.metadata_manager, TableType::Internal).await
182 }
183
184 pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
185 let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
186
187 Ok(Json(sources))
188 }
189
190 pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
191 let sinks = srv
192 .metadata_manager
193 .catalog_controller
194 .list_sinks()
195 .await
196 .map_err(err)?;
197
198 Ok(Json(sinks))
199 }
200
201 pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
202 let views = srv
203 .metadata_manager
204 .catalog_controller
205 .list_views()
206 .await
207 .map_err(err)?;
208
209 Ok(Json(views))
210 }
211
212 pub async fn list_streaming_jobs(
213 Extension(srv): Extension<Service>,
214 ) -> Result<Json<Vec<StreamingJobInfo>>> {
215 let streaming_jobs = srv
216 .metadata_manager
217 .catalog_controller
218 .list_streaming_job_infos()
219 .await
220 .map_err(err)?;
221
222 Ok(Json(streaming_jobs))
223 }
224
225 pub async fn get_fragment_to_relation_map(
233 Extension(srv): Extension<Service>,
234 ) -> Result<Json<FragmentToRelationMap>> {
235 let table_fragments = srv
236 .metadata_manager
237 .catalog_controller
238 .table_fragments()
239 .await
240 .map_err(err)?;
241 let mut in_map = HashMap::new();
242 let mut out_map = HashMap::new();
243 for (relation_id, tf) in table_fragments {
244 for (fragment_id, fragment) in &tf.fragments {
245 if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 {
246 in_map.insert(*fragment_id, relation_id as u32);
247 }
248 if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
249 out_map.insert(*fragment_id, relation_id as u32);
250 }
251 }
252 }
253 let map = FragmentToRelationMap { in_map, out_map };
254 Ok(Json(map))
255 }
256
257 pub async fn get_relation_id_infos(
259 Extension(srv): Extension<Service>,
260 ) -> Result<Json<RelationIdInfos>> {
261 let table_fragments = srv
262 .metadata_manager
263 .catalog_controller
264 .table_fragments()
265 .await
266 .map_err(err)?;
267 let mut map = HashMap::new();
268 for (id, tf) in table_fragments {
269 let mut fragment_id_to_actor_ids = HashMap::new();
270 for (fragment_id, fragment) in &tf.fragments {
271 let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
272 fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
273 }
274 map.insert(
275 id as u32,
276 FragmentIdToActorIdMap {
277 map: fragment_id_to_actor_ids,
278 },
279 );
280 }
281 let relation_id_infos = RelationIdInfos { map };
282
283 Ok(Json(relation_id_infos))
284 }
285
286 pub async fn list_fragments_by_job_id(
287 Extension(srv): Extension<Service>,
288 Path(job_id): Path<u32>,
289 ) -> Result<Json<PbTableFragments>> {
290 let table_id = TableId::new(job_id);
291 let table_fragments = srv
292 .metadata_manager
293 .get_job_fragments_by_id(&table_id)
294 .await
295 .map_err(err)?;
296 let upstream_fragments = srv
297 .metadata_manager
298 .catalog_controller
299 .upstream_fragments(table_fragments.fragment_ids())
300 .await
301 .map_err(err)?;
302 let dispatchers = srv
303 .metadata_manager
304 .catalog_controller
305 .get_fragment_actor_dispatchers(
306 table_fragments.fragment_ids().map(|id| id as _).collect(),
307 )
308 .await
309 .map_err(err)?;
310 Ok(Json(
311 table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
312 ))
313 }
314
315 pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
316 let users = srv
317 .metadata_manager
318 .catalog_controller
319 .list_users()
320 .await
321 .map_err(err)?;
322
323 Ok(Json(users))
324 }
325
326 pub async fn list_databases(
327 Extension(srv): Extension<Service>,
328 ) -> Result<Json<Vec<PbDatabase>>> {
329 let databases = srv
330 .metadata_manager
331 .catalog_controller
332 .list_databases()
333 .await
334 .map_err(err)?;
335
336 Ok(Json(databases))
337 }
338
339 pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
340 let schemas = srv
341 .metadata_manager
342 .catalog_controller
343 .list_schemas()
344 .await
345 .map_err(err)?;
346
347 Ok(Json(schemas))
348 }
349
350 pub async fn list_object_dependencies(
351 Extension(srv): Extension<Service>,
352 ) -> Result<Json<Vec<PbObjectDependencies>>> {
353 let object_dependencies = srv
354 .metadata_manager
355 .catalog_controller
356 .list_all_object_dependencies()
357 .await
358 .map_err(err)?;
359
360 Ok(Json(object_dependencies))
361 }
362
363 #[derive(Debug, Deserialize)]
364 pub struct AwaitTreeDumpParams {
365 #[serde(default = "await_tree_default_format")]
366 format: String,
367 }
368
369 impl AwaitTreeDumpParams {
370 pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
372 Ok(match self.format.as_str() {
373 "text" => ActorTracesFormat::Text,
374 "json" => ActorTracesFormat::Json,
375 _ => {
376 return Err(err(anyhow!(
377 "Unsupported format `{}`, only `text` and `json` are supported for now",
378 self.format
379 )));
380 }
381 })
382 }
383 }
384
/// Serde default for the trace-format query parameters: the string `"text"`.
fn await_tree_default_format() -> String {
    String::from("text")
}
389
390 pub async fn dump_await_tree_all(
391 Query(params): Query<AwaitTreeDumpParams>,
392 Extension(srv): Extension<Service>,
393 ) -> Result<Json<StackTraceResponse>> {
394 let actor_traces_format = params.actor_traces_format()?;
395
396 let res = dump_cluster_await_tree(
397 &srv.metadata_manager,
398 &srv.await_tree_reg,
399 actor_traces_format,
400 )
401 .await
402 .map_err(err)?;
403
404 Ok(res.into())
405 }
406
407 pub async fn dump_await_tree(
408 Path(worker_id): Path<WorkerId>,
409 Query(params): Query<AwaitTreeDumpParams>,
410 Extension(srv): Extension<Service>,
411 ) -> Result<Json<StackTraceResponse>> {
412 let actor_traces_format = params.actor_traces_format()?;
413
414 let worker_node = srv
415 .metadata_manager
416 .get_worker_by_id(worker_id)
417 .await
418 .map_err(err)?
419 .context("worker node not found")
420 .map_err(err)?;
421
422 let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
423 .await
424 .map_err(err)?;
425
426 Ok(res.into())
427 }
428
429 pub async fn heap_profile(
430 Path(worker_id): Path<WorkerId>,
431 Extension(srv): Extension<Service>,
432 ) -> Result<Json<HeapProfilingResponse>> {
433 let worker_node = srv
434 .metadata_manager
435 .get_worker_by_id(worker_id)
436 .await
437 .map_err(err)?
438 .context("worker node not found")
439 .map_err(err)?;
440
441 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
442
443 let result = client.heap_profile("".to_owned()).await.map_err(err)?;
444
445 Ok(result.into())
446 }
447
448 pub async fn list_heap_profile(
449 Path(worker_id): Path<WorkerId>,
450 Extension(srv): Extension<Service>,
451 ) -> Result<Json<ListHeapProfilingResponse>> {
452 let worker_node = srv
453 .metadata_manager
454 .get_worker_by_id(worker_id)
455 .await
456 .map_err(err)?
457 .context("worker node not found")
458 .map_err(err)?;
459
460 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
461
462 let result = client.list_heap_profile().await.map_err(err)?;
463 Ok(result.into())
464 }
465
466 pub async fn analyze_heap(
467 Path((worker_id, file_path)): Path<(WorkerId, String)>,
468 Extension(srv): Extension<Service>,
469 ) -> Result<Response> {
470 let file_path =
471 String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
472
473 let file_name = FilePath::new(&file_path)
474 .file_name()
475 .unwrap()
476 .to_string_lossy()
477 .to_string();
478
479 let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
480
481 let worker_node = srv
482 .metadata_manager
483 .get_worker_by_id(worker_id)
484 .await
485 .map_err(err)?
486 .context("worker node not found")
487 .map_err(err)?;
488
489 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
490
491 let collapsed_bin = client
492 .analyze_heap(file_path.clone())
493 .await
494 .map_err(err)?
495 .result;
496 let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
497
498 let response = Response::builder()
499 .header("Content-Type", "application/octet-stream")
500 .header("Content-Disposition", collapsed_file_name)
501 .body(collapsed_str.into());
502
503 response.map_err(err)
504 }
505
506 #[derive(Debug, Deserialize)]
507 pub struct DiagnoseParams {
508 #[serde(default = "await_tree_default_format")]
509 actor_traces_format: String,
510 }
511
512 pub async fn diagnose(
513 Query(params): Query<DiagnoseParams>,
514 Extension(srv): Extension<Service>,
515 ) -> Result<String> {
516 let actor_traces_format = match params.actor_traces_format.as_str() {
517 "text" => ActorTracesFormat::Text,
518 "json" => ActorTracesFormat::Json,
519 _ => {
520 return Err(err(anyhow!(
521 "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
522 params.actor_traces_format
523 )));
524 }
525 };
526 Ok(srv.diagnose_command.report(actor_traces_format).await)
527 }
528
529 pub async fn get_streaming_stats(
535 Extension(srv): Extension<Service>,
536 ) -> Result<Json<GetStreamingStatsResponse>> {
537 let worker_nodes = srv
538 .metadata_manager
539 .list_active_streaming_compute_nodes()
540 .await
541 .map_err(err)?;
542
543 let mut futures = Vec::new();
544
545 for worker_node in worker_nodes {
546 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
547 let client = Arc::new(client);
548 let fut = async move {
549 let result = client.get_streaming_stats().await.map_err(err)?;
550 Ok::<_, DashboardError>(result)
551 };
552 futures.push(fut);
553 }
554 let results = join_all(futures).await;
555
556 let mut all = GetStreamingStatsResponse::default();
557
558 for result in results {
559 let result = result
560 .map_err(|_| anyhow!("Failed to get back pressure"))
561 .map_err(err)?;
562
563 for (fragment_id, fragment_stats) in result.fragment_stats {
565 if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
566 s.actor_count += fragment_stats.actor_count;
567 s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
568 } else {
569 all.fragment_stats.insert(fragment_id, fragment_stats);
570 }
571 }
572
573 for (relation_id, relation_stats) in result.relation_stats {
575 if let Some(s) = all.relation_stats.get_mut(&relation_id) {
576 s.actor_count += relation_stats.actor_count;
577 s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
578 } else {
579 all.relation_stats.insert(relation_id, relation_stats);
580 }
581 }
582
583 for (key, channel_stats) in result.channel_stats {
585 if let Some(s) = all.channel_stats.get_mut(&key) {
586 s.actor_count += channel_stats.actor_count;
587 s.output_blocking_duration += channel_stats.output_blocking_duration;
588 s.recv_row_count += channel_stats.recv_row_count;
589 s.send_row_count += channel_stats.send_row_count;
590 } else {
591 all.channel_stats.insert(key, channel_stats);
592 }
593 }
594 }
595
596 Ok(all.into())
597 }
598
599 pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
600 Ok(Json(risingwave_common::current_cluster_version()))
601 }
602}
603
604impl DashboardService {
605 pub async fn serve(self) -> Result<()> {
606 use handlers::*;
607 let srv = Arc::new(self);
608
609 let cors_layer = CorsLayer::new()
610 .allow_origin(cors::Any)
611 .allow_methods(vec![Method::GET]);
612
613 let api_router = Router::new()
614 .route("/version", get(get_version))
615 .route("/clusters/:ty", get(list_clusters))
616 .route("/streaming_jobs", get(list_streaming_jobs))
617 .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
618 .route("/relation_id_infos", get(get_relation_id_infos))
619 .route(
620 "/fragment_to_relation_map",
621 get(get_fragment_to_relation_map),
622 )
623 .route("/views", get(list_views))
624 .route("/materialized_views", get(list_materialized_views))
625 .route("/tables", get(list_tables))
626 .route("/indexes", get(list_index_tables))
627 .route("/index_items", get(list_indexes))
628 .route("/subscriptions", get(list_subscription))
629 .route("/internal_tables", get(list_internal_tables))
630 .route("/sources", get(list_sources))
631 .route("/sinks", get(list_sinks))
632 .route("/users", get(list_users))
633 .route("/databases", get(list_databases))
634 .route("/schemas", get(list_schemas))
635 .route("/object_dependencies", get(list_object_dependencies))
636 .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
637 .route("/metrics/streaming_stats", get(get_streaming_stats))
638 .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
640 .route("/monitor/await_tree/", get(dump_await_tree_all))
642 .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
643 .route(
644 "/monitor/list_heap_profile/:worker_id",
645 get(list_heap_profile),
646 )
647 .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
648 .route("/monitor/diagnose/", get(diagnose))
650 .layer(
651 ServiceBuilder::new()
652 .layer(AddExtensionLayer::new(srv.clone()))
653 .into_inner(),
654 )
655 .layer(cors_layer);
656
657 let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
658 let dashboard_router = risingwave_meta_dashboard::router();
659
660 let app = Router::new()
661 .fallback_service(dashboard_router)
662 .nest("/api", api_router)
663 .nest("/trace", trace_ui_router)
664 .layer(CompressionLayer::new());
665
666 let listener = TcpListener::bind(&srv.dashboard_addr)
667 .await
668 .context("failed to bind dashboard address")?;
669 axum::serve(listener, app)
670 .await
671 .context("failed to serve dashboard service")?;
672
673 Ok(())
674 }
675}