risingwave_meta/dashboard/
mod.rs1mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_common::util::StackTraceResponseExt;
28use risingwave_rpc_client::ComputeClientPool;
29use tokio::net::TcpListener;
30use tower::ServiceBuilder;
31use tower_http::add_extension::AddExtensionLayer;
32use tower_http::compression::CompressionLayer;
33use tower_http::cors::{self, CorsLayer};
34
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// Shared state for the meta node's dashboard HTTP server.
///
/// [`DashboardService::serve`] wraps it in an [`Arc`] (see [`Service`]) and attaches it to
/// every `/api` request as an axum `Extension`, from which the handlers read it.
#[derive(Clone)]
pub struct DashboardService {
    /// Address the dashboard HTTP server binds to.
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus client. NOTE(review): presumably `None` when no Prometheus
    /// endpoint is configured and consumed by the `prometheus` submodule — confirm there.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// Prometheus selector string. NOTE(review): presumably applied to the queries issued
    /// by the `prometheus` submodule — confirm there.
    pub prometheus_selector: String,
    /// Metadata access (workers, catalogs, fragments) used by the API handlers.
    pub metadata_manager: MetadataManager,
    /// Pool of compute-node RPC clients used for await-tree dumps, heap profiling and
    /// streaming stats.
    pub compute_clients: ComputeClientPool,
    /// Produces the report served by the `/api/monitor/diagnose/` endpoint.
    pub diagnose_command: DiagnoseCommandRef,
    /// State of the embedded OTLP trace UI mounted under `/trace`.
    pub trace_state: otlp_embedded::StateRef,
}

