risingwave_meta/dashboard/
mod.rs1mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_common::util::StackTraceResponseExt;
28use risingwave_rpc_client::ComputeClientPool;
29use tokio::net::TcpListener;
30use tower::ServiceBuilder;
31use tower_http::add_extension::AddExtensionLayer;
32use tower_http::compression::CompressionLayer;
33use tower_http::cors::{self, CorsLayer};
34
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// Shared state of the meta dashboard HTTP service.
///
/// [`DashboardService::serve`] wraps the instance in an [`Arc`] and hands it to every API
/// handler through an axum `Extension`.
#[derive(Clone)]
pub struct DashboardService {
    /// Socket address the dashboard HTTP server binds to.
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus query client.
    /// NOTE(review): presumably consumed by the `prometheus` submodule — not visible here.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// NOTE(review): presumably a label selector applied to Prometheus queries — confirm in
    /// the `prometheus` submodule.
    pub prometheus_selector: String,
    /// Source of cluster and catalog metadata for the handlers.
    pub metadata_manager: MetadataManager,
    /// Pool from which handlers obtain RPC clients for worker nodes.
    pub compute_clients: ComputeClientPool,
    /// Produces the report returned by the diagnose endpoint.
    pub diagnose_command: DiagnoseCommandRef,
    /// State handed to the embedded OTLP trace UI.
    pub trace_state: otlp_embedded::StateRef,
}
48
/// Reference-counted handle to the [`DashboardService`]; this is the type handlers extract
/// via `Extension`.
pub type Service = Arc<DashboardService>;
50
51pub(super) mod handlers {
52 use std::cmp::min;
53 use std::collections::HashMap;
54
55 use anyhow::Context;
56 use axum::Json;
57 use axum::extract::Query;
58 use futures::future::join_all;
59 use itertools::Itertools;
60 use risingwave_common::catalog::TableId;
61 use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
62 use risingwave_meta_model::WorkerId;
63 use risingwave_pb::catalog::table::TableType;
64 use risingwave_pb::catalog::{
65 Index, PbDatabase, PbSchema, Sink, Source, Subscription, Table, View,
66 };
67 use risingwave_pb::common::{WorkerNode, WorkerType};
68 use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
69 use risingwave_pb::meta::{
70 ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
71 };
72 use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
73 use risingwave_pb::monitor_service::{
74 GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
75 StackTraceRequest, StackTraceResponse,
76 };
77 use risingwave_pb::stream_plan::FragmentTypeFlag;
78 use risingwave_pb::user::PbUserInfo;
79 use serde::Deserialize;
80 use serde_json::json;
81 use thiserror_ext::AsReport;
82
83 use super::*;
84 use crate::controller::fragment::StreamingJobInfo;
85
    /// Error type returned by the dashboard handlers; a thin wrapper around [`anyhow::Error`].
    pub struct DashboardError(anyhow::Error);
    /// Result alias used by the dashboard handlers, with [`DashboardError`] as the error type.
    pub type Result<T> = std::result::Result<T, DashboardError>;
88
89 pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
90 DashboardError(err.into())
91 }
92
93 impl From<anyhow::Error> for DashboardError {
94 fn from(value: anyhow::Error) -> Self {
95 DashboardError(value)
96 }
97 }
98
99 impl IntoResponse for DashboardError {
100 fn into_response(self) -> axum::response::Response {
101 let mut resp = Json(json!({
102 "error": self.0.to_report_string(),
103 }))
104 .into_response();
105 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
106 resp
107 }
108 }
109
110 pub async fn list_clusters(
111 Path(ty): Path<i32>,
112 Extension(srv): Extension<Service>,
113 ) -> Result<Json<Vec<WorkerNode>>> {
114 let worker_type = WorkerType::try_from(ty)
115 .map_err(|_| anyhow!("invalid worker type"))
116 .map_err(err)?;
117 let mut result = srv
118 .metadata_manager
119 .list_worker_node(Some(worker_type), None)
120 .await
121 .map_err(err)?;
122 result.sort_unstable_by_key(|n| n.id);
123 Ok(result.into())
124 }
125
126 async fn list_table_catalogs_inner(
127 metadata_manager: &MetadataManager,
128 table_type: TableType,
129 ) -> Result<Json<Vec<Table>>> {
130 let tables = metadata_manager
131 .catalog_controller
132 .list_tables_by_type(table_type.into())
133 .await
134 .map_err(err)?;
135
136 Ok(Json(tables))
137 }
138
139 pub async fn list_materialized_views(
140 Extension(srv): Extension<Service>,
141 ) -> Result<Json<Vec<Table>>> {
142 list_table_catalogs_inner(&srv.metadata_manager, TableType::MaterializedView).await
143 }
144
145 pub async fn list_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
146 list_table_catalogs_inner(&srv.metadata_manager, TableType::Table).await
147 }
148
149 pub async fn list_index_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
150 list_table_catalogs_inner(&srv.metadata_manager, TableType::Index).await
151 }
152
153 pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
154 let indexes = srv
155 .metadata_manager
156 .catalog_controller
157 .list_indexes()
158 .await
159 .map_err(err)?;
160
161 Ok(Json(indexes))
162 }
163
164 pub async fn list_subscription(
165 Extension(srv): Extension<Service>,
166 ) -> Result<Json<Vec<Subscription>>> {
167 let subscriptions = srv
168 .metadata_manager
169 .catalog_controller
170 .list_subscriptions()
171 .await
172 .map_err(err)?;
173
174 Ok(Json(subscriptions))
175 }
176
177 pub async fn list_internal_tables(
178 Extension(srv): Extension<Service>,
179 ) -> Result<Json<Vec<Table>>> {
180 list_table_catalogs_inner(&srv.metadata_manager, TableType::Internal).await
181 }
182
183 pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
184 let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
185
186 Ok(Json(sources))
187 }
188
189 pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
190 let sinks = srv
191 .metadata_manager
192 .catalog_controller
193 .list_sinks()
194 .await
195 .map_err(err)?;
196
197 Ok(Json(sinks))
198 }
199
200 pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
201 let views = srv
202 .metadata_manager
203 .catalog_controller
204 .list_views()
205 .await
206 .map_err(err)?;
207
208 Ok(Json(views))
209 }
210
211 pub async fn list_streaming_jobs(
212 Extension(srv): Extension<Service>,
213 ) -> Result<Json<Vec<StreamingJobInfo>>> {
214 let streaming_jobs = srv
215 .metadata_manager
216 .catalog_controller
217 .list_streaming_job_infos()
218 .await
219 .map_err(err)?;
220
221 Ok(Json(streaming_jobs))
222 }
223
224 pub async fn get_fragment_to_relation_map(
232 Extension(srv): Extension<Service>,
233 ) -> Result<Json<FragmentToRelationMap>> {
234 let table_fragments = srv
235 .metadata_manager
236 .catalog_controller
237 .table_fragments()
238 .await
239 .map_err(err)?;
240 let mut in_map = HashMap::new();
241 let mut out_map = HashMap::new();
242 for (relation_id, tf) in table_fragments {
243 for (fragment_id, fragment) in &tf.fragments {
244 if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 {
245 in_map.insert(*fragment_id, relation_id as u32);
246 }
247 if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
248 out_map.insert(*fragment_id, relation_id as u32);
249 }
250 }
251 }
252 let map = FragmentToRelationMap { in_map, out_map };
253 Ok(Json(map))
254 }
255
256 pub async fn get_relation_id_infos(
258 Extension(srv): Extension<Service>,
259 ) -> Result<Json<RelationIdInfos>> {
260 let table_fragments = srv
261 .metadata_manager
262 .catalog_controller
263 .table_fragments()
264 .await
265 .map_err(err)?;
266 let mut map = HashMap::new();
267 for (id, tf) in table_fragments {
268 let mut fragment_id_to_actor_ids = HashMap::new();
269 for (fragment_id, fragment) in &tf.fragments {
270 let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
271 fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
272 }
273 map.insert(
274 id as u32,
275 FragmentIdToActorIdMap {
276 map: fragment_id_to_actor_ids,
277 },
278 );
279 }
280 let relation_id_infos = RelationIdInfos { map };
281
282 Ok(Json(relation_id_infos))
283 }
284
285 pub async fn list_fragments_by_job_id(
286 Extension(srv): Extension<Service>,
287 Path(job_id): Path<u32>,
288 ) -> Result<Json<PbTableFragments>> {
289 let table_id = TableId::new(job_id);
290 let table_fragments = srv
291 .metadata_manager
292 .get_job_fragments_by_id(&table_id)
293 .await
294 .map_err(err)?;
295 let upstream_fragments = srv
296 .metadata_manager
297 .catalog_controller
298 .upstream_fragments(table_fragments.fragment_ids())
299 .await
300 .map_err(err)?;
301 let dispatchers = srv
302 .metadata_manager
303 .catalog_controller
304 .get_fragment_actor_dispatchers(
305 table_fragments.fragment_ids().map(|id| id as _).collect(),
306 )
307 .await
308 .map_err(err)?;
309 Ok(Json(
310 table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
311 ))
312 }
313
314 pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
315 let users = srv
316 .metadata_manager
317 .catalog_controller
318 .list_users()
319 .await
320 .map_err(err)?;
321
322 Ok(Json(users))
323 }
324
325 pub async fn list_databases(
326 Extension(srv): Extension<Service>,
327 ) -> Result<Json<Vec<PbDatabase>>> {
328 let databases = srv
329 .metadata_manager
330 .catalog_controller
331 .list_databases()
332 .await
333 .map_err(err)?;
334
335 Ok(Json(databases))
336 }
337
338 pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
339 let schemas = srv
340 .metadata_manager
341 .catalog_controller
342 .list_schemas()
343 .await
344 .map_err(err)?;
345
346 Ok(Json(schemas))
347 }
348
349 pub async fn list_object_dependencies(
350 Extension(srv): Extension<Service>,
351 ) -> Result<Json<Vec<PbObjectDependencies>>> {
352 let object_dependencies = srv
353 .metadata_manager
354 .catalog_controller
355 .list_all_object_dependencies()
356 .await
357 .map_err(err)?;
358
359 Ok(Json(object_dependencies))
360 }
361
362 async fn dump_await_tree_inner(
363 worker_nodes: impl IntoIterator<Item = &WorkerNode>,
364 compute_clients: &ComputeClientPool,
365 params: AwaitTreeDumpParams,
366 ) -> Result<Json<StackTraceResponse>> {
367 let mut all = StackTraceResponse::default();
368
369 let req = StackTraceRequest {
370 actor_traces_format: match params.format.as_str() {
371 "text" => ActorTracesFormat::Text as i32,
372 "json" => ActorTracesFormat::Json as i32,
373 _ => {
374 return Err(err(anyhow!(
375 "Unsupported format `{}`, only `text` and `json` are supported for now",
376 params.format
377 )));
378 }
379 },
380 };
381
382 for worker_node in worker_nodes {
383 let client = compute_clients.get(worker_node).await.map_err(err)?;
384 let result = client.stack_trace(req).await.map_err(err)?;
385
386 all.merge_other(result);
387 }
388
389 Ok(all.into())
390 }
391
    /// Query-string parameters accepted by the await-tree dump handlers.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        /// Requested actor trace format. When the query string omits it, the value returned by
        /// `await_tree_default_format` is used.
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
397
398 fn await_tree_default_format() -> String {
399 "text".to_owned()
401 }
402
403 pub async fn dump_await_tree_all(
404 Query(params): Query<AwaitTreeDumpParams>,
405 Extension(srv): Extension<Service>,
406 ) -> Result<Json<StackTraceResponse>> {
407 let worker_nodes = srv
408 .metadata_manager
409 .list_worker_node(Some(WorkerType::ComputeNode), None)
410 .await
411 .map_err(err)?;
412
413 dump_await_tree_inner(&worker_nodes, &srv.compute_clients, params).await
414 }
415
416 pub async fn dump_await_tree(
417 Path(worker_id): Path<WorkerId>,
418 Query(params): Query<AwaitTreeDumpParams>,
419 Extension(srv): Extension<Service>,
420 ) -> Result<Json<StackTraceResponse>> {
421 let worker_node = srv
422 .metadata_manager
423 .get_worker_by_id(worker_id)
424 .await
425 .map_err(err)?
426 .context("worker node not found")
427 .map_err(err)?;
428
429 dump_await_tree_inner(std::iter::once(&worker_node), &srv.compute_clients, params).await
430 }
431
432 pub async fn heap_profile(
433 Path(worker_id): Path<WorkerId>,
434 Extension(srv): Extension<Service>,
435 ) -> Result<Json<HeapProfilingResponse>> {
436 let worker_node = srv
437 .metadata_manager
438 .get_worker_by_id(worker_id)
439 .await
440 .map_err(err)?
441 .context("worker node not found")
442 .map_err(err)?;
443
444 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
445
446 let result = client.heap_profile("".to_owned()).await.map_err(err)?;
447
448 Ok(result.into())
449 }
450
451 pub async fn list_heap_profile(
452 Path(worker_id): Path<WorkerId>,
453 Extension(srv): Extension<Service>,
454 ) -> Result<Json<ListHeapProfilingResponse>> {
455 let worker_node = srv
456 .metadata_manager
457 .get_worker_by_id(worker_id)
458 .await
459 .map_err(err)?
460 .context("worker node not found")
461 .map_err(err)?;
462
463 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
464
465 let result = client.list_heap_profile().await.map_err(err)?;
466 Ok(result.into())
467 }
468
469 pub async fn analyze_heap(
470 Path((worker_id, file_path)): Path<(WorkerId, String)>,
471 Extension(srv): Extension<Service>,
472 ) -> Result<Response> {
473 let file_path =
474 String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
475
476 let file_name = FilePath::new(&file_path)
477 .file_name()
478 .unwrap()
479 .to_string_lossy()
480 .to_string();
481
482 let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
483
484 let worker_node = srv
485 .metadata_manager
486 .get_worker_by_id(worker_id)
487 .await
488 .map_err(err)?
489 .context("worker node not found")
490 .map_err(err)?;
491
492 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
493
494 let collapsed_bin = client
495 .analyze_heap(file_path.clone())
496 .await
497 .map_err(err)?
498 .result;
499 let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
500
501 let response = Response::builder()
502 .header("Content-Type", "application/octet-stream")
503 .header("Content-Disposition", collapsed_file_name)
504 .body(collapsed_str.into());
505
506 response.map_err(err)
507 }
508
    /// Query-string parameters accepted by the diagnose handler.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        /// Requested actor trace format. When the query string omits it, the value returned by
        /// `await_tree_default_format` is used.
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
514
515 pub async fn diagnose(
516 Query(params): Query<DiagnoseParams>,
517 Extension(srv): Extension<Service>,
518 ) -> Result<String> {
519 let actor_traces_format = match params.actor_traces_format.as_str() {
520 "text" => ActorTracesFormat::Text,
521 "json" => ActorTracesFormat::Json,
522 _ => {
523 return Err(err(anyhow!(
524 "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
525 params.actor_traces_format
526 )));
527 }
528 };
529 Ok(srv.diagnose_command.report(actor_traces_format).await)
530 }
531
532 pub async fn get_streaming_stats(
538 Extension(srv): Extension<Service>,
539 ) -> Result<Json<GetStreamingStatsResponse>> {
540 let worker_nodes = srv
541 .metadata_manager
542 .list_active_streaming_compute_nodes()
543 .await
544 .map_err(err)?;
545
546 let mut futures = Vec::new();
547
548 for worker_node in worker_nodes {
549 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
550 let client = Arc::new(client);
551 let fut = async move {
552 let result = client.get_streaming_stats().await.map_err(err)?;
553 Ok::<_, DashboardError>(result)
554 };
555 futures.push(fut);
556 }
557 let results = join_all(futures).await;
558
559 let mut all = GetStreamingStatsResponse::default();
560
561 for result in results {
562 let result = result
563 .map_err(|_| anyhow!("Failed to get back pressure"))
564 .map_err(err)?;
565
566 for (fragment_id, fragment_stats) in result.fragment_stats {
568 if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
569 s.actor_count += fragment_stats.actor_count;
570 s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
571 } else {
572 all.fragment_stats.insert(fragment_id, fragment_stats);
573 }
574 }
575
576 for (relation_id, relation_stats) in result.relation_stats {
578 if let Some(s) = all.relation_stats.get_mut(&relation_id) {
579 s.actor_count += relation_stats.actor_count;
580 s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
581 } else {
582 all.relation_stats.insert(relation_id, relation_stats);
583 }
584 }
585
586 for (key, channel_stats) in result.channel_stats {
588 if let Some(s) = all.channel_stats.get_mut(&key) {
589 s.actor_count += channel_stats.actor_count;
590 s.output_blocking_duration += channel_stats.output_blocking_duration;
591 s.recv_row_count += channel_stats.recv_row_count;
592 s.send_row_count += channel_stats.send_row_count;
593 } else {
594 all.channel_stats.insert(key, channel_stats);
595 }
596 }
597 }
598
599 Ok(all.into())
600 }
601
602 pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
603 Ok(Json(risingwave_common::current_cluster_version()))
604 }
605}
606
607impl DashboardService {
608 pub async fn serve(self) -> Result<()> {
609 use handlers::*;
610 let srv = Arc::new(self);
611
612 let cors_layer = CorsLayer::new()
613 .allow_origin(cors::Any)
614 .allow_methods(vec![Method::GET]);
615
616 let api_router = Router::new()
617 .route("/version", get(get_version))
618 .route("/clusters/:ty", get(list_clusters))
619 .route("/streaming_jobs", get(list_streaming_jobs))
620 .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
621 .route("/relation_id_infos", get(get_relation_id_infos))
622 .route(
623 "/fragment_to_relation_map",
624 get(get_fragment_to_relation_map),
625 )
626 .route("/views", get(list_views))
627 .route("/materialized_views", get(list_materialized_views))
628 .route("/tables", get(list_tables))
629 .route("/indexes", get(list_index_tables))
630 .route("/index_items", get(list_indexes))
631 .route("/subscriptions", get(list_subscription))
632 .route("/internal_tables", get(list_internal_tables))
633 .route("/sources", get(list_sources))
634 .route("/sinks", get(list_sinks))
635 .route("/users", get(list_users))
636 .route("/databases", get(list_databases))
637 .route("/schemas", get(list_schemas))
638 .route("/object_dependencies", get(list_object_dependencies))
639 .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
640 .route("/metrics/streaming_stats", get(get_streaming_stats))
641 .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
643 .route("/monitor/await_tree/", get(dump_await_tree_all))
645 .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
646 .route(
647 "/monitor/list_heap_profile/:worker_id",
648 get(list_heap_profile),
649 )
650 .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
651 .route("/monitor/diagnose/", get(diagnose))
653 .layer(
654 ServiceBuilder::new()
655 .layer(AddExtensionLayer::new(srv.clone()))
656 .into_inner(),
657 )
658 .layer(cors_layer);
659
660 let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
661 let dashboard_router = risingwave_meta_dashboard::router();
662
663 let app = Router::new()
664 .fallback_service(dashboard_router)
665 .nest("/api", api_router)
666 .nest("/trace", trace_ui_router)
667 .layer(CompressionLayer::new());
668
669 let listener = TcpListener::bind(&srv.dashboard_addr)
670 .await
671 .context("failed to bind dashboard address")?;
672 axum::serve(listener, app)
673 .await
674 .context("failed to serve dashboard service")?;
675
676 Ok(())
677 }
678}