1mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_rpc_client::ComputeClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::hummock::HummockManagerRef;
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// Shared state of the dashboard HTTP service.
///
/// An instance is wrapped in an [`Arc`] by [`DashboardService::serve`] and handed to every
/// API handler through an axum `Extension`.
#[derive(Clone)]
pub struct DashboardService {
    /// Await-tree registry passed to `dump_cluster_await_tree` when dumping traces for the
    /// whole cluster.
    pub await_tree_reg: await_tree::Registry,
    /// Socket address the dashboard HTTP listener binds to.
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus query client.
    // NOTE(review): not used in this file; presumably consumed by the `prometheus` submodule — confirm.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// Prometheus selector string.
    // NOTE(review): not used in this file; presumably consumed by the `prometheus` submodule — confirm.
    pub prometheus_selector: String,
    /// Metadata manager the handlers query for worker nodes, catalogs and fragments.
    pub metadata_manager: MetadataManager,
    /// Hummock manager; handlers read version stats from it to attach storage stats to tables.
    pub hummock_manager: HummockManagerRef,
    /// Pool used to obtain an RPC client for a given compute worker node.
    pub compute_clients: ComputeClientPool,
    /// Command object that produces the report returned by the diagnose endpoint.
    pub diagnose_command: DiagnoseCommandRef,
    /// Trace state handed to `otlp_embedded::ui_app` to serve the trace UI.
    pub trace_state: otlp_embedded::StateRef,
}
50
/// Reference-counted handle to the dashboard state, as extracted by the API handlers.
pub type Service = Arc<DashboardService>;
52
53pub(super) mod handlers {
54 use std::cmp::min;
55 use std::collections::HashMap;
56
57 use anyhow::Context;
58 use axum::Json;
59 use axum::extract::Query;
60 use futures::future::join_all;
61 use itertools::Itertools;
62 use risingwave_common::catalog::{FragmentTypeFlag, TableId};
63 use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
64 use risingwave_meta_model::WorkerId;
65 use risingwave_pb::catalog::table::TableType;
66 use risingwave_pb::catalog::{
67 Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
68 };
69 use risingwave_pb::common::{WorkerNode, WorkerType};
70 use risingwave_pb::hummock::TableStats;
71 use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
72 use risingwave_pb::meta::{
73 ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
74 };
75 use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
76 use risingwave_pb::monitor_service::{
77 GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
78 StackTraceResponse,
79 };
80 use risingwave_pb::user::PbUserInfo;
81 use serde::{Deserialize, Serialize};
82 use serde_json::json;
83 use thiserror_ext::AsReport;
84
85 use super::*;
86 use crate::controller::fragment::StreamingJobInfo;
87 use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
88
89 #[derive(Serialize)]
90 pub struct TableWithStats {
91 #[serde(flatten)]
92 pub table: Table,
93 pub total_size_bytes: i64,
94 pub total_key_count: i64,
95 pub total_key_size: i64,
96 pub total_value_size: i64,
97 pub compressed_size: u64,
98 }
99
100 impl TableWithStats {
101 pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
102 match stats {
103 Some(stats) => Self {
104 total_size_bytes: stats.total_key_size + stats.total_value_size,
105 total_key_count: stats.total_key_count,
106 total_key_size: stats.total_key_size,
107 total_value_size: stats.total_value_size,
108 compressed_size: stats.total_compressed_size,
109 table,
110 },
111 None => Self {
112 total_size_bytes: 0,
113 total_key_count: 0,
114 total_key_size: 0,
115 total_value_size: 0,
116 compressed_size: 0,
117 table,
118 },
119 }
120 }
121 }
122
123 pub struct DashboardError(anyhow::Error);
124 pub type Result<T> = std::result::Result<T, DashboardError>;
125
126 pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
127 DashboardError(err.into())
128 }
129
130 impl From<anyhow::Error> for DashboardError {
131 fn from(value: anyhow::Error) -> Self {
132 DashboardError(value)
133 }
134 }
135
136 impl IntoResponse for DashboardError {
137 fn into_response(self) -> axum::response::Response {
138 let mut resp = Json(json!({
139 "error": self.0.to_report_string(),
140 }))
141 .into_response();
142 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
143 resp
144 }
145 }
146
147 pub async fn list_clusters(
148 Path(ty): Path<i32>,
149 Extension(srv): Extension<Service>,
150 ) -> Result<Json<Vec<WorkerNode>>> {
151 let worker_type = WorkerType::try_from(ty)
152 .map_err(|_| anyhow!("invalid worker type"))
153 .map_err(err)?;
154 let mut result = srv
155 .metadata_manager
156 .list_worker_node(Some(worker_type), None)
157 .await
158 .map_err(err)?;
159 result.sort_unstable_by_key(|n| n.id);
160 Ok(result.into())
161 }
162
163 async fn list_table_catalogs_inner(
164 metadata_manager: &MetadataManager,
165 hummock_manager: &HummockManagerRef,
166 table_type: TableType,
167 ) -> Result<Json<Vec<TableWithStats>>> {
168 let tables = metadata_manager
169 .catalog_controller
170 .list_tables_by_type(table_type.into())
171 .await
172 .map_err(err)?;
173
174 let version_stats = hummock_manager.get_version_stats().await;
176
177 let tables_with_stats = tables
178 .into_iter()
179 .map(|table| {
180 let stats = version_stats.table_stats.get(&table.id);
181 TableWithStats::from_table_and_stats(table, stats)
182 })
183 .collect();
184
185 Ok(Json(tables_with_stats))
186 }
187
188 pub async fn list_materialized_views(
189 Extension(srv): Extension<Service>,
190 ) -> Result<Json<Vec<TableWithStats>>> {
191 list_table_catalogs_inner(
192 &srv.metadata_manager,
193 &srv.hummock_manager,
194 TableType::MaterializedView,
195 )
196 .await
197 }
198
199 pub async fn list_tables(
200 Extension(srv): Extension<Service>,
201 ) -> Result<Json<Vec<TableWithStats>>> {
202 list_table_catalogs_inner(
203 &srv.metadata_manager,
204 &srv.hummock_manager,
205 TableType::Table,
206 )
207 .await
208 }
209
210 pub async fn list_index_tables(
211 Extension(srv): Extension<Service>,
212 ) -> Result<Json<Vec<TableWithStats>>> {
213 list_table_catalogs_inner(
214 &srv.metadata_manager,
215 &srv.hummock_manager,
216 TableType::Index,
217 )
218 .await
219 }
220
221 pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
222 let indexes = srv
223 .metadata_manager
224 .catalog_controller
225 .list_indexes()
226 .await
227 .map_err(err)?;
228
229 Ok(Json(indexes))
230 }
231
232 pub async fn list_subscription(
233 Extension(srv): Extension<Service>,
234 ) -> Result<Json<Vec<Subscription>>> {
235 let subscriptions = srv
236 .metadata_manager
237 .catalog_controller
238 .list_subscriptions()
239 .await
240 .map_err(err)?;
241
242 Ok(Json(subscriptions))
243 }
244
245 pub async fn list_internal_tables(
246 Extension(srv): Extension<Service>,
247 ) -> Result<Json<Vec<TableWithStats>>> {
248 list_table_catalogs_inner(
249 &srv.metadata_manager,
250 &srv.hummock_manager,
251 TableType::Internal,
252 )
253 .await
254 }
255
256 pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
257 let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
258
259 Ok(Json(sources))
260 }
261
262 pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
263 let sinks = srv
264 .metadata_manager
265 .catalog_controller
266 .list_sinks()
267 .await
268 .map_err(err)?;
269
270 Ok(Json(sinks))
271 }
272
273 pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
274 let views = srv
275 .metadata_manager
276 .catalog_controller
277 .list_views()
278 .await
279 .map_err(err)?;
280
281 Ok(Json(views))
282 }
283
284 pub async fn list_functions(
285 Extension(srv): Extension<Service>,
286 ) -> Result<Json<Vec<PbFunction>>> {
287 let functions = srv
288 .metadata_manager
289 .catalog_controller
290 .list_functions()
291 .await
292 .map_err(err)?;
293
294 Ok(Json(functions))
295 }
296
297 pub async fn list_streaming_jobs(
298 Extension(srv): Extension<Service>,
299 ) -> Result<Json<Vec<StreamingJobInfo>>> {
300 let streaming_jobs = srv
301 .metadata_manager
302 .catalog_controller
303 .list_streaming_job_infos()
304 .await
305 .map_err(err)?;
306
307 Ok(Json(streaming_jobs))
308 }
309
310 pub async fn get_fragment_to_relation_map(
318 Extension(srv): Extension<Service>,
319 ) -> Result<Json<FragmentToRelationMap>> {
320 let table_fragments = srv
321 .metadata_manager
322 .catalog_controller
323 .table_fragments()
324 .await
325 .map_err(err)?;
326 let mut in_map = HashMap::new();
327 let mut out_map = HashMap::new();
328 for (relation_id, tf) in table_fragments {
329 for (fragment_id, fragment) in &tf.fragments {
330 if fragment
331 .fragment_type_mask
332 .contains(FragmentTypeFlag::StreamScan)
333 {
334 in_map.insert(*fragment_id, relation_id as u32);
335 }
336 if fragment
337 .fragment_type_mask
338 .contains(FragmentTypeFlag::Mview)
339 {
340 out_map.insert(*fragment_id, relation_id as u32);
341 }
342 }
343 }
344 let map = FragmentToRelationMap { in_map, out_map };
345 Ok(Json(map))
346 }
347
348 pub async fn get_relation_id_infos(
350 Extension(srv): Extension<Service>,
351 ) -> Result<Json<RelationIdInfos>> {
352 let table_fragments = srv
353 .metadata_manager
354 .catalog_controller
355 .table_fragments()
356 .await
357 .map_err(err)?;
358 let mut map = HashMap::new();
359 for (id, tf) in table_fragments {
360 let mut fragment_id_to_actor_ids = HashMap::new();
361 for (fragment_id, fragment) in &tf.fragments {
362 let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
363 fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
364 }
365 map.insert(
366 id as u32,
367 FragmentIdToActorIdMap {
368 map: fragment_id_to_actor_ids,
369 },
370 );
371 }
372 let relation_id_infos = RelationIdInfos { map };
373
374 Ok(Json(relation_id_infos))
375 }
376
377 pub async fn list_fragments_by_job_id(
378 Extension(srv): Extension<Service>,
379 Path(job_id): Path<u32>,
380 ) -> Result<Json<PbTableFragments>> {
381 let table_id = TableId::new(job_id);
382 let table_fragments = srv
383 .metadata_manager
384 .get_job_fragments_by_id(&table_id)
385 .await
386 .map_err(err)?;
387 let upstream_fragments = srv
388 .metadata_manager
389 .catalog_controller
390 .upstream_fragments(table_fragments.fragment_ids())
391 .await
392 .map_err(err)?;
393 let dispatchers = srv
394 .metadata_manager
395 .catalog_controller
396 .get_fragment_actor_dispatchers(
397 table_fragments.fragment_ids().map(|id| id as _).collect(),
398 )
399 .await
400 .map_err(err)?;
401 Ok(Json(
402 table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
403 ))
404 }
405
406 pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
407 let users = srv
408 .metadata_manager
409 .catalog_controller
410 .list_users()
411 .await
412 .map_err(err)?;
413
414 Ok(Json(users))
415 }
416
417 pub async fn list_databases(
418 Extension(srv): Extension<Service>,
419 ) -> Result<Json<Vec<PbDatabase>>> {
420 let databases = srv
421 .metadata_manager
422 .catalog_controller
423 .list_databases()
424 .await
425 .map_err(err)?;
426
427 Ok(Json(databases))
428 }
429
430 pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
431 let schemas = srv
432 .metadata_manager
433 .catalog_controller
434 .list_schemas()
435 .await
436 .map_err(err)?;
437
438 Ok(Json(schemas))
439 }
440
441 pub async fn list_object_dependencies(
442 Extension(srv): Extension<Service>,
443 ) -> Result<Json<Vec<PbObjectDependencies>>> {
444 let object_dependencies = srv
445 .metadata_manager
446 .catalog_controller
447 .list_all_object_dependencies()
448 .await
449 .map_err(err)?;
450
451 Ok(Json(object_dependencies))
452 }
453
454 #[derive(Debug, Deserialize)]
455 pub struct AwaitTreeDumpParams {
456 #[serde(default = "await_tree_default_format")]
457 format: String,
458 }
459
460 impl AwaitTreeDumpParams {
461 pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
463 Ok(match self.format.as_str() {
464 "text" => ActorTracesFormat::Text,
465 "json" => ActorTracesFormat::Json,
466 _ => {
467 return Err(err(anyhow!(
468 "Unsupported format `{}`, only `text` and `json` are supported for now",
469 self.format
470 )));
471 }
472 })
473 }
474 }
475
476 fn await_tree_default_format() -> String {
477 "text".to_owned()
479 }
480
481 pub async fn dump_await_tree_all(
482 Query(params): Query<AwaitTreeDumpParams>,
483 Extension(srv): Extension<Service>,
484 ) -> Result<Json<StackTraceResponse>> {
485 let actor_traces_format = params.actor_traces_format()?;
486
487 let res = dump_cluster_await_tree(
488 &srv.metadata_manager,
489 &srv.await_tree_reg,
490 actor_traces_format,
491 )
492 .await
493 .map_err(err)?;
494
495 Ok(res.into())
496 }
497
498 pub async fn dump_await_tree(
499 Path(worker_id): Path<WorkerId>,
500 Query(params): Query<AwaitTreeDumpParams>,
501 Extension(srv): Extension<Service>,
502 ) -> Result<Json<StackTraceResponse>> {
503 let actor_traces_format = params.actor_traces_format()?;
504
505 let worker_node = srv
506 .metadata_manager
507 .get_worker_by_id(worker_id)
508 .await
509 .map_err(err)?
510 .context("worker node not found")
511 .map_err(err)?;
512
513 let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
514 .await
515 .map_err(err)?;
516
517 Ok(res.into())
518 }
519
520 pub async fn heap_profile(
521 Path(worker_id): Path<WorkerId>,
522 Extension(srv): Extension<Service>,
523 ) -> Result<Json<HeapProfilingResponse>> {
524 let worker_node = srv
525 .metadata_manager
526 .get_worker_by_id(worker_id)
527 .await
528 .map_err(err)?
529 .context("worker node not found")
530 .map_err(err)?;
531
532 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
533
534 let result = client.heap_profile("".to_owned()).await.map_err(err)?;
535
536 Ok(result.into())
537 }
538
539 pub async fn list_heap_profile(
540 Path(worker_id): Path<WorkerId>,
541 Extension(srv): Extension<Service>,
542 ) -> Result<Json<ListHeapProfilingResponse>> {
543 let worker_node = srv
544 .metadata_manager
545 .get_worker_by_id(worker_id)
546 .await
547 .map_err(err)?
548 .context("worker node not found")
549 .map_err(err)?;
550
551 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
552
553 let result = client.list_heap_profile().await.map_err(err)?;
554 Ok(result.into())
555 }
556
557 pub async fn analyze_heap(
558 Path((worker_id, file_path)): Path<(WorkerId, String)>,
559 Extension(srv): Extension<Service>,
560 ) -> Result<Response> {
561 let file_path =
562 String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
563
564 let file_name = FilePath::new(&file_path)
565 .file_name()
566 .unwrap()
567 .to_string_lossy()
568 .to_string();
569
570 let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
571
572 let worker_node = srv
573 .metadata_manager
574 .get_worker_by_id(worker_id)
575 .await
576 .map_err(err)?
577 .context("worker node not found")
578 .map_err(err)?;
579
580 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
581
582 let collapsed_bin = client
583 .analyze_heap(file_path.clone())
584 .await
585 .map_err(err)?
586 .result;
587 let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
588
589 let response = Response::builder()
590 .header("Content-Type", "application/octet-stream")
591 .header("Content-Disposition", collapsed_file_name)
592 .body(collapsed_str.into());
593
594 response.map_err(err)
595 }
596
597 #[derive(Debug, Deserialize)]
598 pub struct DiagnoseParams {
599 #[serde(default = "await_tree_default_format")]
600 actor_traces_format: String,
601 }
602
603 pub async fn diagnose(
604 Query(params): Query<DiagnoseParams>,
605 Extension(srv): Extension<Service>,
606 ) -> Result<String> {
607 let actor_traces_format = match params.actor_traces_format.as_str() {
608 "text" => ActorTracesFormat::Text,
609 "json" => ActorTracesFormat::Json,
610 _ => {
611 return Err(err(anyhow!(
612 "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
613 params.actor_traces_format
614 )));
615 }
616 };
617 Ok(srv.diagnose_command.report(actor_traces_format).await)
618 }
619
620 pub async fn get_streaming_stats(
626 Extension(srv): Extension<Service>,
627 ) -> Result<Json<GetStreamingStatsResponse>> {
628 let worker_nodes = srv
629 .metadata_manager
630 .list_active_streaming_compute_nodes()
631 .await
632 .map_err(err)?;
633
634 let mut futures = Vec::new();
635
636 for worker_node in worker_nodes {
637 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
638 let client = Arc::new(client);
639 let fut = async move {
640 let result = client.get_streaming_stats().await.map_err(err)?;
641 Ok::<_, DashboardError>(result)
642 };
643 futures.push(fut);
644 }
645 let results = join_all(futures).await;
646
647 let mut all = GetStreamingStatsResponse::default();
648
649 for result in results {
650 let result = result
651 .map_err(|_| anyhow!("Failed to get back pressure"))
652 .map_err(err)?;
653
654 for (fragment_id, fragment_stats) in result.fragment_stats {
656 if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
657 s.actor_count += fragment_stats.actor_count;
658 s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
659 } else {
660 all.fragment_stats.insert(fragment_id, fragment_stats);
661 }
662 }
663
664 for (relation_id, relation_stats) in result.relation_stats {
666 if let Some(s) = all.relation_stats.get_mut(&relation_id) {
667 s.actor_count += relation_stats.actor_count;
668 s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
669 } else {
670 all.relation_stats.insert(relation_id, relation_stats);
671 }
672 }
673
674 for (key, channel_stats) in result.channel_stats {
676 if let Some(s) = all.channel_stats.get_mut(&key) {
677 s.actor_count += channel_stats.actor_count;
678 s.output_blocking_duration += channel_stats.output_blocking_duration;
679 s.recv_row_count += channel_stats.recv_row_count;
680 s.send_row_count += channel_stats.send_row_count;
681 } else {
682 all.channel_stats.insert(key, channel_stats);
683 }
684 }
685 }
686
687 Ok(all.into())
688 }
689
690 pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
691 Ok(Json(risingwave_common::current_cluster_version()))
692 }
693}
694
695impl DashboardService {
696 pub async fn serve(self) -> Result<()> {
697 use handlers::*;
698 let srv = Arc::new(self);
699
700 let cors_layer = CorsLayer::new()
701 .allow_origin(cors::Any)
702 .allow_methods(vec![Method::GET]);
703
704 let api_router = Router::new()
705 .route("/version", get(get_version))
706 .route("/clusters/:ty", get(list_clusters))
707 .route("/streaming_jobs", get(list_streaming_jobs))
708 .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
709 .route("/relation_id_infos", get(get_relation_id_infos))
710 .route(
711 "/fragment_to_relation_map",
712 get(get_fragment_to_relation_map),
713 )
714 .route("/views", get(list_views))
715 .route("/functions", get(list_functions))
716 .route("/materialized_views", get(list_materialized_views))
717 .route("/tables", get(list_tables))
718 .route("/indexes", get(list_index_tables))
719 .route("/index_items", get(list_indexes))
720 .route("/subscriptions", get(list_subscription))
721 .route("/internal_tables", get(list_internal_tables))
722 .route("/sources", get(list_sources))
723 .route("/sinks", get(list_sinks))
724 .route("/users", get(list_users))
725 .route("/databases", get(list_databases))
726 .route("/schemas", get(list_schemas))
727 .route("/object_dependencies", get(list_object_dependencies))
728 .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
729 .route("/metrics/streaming_stats", get(get_streaming_stats))
730 .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
732 .route("/monitor/await_tree/", get(dump_await_tree_all))
734 .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
735 .route(
736 "/monitor/list_heap_profile/:worker_id",
737 get(list_heap_profile),
738 )
739 .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
740 .route("/monitor/diagnose/", get(diagnose))
742 .layer(
743 ServiceBuilder::new()
744 .layer(AddExtensionLayer::new(srv.clone()))
745 .into_inner(),
746 )
747 .layer(cors_layer);
748
749 let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
750 let dashboard_router = risingwave_meta_dashboard::router();
751
752 let app = Router::new()
753 .fallback_service(dashboard_router)
754 .nest("/api", api_router)
755 .nest("/trace", trace_ui_router)
756 .layer(CompressionLayer::new());
757
758 let listener = TcpListener::bind(&srv.dashboard_addr)
759 .await
760 .context("failed to bind dashboard address")?;
761 axum::serve(listener, app)
762 .await
763 .context("failed to serve dashboard service")?;
764
765 Ok(())
766 }
767}