/// Shared handle to [`DashboardService`], extracted by handlers as `Extension<Service>`.
pub type Service = Arc<DashboardService>;
50
51pub(super) mod handlers {
52 use std::cmp::min;
53 use std::collections::HashMap;
54
55 use anyhow::Context;
56 use axum::Json;
57 use axum::extract::Query;
58 use futures::future::join_all;
59 use itertools::Itertools;
60 use risingwave_common::catalog::TableId;
61 use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
62 use risingwave_meta_model::WorkerId;
63 use risingwave_pb::catalog::table::TableType;
64 use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View};
65 use risingwave_pb::common::{WorkerNode, WorkerType};
66 use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
67 use risingwave_pb::meta::{
68 ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
69 };
70 use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
71 use risingwave_pb::monitor_service::{
72 GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
73 StackTraceRequest, StackTraceResponse,
74 };
75 use risingwave_pb::stream_plan::FragmentTypeFlag;
76 use risingwave_pb::user::PbUserInfo;
77 use serde::Deserialize;
78 use serde_json::json;
79 use thiserror_ext::AsReport;
80
81 use super::*;
82 use crate::controller::fragment::StreamingJobInfo;
83
84 pub struct DashboardError(anyhow::Error);
85 pub type Result<T> = std::result::Result<T, DashboardError>;
86
87 pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
88 DashboardError(err.into())
89 }
90
91 impl From<anyhow::Error> for DashboardError {
92 fn from(value: anyhow::Error) -> Self {
93 DashboardError(value)
94 }
95 }
96
97 impl IntoResponse for DashboardError {
98 fn into_response(self) -> axum::response::Response {
99 let mut resp = Json(json!({
100 "error": self.0.to_report_string(),
101 }))
102 .into_response();
103 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
104 resp
105 }
106 }
107
108 pub async fn list_clusters(
109 Path(ty): Path<i32>,
110 Extension(srv): Extension<Service>,
111 ) -> Result<Json<Vec<WorkerNode>>> {
112 let worker_type = WorkerType::try_from(ty)
113 .map_err(|_| anyhow!("invalid worker type"))
114 .map_err(err)?;
115 let mut result = srv
116 .metadata_manager
117 .list_worker_node(Some(worker_type), None)
118 .await
119 .map_err(err)?;
120 result.sort_unstable_by_key(|n| n.id);
121 Ok(result.into())
122 }
123
124 async fn list_table_catalogs_inner(
125 metadata_manager: &MetadataManager,
126 table_type: TableType,
127 ) -> Result<Json<Vec<Table>>> {
128 let tables = metadata_manager
129 .catalog_controller
130 .list_tables_by_type(table_type.into())
131 .await
132 .map_err(err)?;
133
134 Ok(Json(tables))
135 }
136
137 pub async fn list_materialized_views(
138 Extension(srv): Extension<Service>,
139 ) -> Result<Json<Vec<Table>>> {
140 list_table_catalogs_inner(&srv.metadata_manager, TableType::MaterializedView).await
141 }
142
143 pub async fn list_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
144 list_table_catalogs_inner(&srv.metadata_manager, TableType::Table).await
145 }
146
147 pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
148 list_table_catalogs_inner(&srv.metadata_manager, TableType::Index).await
149 }
150
151 pub async fn list_subscription(
152 Extension(srv): Extension<Service>,
153 ) -> Result<Json<Vec<Subscription>>> {
154 let subscriptions = srv
155 .metadata_manager
156 .catalog_controller
157 .list_subscriptions()
158 .await
159 .map_err(err)?;
160
161 Ok(Json(subscriptions))
162 }
163
164 pub async fn list_internal_tables(
165 Extension(srv): Extension<Service>,
166 ) -> Result<Json<Vec<Table>>> {
167 list_table_catalogs_inner(&srv.metadata_manager, TableType::Internal).await
168 }
169
170 pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
171 let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
172
173 Ok(Json(sources))
174 }
175
176 pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
177 let sinks = srv
178 .metadata_manager
179 .catalog_controller
180 .list_sinks()
181 .await
182 .map_err(err)?;
183
184 Ok(Json(sinks))
185 }
186
187 pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
188 let views = srv
189 .metadata_manager
190 .catalog_controller
191 .list_views()
192 .await
193 .map_err(err)?;
194
195 Ok(Json(views))
196 }
197
198 pub async fn list_streaming_jobs(
199 Extension(srv): Extension<Service>,
200 ) -> Result<Json<Vec<StreamingJobInfo>>> {
201 let streaming_jobs = srv
202 .metadata_manager
203 .catalog_controller
204 .list_streaming_job_infos()
205 .await
206 .map_err(err)?;
207
208 Ok(Json(streaming_jobs))
209 }
210
211 pub async fn get_fragment_to_relation_map(
219 Extension(srv): Extension<Service>,
220 ) -> Result<Json<FragmentToRelationMap>> {
221 let table_fragments = srv
222 .metadata_manager
223 .catalog_controller
224 .table_fragments()
225 .await
226 .map_err(err)?;
227 let mut in_map = HashMap::new();
228 let mut out_map = HashMap::new();
229 for (relation_id, tf) in table_fragments {
230 for (fragment_id, fragment) in &tf.fragments {
231 if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 {
232 in_map.insert(*fragment_id, relation_id as u32);
233 }
234 if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
235 out_map.insert(*fragment_id, relation_id as u32);
236 }
237 }
238 }
239 let map = FragmentToRelationMap { in_map, out_map };
240 Ok(Json(map))
241 }
242
243 pub async fn get_relation_id_infos(
245 Extension(srv): Extension<Service>,
246 ) -> Result<Json<RelationIdInfos>> {
247 let table_fragments = srv
248 .metadata_manager
249 .catalog_controller
250 .table_fragments()
251 .await
252 .map_err(err)?;
253 let mut map = HashMap::new();
254 for (id, tf) in table_fragments {
255 let mut fragment_id_to_actor_ids = HashMap::new();
256 for (fragment_id, fragment) in &tf.fragments {
257 let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
258 fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
259 }
260 map.insert(
261 id as u32,
262 FragmentIdToActorIdMap {
263 map: fragment_id_to_actor_ids,
264 },
265 );
266 }
267 let relation_id_infos = RelationIdInfos { map };
268
269 Ok(Json(relation_id_infos))
270 }
271
272 pub async fn list_fragments_by_job_id(
273 Extension(srv): Extension<Service>,
274 Path(job_id): Path<u32>,
275 ) -> Result<Json<PbTableFragments>> {
276 let table_id = TableId::new(job_id);
277 let table_fragments = srv
278 .metadata_manager
279 .get_job_fragments_by_id(&table_id)
280 .await
281 .map_err(err)?;
282 let upstream_fragments = srv
283 .metadata_manager
284 .catalog_controller
285 .upstream_fragments(table_fragments.fragment_ids())
286 .await
287 .map_err(err)?;
288 let dispatchers = srv
289 .metadata_manager
290 .catalog_controller
291 .get_fragment_actor_dispatchers(
292 table_fragments.fragment_ids().map(|id| id as _).collect(),
293 )
294 .await
295 .map_err(err)?;
296 Ok(Json(
297 table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
298 ))
299 }
300
301 pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
302 let users = srv
303 .metadata_manager
304 .catalog_controller
305 .list_users()
306 .await
307 .map_err(err)?;
308
309 Ok(Json(users))
310 }
311
312 pub async fn list_databases(
313 Extension(srv): Extension<Service>,
314 ) -> Result<Json<Vec<PbDatabase>>> {
315 let databases = srv
316 .metadata_manager
317 .catalog_controller
318 .list_databases()
319 .await
320 .map_err(err)?;
321
322 Ok(Json(databases))
323 }
324
325 pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
326 let schemas = srv
327 .metadata_manager
328 .catalog_controller
329 .list_schemas()
330 .await
331 .map_err(err)?;
332
333 Ok(Json(schemas))
334 }
335
336 pub async fn list_object_dependencies(
337 Extension(srv): Extension<Service>,
338 ) -> Result<Json<Vec<PbObjectDependencies>>> {
339 let object_dependencies = srv
340 .metadata_manager
341 .catalog_controller
342 .list_all_object_dependencies()
343 .await
344 .map_err(err)?;
345
346 Ok(Json(object_dependencies))
347 }
348
349 async fn dump_await_tree_inner(
350 worker_nodes: impl IntoIterator<Item = &WorkerNode>,
351 compute_clients: &ComputeClientPool,
352 params: AwaitTreeDumpParams,
353 ) -> Result<Json<StackTraceResponse>> {
354 let mut all = StackTraceResponse::default();
355
356 let req = StackTraceRequest {
357 actor_traces_format: match params.format.as_str() {
358 "text" => ActorTracesFormat::Text as i32,
359 "json" => ActorTracesFormat::Json as i32,
360 _ => {
361 return Err(err(anyhow!(
362 "Unsupported format `{}`, only `text` and `json` are supported for now",
363 params.format
364 )));
365 }
366 },
367 };
368
369 for worker_node in worker_nodes {
370 let client = compute_clients.get(worker_node).await.map_err(err)?;
371 let result = client.stack_trace(req).await.map_err(err)?;
372
373 all.merge_other(result);
374 }
375
376 Ok(all.into())
377 }
378
    /// Query parameters accepted by the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        /// Actor trace format, `text` or `json`; defaults to `text` when omitted.
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
384
385 fn await_tree_default_format() -> String {
386 "text".to_owned()
388 }
389
390 pub async fn dump_await_tree_all(
391 Query(params): Query<AwaitTreeDumpParams>,
392 Extension(srv): Extension<Service>,
393 ) -> Result<Json<StackTraceResponse>> {
394 let worker_nodes = srv
395 .metadata_manager
396 .list_worker_node(Some(WorkerType::ComputeNode), None)
397 .await
398 .map_err(err)?;
399
400 dump_await_tree_inner(&worker_nodes, &srv.compute_clients, params).await
401 }
402
403 pub async fn dump_await_tree(
404 Path(worker_id): Path<WorkerId>,
405 Query(params): Query<AwaitTreeDumpParams>,
406 Extension(srv): Extension<Service>,
407 ) -> Result<Json<StackTraceResponse>> {
408 let worker_node = srv
409 .metadata_manager
410 .get_worker_by_id(worker_id)
411 .await
412 .map_err(err)?
413 .context("worker node not found")
414 .map_err(err)?;
415
416 dump_await_tree_inner(std::iter::once(&worker_node), &srv.compute_clients, params).await
417 }
418
419 pub async fn heap_profile(
420 Path(worker_id): Path<WorkerId>,
421 Extension(srv): Extension<Service>,
422 ) -> Result<Json<HeapProfilingResponse>> {
423 let worker_node = srv
424 .metadata_manager
425 .get_worker_by_id(worker_id)
426 .await
427 .map_err(err)?
428 .context("worker node not found")
429 .map_err(err)?;
430
431 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
432
433 let result = client.heap_profile("".to_owned()).await.map_err(err)?;
434
435 Ok(result.into())
436 }
437
438 pub async fn list_heap_profile(
439 Path(worker_id): Path<WorkerId>,
440 Extension(srv): Extension<Service>,
441 ) -> Result<Json<ListHeapProfilingResponse>> {
442 let worker_node = srv
443 .metadata_manager
444 .get_worker_by_id(worker_id)
445 .await
446 .map_err(err)?
447 .context("worker node not found")
448 .map_err(err)?;
449
450 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
451
452 let result = client.list_heap_profile().await.map_err(err)?;
453 Ok(result.into())
454 }
455
456 pub async fn analyze_heap(
457 Path((worker_id, file_path)): Path<(WorkerId, String)>,
458 Extension(srv): Extension<Service>,
459 ) -> Result<Response> {
460 let file_path =
461 String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
462
463 let file_name = FilePath::new(&file_path)
464 .file_name()
465 .unwrap()
466 .to_string_lossy()
467 .to_string();
468
469 let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
470
471 let worker_node = srv
472 .metadata_manager
473 .get_worker_by_id(worker_id)
474 .await
475 .map_err(err)?
476 .context("worker node not found")
477 .map_err(err)?;
478
479 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
480
481 let collapsed_bin = client
482 .analyze_heap(file_path.clone())
483 .await
484 .map_err(err)?
485 .result;
486 let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
487
488 let response = Response::builder()
489 .header("Content-Type", "application/octet-stream")
490 .header("Content-Disposition", collapsed_file_name)
491 .body(collapsed_str.into());
492
493 response.map_err(err)
494 }
495
    /// Query parameters accepted by the `diagnose` endpoint.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        /// Actor trace format for the report, `text` or `json`; defaults to `text` when omitted.
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
501
502 pub async fn diagnose(
503 Query(params): Query<DiagnoseParams>,
504 Extension(srv): Extension<Service>,
505 ) -> Result<String> {
506 let actor_traces_format = match params.actor_traces_format.as_str() {
507 "text" => ActorTracesFormat::Text,
508 "json" => ActorTracesFormat::Json,
509 _ => {
510 return Err(err(anyhow!(
511 "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
512 params.actor_traces_format
513 )));
514 }
515 };
516 Ok(srv.diagnose_command.report(actor_traces_format).await)
517 }
518
519 pub async fn get_streaming_stats(
525 Extension(srv): Extension<Service>,
526 ) -> Result<Json<GetStreamingStatsResponse>> {
527 let worker_nodes = srv
528 .metadata_manager
529 .list_active_streaming_compute_nodes()
530 .await
531 .map_err(err)?;
532
533 let mut futures = Vec::new();
534
535 for worker_node in worker_nodes {
536 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
537 let client = Arc::new(client);
538 let fut = async move {
539 let result = client.get_streaming_stats().await.map_err(err)?;
540 Ok::<_, DashboardError>(result)
541 };
542 futures.push(fut);
543 }
544 let results = join_all(futures).await;
545
546 let mut all = GetStreamingStatsResponse::default();
547
548 for result in results {
549 let result = result
550 .map_err(|_| anyhow!("Failed to get back pressure"))
551 .map_err(err)?;
552
553 for (fragment_id, fragment_stats) in result.fragment_stats {
555 if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
556 s.actor_count += fragment_stats.actor_count;
557 s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
558 } else {
559 all.fragment_stats.insert(fragment_id, fragment_stats);
560 }
561 }
562
563 for (relation_id, relation_stats) in result.relation_stats {
565 if let Some(s) = all.relation_stats.get_mut(&relation_id) {
566 s.actor_count += relation_stats.actor_count;
567 s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
568 } else {
569 all.relation_stats.insert(relation_id, relation_stats);
570 }
571 }
572
573 for (key, channel_stats) in result.channel_stats {
575 if let Some(s) = all.channel_stats.get_mut(&key) {
576 s.actor_count += channel_stats.actor_count;
577 s.output_blocking_duration += channel_stats.output_blocking_duration;
578 s.recv_row_count += channel_stats.recv_row_count;
579 s.send_row_count += channel_stats.send_row_count;
580 } else {
581 all.channel_stats.insert(key, channel_stats);
582 }
583 }
584 }
585
586 Ok(all.into())
587 }
588
589 pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
590 Ok(Json(risingwave_common::current_cluster_version()))
591 }
592}
593
594impl DashboardService {
595 pub async fn serve(self) -> Result<()> {
596 use handlers::*;
597 let srv = Arc::new(self);
598
599 let cors_layer = CorsLayer::new()
600 .allow_origin(cors::Any)
601 .allow_methods(vec![Method::GET]);
602
603 let api_router = Router::new()
604 .route("/version", get(get_version))
605 .route("/clusters/:ty", get(list_clusters))
606 .route("/streaming_jobs", get(list_streaming_jobs))
607 .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
608 .route("/relation_id_infos", get(get_relation_id_infos))
609 .route(
610 "/fragment_to_relation_map",
611 get(get_fragment_to_relation_map),
612 )
613 .route("/views", get(list_views))
614 .route("/materialized_views", get(list_materialized_views))
615 .route("/tables", get(list_tables))
616 .route("/indexes", get(list_indexes))
617 .route("/subscriptions", get(list_subscription))
618 .route("/internal_tables", get(list_internal_tables))
619 .route("/sources", get(list_sources))
620 .route("/sinks", get(list_sinks))
621 .route("/users", get(list_users))
622 .route("/databases", get(list_databases))
623 .route("/schemas", get(list_schemas))
624 .route("/object_dependencies", get(list_object_dependencies))
625 .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
626 .route("/metrics/streaming_stats", get(get_streaming_stats))
627 .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
629 .route("/monitor/await_tree/", get(dump_await_tree_all))
631 .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
632 .route(
633 "/monitor/list_heap_profile/:worker_id",
634 get(list_heap_profile),
635 )
636 .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
637 .route("/monitor/diagnose/", get(diagnose))
639 .layer(
640 ServiceBuilder::new()
641 .layer(AddExtensionLayer::new(srv.clone()))
642 .into_inner(),
643 )
644 .layer(cors_layer);
645
646 let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
647 let dashboard_router = risingwave_meta_dashboard::router();
648
649 let app = Router::new()
650 .fallback_service(dashboard_router)
651 .nest("/api", api_router)
652 .nest("/trace", trace_ui_router)
653 .layer(CompressionLayer::new());
654
655 let listener = TcpListener::bind(&srv.dashboard_addr)
656 .await
657 .context("failed to bind dashboard address")?;
658 axum::serve(listener, app)
659 .await
660 .context("failed to serve dashboard service")?;
661
662 Ok(())
663 }
664}