1mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_rpc_client::ComputeClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::hummock::HummockManagerRef;
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
38#[derive(Clone)]
39pub struct DashboardService {
40 pub await_tree_reg: await_tree::Registry,
41 pub dashboard_addr: SocketAddr,
42 pub prometheus_client: Option<prometheus_http_query::Client>,
43 pub prometheus_selector: String,
44 pub metadata_manager: MetadataManager,
45 pub hummock_manager: HummockManagerRef,
46 pub compute_clients: ComputeClientPool,
47 pub diagnose_command: DiagnoseCommandRef,
48 pub trace_state: otlp_embedded::StateRef,
49}
50
51pub type Service = Arc<DashboardService>;
52
53pub(super) mod handlers {
54 use std::cmp::min;
55 use std::collections::HashMap;
56
57 use anyhow::Context;
58 use axum::Json;
59 use axum::extract::Query;
60 use futures::future::join_all;
61 use itertools::Itertools;
62 use risingwave_common::catalog::{FragmentTypeFlag, TableId};
63 use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
64 use risingwave_meta_model::WorkerId;
65 use risingwave_pb::catalog::table::TableType;
66 use risingwave_pb::catalog::{
67 Index, PbDatabase, PbSchema, Sink, Source, Subscription, Table, View,
68 };
69 use risingwave_pb::common::{WorkerNode, WorkerType};
70 use risingwave_pb::hummock::TableStats;
71 use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
72 use risingwave_pb::meta::{
73 ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
74 };
75 use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
76 use risingwave_pb::monitor_service::{
77 GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
78 StackTraceResponse,
79 };
80 use risingwave_pb::user::PbUserInfo;
81 use serde::{Deserialize, Serialize};
82 use serde_json::json;
83 use thiserror_ext::AsReport;
84
85 use super::*;
86 use crate::controller::fragment::StreamingJobInfo;
87 use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
88
89 #[derive(Serialize)]
90 pub struct TableWithStats {
91 #[serde(flatten)]
92 pub table: Table,
93 pub total_size_bytes: i64,
94 pub total_key_count: i64,
95 pub total_key_size: i64,
96 pub total_value_size: i64,
97 pub compressed_size: u64,
98 }
99
100 impl TableWithStats {
101 pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
102 match stats {
103 Some(stats) => Self {
104 total_size_bytes: stats.total_key_size + stats.total_value_size,
105 total_key_count: stats.total_key_count,
106 total_key_size: stats.total_key_size,
107 total_value_size: stats.total_value_size,
108 compressed_size: stats.total_compressed_size,
109 table,
110 },
111 None => Self {
112 total_size_bytes: 0,
113 total_key_count: 0,
114 total_key_size: 0,
115 total_value_size: 0,
116 compressed_size: 0,
117 table,
118 },
119 }
120 }
121 }
122
123 pub struct DashboardError(anyhow::Error);
124 pub type Result<T> = std::result::Result<T, DashboardError>;
125
126 pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
127 DashboardError(err.into())
128 }
129
130 impl From<anyhow::Error> for DashboardError {
131 fn from(value: anyhow::Error) -> Self {
132 DashboardError(value)
133 }
134 }
135
136 impl IntoResponse for DashboardError {
137 fn into_response(self) -> axum::response::Response {
138 let mut resp = Json(json!({
139 "error": self.0.to_report_string(),
140 }))
141 .into_response();
142 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
143 resp
144 }
145 }
146
147 pub async fn list_clusters(
148 Path(ty): Path<i32>,
149 Extension(srv): Extension<Service>,
150 ) -> Result<Json<Vec<WorkerNode>>> {
151 let worker_type = WorkerType::try_from(ty)
152 .map_err(|_| anyhow!("invalid worker type"))
153 .map_err(err)?;
154 let mut result = srv
155 .metadata_manager
156 .list_worker_node(Some(worker_type), None)
157 .await
158 .map_err(err)?;
159 result.sort_unstable_by_key(|n| n.id);
160 Ok(result.into())
161 }
162
163 async fn list_table_catalogs_inner(
164 metadata_manager: &MetadataManager,
165 hummock_manager: &HummockManagerRef,
166 table_type: TableType,
167 ) -> Result<Json<Vec<TableWithStats>>> {
168 let tables = metadata_manager
169 .catalog_controller
170 .list_tables_by_type(table_type.into())
171 .await
172 .map_err(err)?;
173
174 let version_stats = hummock_manager.get_version_stats().await;
176
177 let tables_with_stats = tables
178 .into_iter()
179 .map(|table| {
180 let stats = version_stats.table_stats.get(&table.id);
181 TableWithStats::from_table_and_stats(table, stats)
182 })
183 .collect();
184
185 Ok(Json(tables_with_stats))
186 }
187
188 pub async fn list_materialized_views(
189 Extension(srv): Extension<Service>,
190 ) -> Result<Json<Vec<TableWithStats>>> {
191 list_table_catalogs_inner(
192 &srv.metadata_manager,
193 &srv.hummock_manager,
194 TableType::MaterializedView,
195 )
196 .await
197 }
198
199 pub async fn list_tables(
200 Extension(srv): Extension<Service>,
201 ) -> Result<Json<Vec<TableWithStats>>> {
202 list_table_catalogs_inner(
203 &srv.metadata_manager,
204 &srv.hummock_manager,
205 TableType::Table,
206 )
207 .await
208 }
209
210 pub async fn list_index_tables(
211 Extension(srv): Extension<Service>,
212 ) -> Result<Json<Vec<TableWithStats>>> {
213 list_table_catalogs_inner(
214 &srv.metadata_manager,
215 &srv.hummock_manager,
216 TableType::Index,
217 )
218 .await
219 }
220
221 pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
222 let indexes = srv
223 .metadata_manager
224 .catalog_controller
225 .list_indexes()
226 .await
227 .map_err(err)?;
228
229 Ok(Json(indexes))
230 }
231
232 pub async fn list_subscription(
233 Extension(srv): Extension<Service>,
234 ) -> Result<Json<Vec<Subscription>>> {
235 let subscriptions = srv
236 .metadata_manager
237 .catalog_controller
238 .list_subscriptions()
239 .await
240 .map_err(err)?;
241
242 Ok(Json(subscriptions))
243 }
244
245 pub async fn list_internal_tables(
246 Extension(srv): Extension<Service>,
247 ) -> Result<Json<Vec<TableWithStats>>> {
248 list_table_catalogs_inner(
249 &srv.metadata_manager,
250 &srv.hummock_manager,
251 TableType::Internal,
252 )
253 .await
254 }
255
256 pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
257 let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
258
259 Ok(Json(sources))
260 }
261
262 pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
263 let sinks = srv
264 .metadata_manager
265 .catalog_controller
266 .list_sinks()
267 .await
268 .map_err(err)?;
269
270 Ok(Json(sinks))
271 }
272
273 pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
274 let views = srv
275 .metadata_manager
276 .catalog_controller
277 .list_views()
278 .await
279 .map_err(err)?;
280
281 Ok(Json(views))
282 }
283
284 pub async fn list_streaming_jobs(
285 Extension(srv): Extension<Service>,
286 ) -> Result<Json<Vec<StreamingJobInfo>>> {
287 let streaming_jobs = srv
288 .metadata_manager
289 .catalog_controller
290 .list_streaming_job_infos()
291 .await
292 .map_err(err)?;
293
294 Ok(Json(streaming_jobs))
295 }
296
297 pub async fn get_fragment_to_relation_map(
305 Extension(srv): Extension<Service>,
306 ) -> Result<Json<FragmentToRelationMap>> {
307 let table_fragments = srv
308 .metadata_manager
309 .catalog_controller
310 .table_fragments()
311 .await
312 .map_err(err)?;
313 let mut in_map = HashMap::new();
314 let mut out_map = HashMap::new();
315 for (relation_id, tf) in table_fragments {
316 for (fragment_id, fragment) in &tf.fragments {
317 if fragment
318 .fragment_type_mask
319 .contains(FragmentTypeFlag::StreamScan)
320 {
321 in_map.insert(*fragment_id, relation_id as u32);
322 }
323 if fragment
324 .fragment_type_mask
325 .contains(FragmentTypeFlag::Mview)
326 {
327 out_map.insert(*fragment_id, relation_id as u32);
328 }
329 }
330 }
331 let map = FragmentToRelationMap { in_map, out_map };
332 Ok(Json(map))
333 }
334
335 pub async fn get_relation_id_infos(
337 Extension(srv): Extension<Service>,
338 ) -> Result<Json<RelationIdInfos>> {
339 let table_fragments = srv
340 .metadata_manager
341 .catalog_controller
342 .table_fragments()
343 .await
344 .map_err(err)?;
345 let mut map = HashMap::new();
346 for (id, tf) in table_fragments {
347 let mut fragment_id_to_actor_ids = HashMap::new();
348 for (fragment_id, fragment) in &tf.fragments {
349 let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
350 fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
351 }
352 map.insert(
353 id as u32,
354 FragmentIdToActorIdMap {
355 map: fragment_id_to_actor_ids,
356 },
357 );
358 }
359 let relation_id_infos = RelationIdInfos { map };
360
361 Ok(Json(relation_id_infos))
362 }
363
364 pub async fn list_fragments_by_job_id(
365 Extension(srv): Extension<Service>,
366 Path(job_id): Path<u32>,
367 ) -> Result<Json<PbTableFragments>> {
368 let table_id = TableId::new(job_id);
369 let table_fragments = srv
370 .metadata_manager
371 .get_job_fragments_by_id(&table_id)
372 .await
373 .map_err(err)?;
374 let upstream_fragments = srv
375 .metadata_manager
376 .catalog_controller
377 .upstream_fragments(table_fragments.fragment_ids())
378 .await
379 .map_err(err)?;
380 let dispatchers = srv
381 .metadata_manager
382 .catalog_controller
383 .get_fragment_actor_dispatchers(
384 table_fragments.fragment_ids().map(|id| id as _).collect(),
385 )
386 .await
387 .map_err(err)?;
388 Ok(Json(
389 table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
390 ))
391 }
392
393 pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
394 let users = srv
395 .metadata_manager
396 .catalog_controller
397 .list_users()
398 .await
399 .map_err(err)?;
400
401 Ok(Json(users))
402 }
403
404 pub async fn list_databases(
405 Extension(srv): Extension<Service>,
406 ) -> Result<Json<Vec<PbDatabase>>> {
407 let databases = srv
408 .metadata_manager
409 .catalog_controller
410 .list_databases()
411 .await
412 .map_err(err)?;
413
414 Ok(Json(databases))
415 }
416
417 pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
418 let schemas = srv
419 .metadata_manager
420 .catalog_controller
421 .list_schemas()
422 .await
423 .map_err(err)?;
424
425 Ok(Json(schemas))
426 }
427
428 pub async fn list_object_dependencies(
429 Extension(srv): Extension<Service>,
430 ) -> Result<Json<Vec<PbObjectDependencies>>> {
431 let object_dependencies = srv
432 .metadata_manager
433 .catalog_controller
434 .list_all_object_dependencies()
435 .await
436 .map_err(err)?;
437
438 Ok(Json(object_dependencies))
439 }
440
441 #[derive(Debug, Deserialize)]
442 pub struct AwaitTreeDumpParams {
443 #[serde(default = "await_tree_default_format")]
444 format: String,
445 }
446
447 impl AwaitTreeDumpParams {
448 pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
450 Ok(match self.format.as_str() {
451 "text" => ActorTracesFormat::Text,
452 "json" => ActorTracesFormat::Json,
453 _ => {
454 return Err(err(anyhow!(
455 "Unsupported format `{}`, only `text` and `json` are supported for now",
456 self.format
457 )));
458 }
459 })
460 }
461 }
462
/// Serde default for the trace-format query parameters: `"text"`.
fn await_tree_default_format() -> String {
    String::from("text")
}
467
468 pub async fn dump_await_tree_all(
469 Query(params): Query<AwaitTreeDumpParams>,
470 Extension(srv): Extension<Service>,
471 ) -> Result<Json<StackTraceResponse>> {
472 let actor_traces_format = params.actor_traces_format()?;
473
474 let res = dump_cluster_await_tree(
475 &srv.metadata_manager,
476 &srv.await_tree_reg,
477 actor_traces_format,
478 )
479 .await
480 .map_err(err)?;
481
482 Ok(res.into())
483 }
484
485 pub async fn dump_await_tree(
486 Path(worker_id): Path<WorkerId>,
487 Query(params): Query<AwaitTreeDumpParams>,
488 Extension(srv): Extension<Service>,
489 ) -> Result<Json<StackTraceResponse>> {
490 let actor_traces_format = params.actor_traces_format()?;
491
492 let worker_node = srv
493 .metadata_manager
494 .get_worker_by_id(worker_id)
495 .await
496 .map_err(err)?
497 .context("worker node not found")
498 .map_err(err)?;
499
500 let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
501 .await
502 .map_err(err)?;
503
504 Ok(res.into())
505 }
506
507 pub async fn heap_profile(
508 Path(worker_id): Path<WorkerId>,
509 Extension(srv): Extension<Service>,
510 ) -> Result<Json<HeapProfilingResponse>> {
511 let worker_node = srv
512 .metadata_manager
513 .get_worker_by_id(worker_id)
514 .await
515 .map_err(err)?
516 .context("worker node not found")
517 .map_err(err)?;
518
519 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
520
521 let result = client.heap_profile("".to_owned()).await.map_err(err)?;
522
523 Ok(result.into())
524 }
525
526 pub async fn list_heap_profile(
527 Path(worker_id): Path<WorkerId>,
528 Extension(srv): Extension<Service>,
529 ) -> Result<Json<ListHeapProfilingResponse>> {
530 let worker_node = srv
531 .metadata_manager
532 .get_worker_by_id(worker_id)
533 .await
534 .map_err(err)?
535 .context("worker node not found")
536 .map_err(err)?;
537
538 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
539
540 let result = client.list_heap_profile().await.map_err(err)?;
541 Ok(result.into())
542 }
543
544 pub async fn analyze_heap(
545 Path((worker_id, file_path)): Path<(WorkerId, String)>,
546 Extension(srv): Extension<Service>,
547 ) -> Result<Response> {
548 let file_path =
549 String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
550
551 let file_name = FilePath::new(&file_path)
552 .file_name()
553 .unwrap()
554 .to_string_lossy()
555 .to_string();
556
557 let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
558
559 let worker_node = srv
560 .metadata_manager
561 .get_worker_by_id(worker_id)
562 .await
563 .map_err(err)?
564 .context("worker node not found")
565 .map_err(err)?;
566
567 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
568
569 let collapsed_bin = client
570 .analyze_heap(file_path.clone())
571 .await
572 .map_err(err)?
573 .result;
574 let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
575
576 let response = Response::builder()
577 .header("Content-Type", "application/octet-stream")
578 .header("Content-Disposition", collapsed_file_name)
579 .body(collapsed_str.into());
580
581 response.map_err(err)
582 }
583
584 #[derive(Debug, Deserialize)]
585 pub struct DiagnoseParams {
586 #[serde(default = "await_tree_default_format")]
587 actor_traces_format: String,
588 }
589
590 pub async fn diagnose(
591 Query(params): Query<DiagnoseParams>,
592 Extension(srv): Extension<Service>,
593 ) -> Result<String> {
594 let actor_traces_format = match params.actor_traces_format.as_str() {
595 "text" => ActorTracesFormat::Text,
596 "json" => ActorTracesFormat::Json,
597 _ => {
598 return Err(err(anyhow!(
599 "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
600 params.actor_traces_format
601 )));
602 }
603 };
604 Ok(srv.diagnose_command.report(actor_traces_format).await)
605 }
606
607 pub async fn get_streaming_stats(
613 Extension(srv): Extension<Service>,
614 ) -> Result<Json<GetStreamingStatsResponse>> {
615 let worker_nodes = srv
616 .metadata_manager
617 .list_active_streaming_compute_nodes()
618 .await
619 .map_err(err)?;
620
621 let mut futures = Vec::new();
622
623 for worker_node in worker_nodes {
624 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
625 let client = Arc::new(client);
626 let fut = async move {
627 let result = client.get_streaming_stats().await.map_err(err)?;
628 Ok::<_, DashboardError>(result)
629 };
630 futures.push(fut);
631 }
632 let results = join_all(futures).await;
633
634 let mut all = GetStreamingStatsResponse::default();
635
636 for result in results {
637 let result = result
638 .map_err(|_| anyhow!("Failed to get back pressure"))
639 .map_err(err)?;
640
641 for (fragment_id, fragment_stats) in result.fragment_stats {
643 if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
644 s.actor_count += fragment_stats.actor_count;
645 s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
646 } else {
647 all.fragment_stats.insert(fragment_id, fragment_stats);
648 }
649 }
650
651 for (relation_id, relation_stats) in result.relation_stats {
653 if let Some(s) = all.relation_stats.get_mut(&relation_id) {
654 s.actor_count += relation_stats.actor_count;
655 s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
656 } else {
657 all.relation_stats.insert(relation_id, relation_stats);
658 }
659 }
660
661 for (key, channel_stats) in result.channel_stats {
663 if let Some(s) = all.channel_stats.get_mut(&key) {
664 s.actor_count += channel_stats.actor_count;
665 s.output_blocking_duration += channel_stats.output_blocking_duration;
666 s.recv_row_count += channel_stats.recv_row_count;
667 s.send_row_count += channel_stats.send_row_count;
668 } else {
669 all.channel_stats.insert(key, channel_stats);
670 }
671 }
672 }
673
674 Ok(all.into())
675 }
676
677 pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
678 Ok(Json(risingwave_common::current_cluster_version()))
679 }
680}
681
682impl DashboardService {
683 pub async fn serve(self) -> Result<()> {
684 use handlers::*;
685 let srv = Arc::new(self);
686
687 let cors_layer = CorsLayer::new()
688 .allow_origin(cors::Any)
689 .allow_methods(vec![Method::GET]);
690
691 let api_router = Router::new()
692 .route("/version", get(get_version))
693 .route("/clusters/:ty", get(list_clusters))
694 .route("/streaming_jobs", get(list_streaming_jobs))
695 .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
696 .route("/relation_id_infos", get(get_relation_id_infos))
697 .route(
698 "/fragment_to_relation_map",
699 get(get_fragment_to_relation_map),
700 )
701 .route("/views", get(list_views))
702 .route("/materialized_views", get(list_materialized_views))
703 .route("/tables", get(list_tables))
704 .route("/indexes", get(list_index_tables))
705 .route("/index_items", get(list_indexes))
706 .route("/subscriptions", get(list_subscription))
707 .route("/internal_tables", get(list_internal_tables))
708 .route("/sources", get(list_sources))
709 .route("/sinks", get(list_sinks))
710 .route("/users", get(list_users))
711 .route("/databases", get(list_databases))
712 .route("/schemas", get(list_schemas))
713 .route("/object_dependencies", get(list_object_dependencies))
714 .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
715 .route("/metrics/streaming_stats", get(get_streaming_stats))
716 .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
718 .route("/monitor/await_tree/", get(dump_await_tree_all))
720 .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
721 .route(
722 "/monitor/list_heap_profile/:worker_id",
723 get(list_heap_profile),
724 )
725 .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
726 .route("/monitor/diagnose/", get(diagnose))
728 .layer(
729 ServiceBuilder::new()
730 .layer(AddExtensionLayer::new(srv.clone()))
731 .into_inner(),
732 )
733 .layer(cors_layer);
734
735 let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
736 let dashboard_router = risingwave_meta_dashboard::router();
737
738 let app = Router::new()
739 .fallback_service(dashboard_router)
740 .nest("/api", api_router)
741 .nest("/trace", trace_ui_router)
742 .layer(CompressionLayer::new());
743
744 let listener = TcpListener::bind(&srv.dashboard_addr)
745 .await
746 .context("failed to bind dashboard address")?;
747 axum::serve(listener, app)
748 .await
749 .context("failed to serve dashboard service")?;
750
751 Ok(())
752 }
753}