risingwave_meta/dashboard/
mod.rs1mod prometheus;
16
17use std::net::SocketAddr;
18use std::path::Path as FilePath;
19use std::sync::Arc;
20
21use anyhow::{Context as _, Result, anyhow};
22use axum::Router;
23use axum::extract::{Extension, Path};
24use axum::http::{Method, StatusCode};
25use axum::response::{IntoResponse, Response};
26use axum::routing::get;
27use risingwave_common::util::StackTraceResponseExt;
28use risingwave_rpc_client::ComputeClientPool;
29use tokio::net::TcpListener;
30use tower::ServiceBuilder;
31use tower_http::add_extension::AddExtensionLayer;
32use tower_http::compression::CompressionLayer;
33use tower_http::cors::{self, CorsLayer};
34
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// State and configuration shared by the dashboard HTTP service.
///
/// [`DashboardService::serve`] wraps the value in an [`Arc`] and installs it as
/// an axum extension, from which the request handlers read it.
#[derive(Clone)]
pub struct DashboardService {
    /// Address the dashboard TCP listener is bound to by [`DashboardService::serve`].
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus query client.
    /// NOTE(review): not referenced in this file; presumably consumed by the
    /// `prometheus` submodule — confirm there.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// NOTE(review): not referenced in this file; presumably a label selector
    /// for the Prometheus queries — confirm in the `prometheus` submodule.
    pub prometheus_selector: String,
    /// Source of worker, catalog and fragment metadata for the handlers.
    pub metadata_manager: MetadataManager,
    /// Pool used to obtain a client for a given compute worker node.
    pub compute_clients: ComputeClientPool,
    /// Produces the text returned by the `/monitor/diagnose/` endpoint.
    pub diagnose_command: DiagnoseCommandRef,
    /// State handed to the embedded trace UI router mounted under `/trace`.
    pub trace_state: otlp_embedded::StateRef,
}
48
/// Shared handle to the [`DashboardService`]; handlers extract it as `Extension<Service>`.
pub type Service = Arc<DashboardService>;
50
51pub(super) mod handlers {
52 use std::cmp::min;
53 use std::collections::HashMap;
54
55 use anyhow::Context;
56 use axum::Json;
57 use futures::future::join_all;
58 use itertools::Itertools;
59 use risingwave_common::catalog::TableId;
60 use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
61 use risingwave_meta_model::WorkerId;
62 use risingwave_pb::catalog::table::TableType;
63 use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View};
64 use risingwave_pb::common::{WorkerNode, WorkerType};
65 use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
66 use risingwave_pb::meta::{
67 ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
68 };
69 use risingwave_pb::monitor_service::{
70 GetStreamingStatsResponse, HeapProfilingResponse, ListHeapProfilingResponse,
71 StackTraceResponse,
72 };
73 use risingwave_pb::stream_plan::FragmentTypeFlag;
74 use risingwave_pb::user::PbUserInfo;
75 use serde_json::json;
76 use thiserror_ext::AsReport;
77
78 use super::*;
79 use crate::controller::fragment::StreamingJobInfo;
80
/// Error returned by the dashboard handlers; a thin wrapper around [`anyhow::Error`].
pub struct DashboardError(anyhow::Error);
/// Result alias used by the handlers, with [`DashboardError`] as the error type.
pub type Result<T> = std::result::Result<T, DashboardError>;
83
84 pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
85 DashboardError(err.into())
86 }
87
88 impl From<anyhow::Error> for DashboardError {
89 fn from(value: anyhow::Error) -> Self {
90 DashboardError(value)
91 }
92 }
93
94 impl IntoResponse for DashboardError {
95 fn into_response(self) -> axum::response::Response {
96 let mut resp = Json(json!({
97 "error": self.0.to_report_string(),
98 }))
99 .into_response();
100 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
101 resp
102 }
103 }
104
105 pub async fn list_clusters(
106 Path(ty): Path<i32>,
107 Extension(srv): Extension<Service>,
108 ) -> Result<Json<Vec<WorkerNode>>> {
109 let worker_type = WorkerType::try_from(ty)
110 .map_err(|_| anyhow!("invalid worker type"))
111 .map_err(err)?;
112 let mut result = srv
113 .metadata_manager
114 .list_worker_node(Some(worker_type), None)
115 .await
116 .map_err(err)?;
117 result.sort_unstable_by_key(|n| n.id);
118 Ok(result.into())
119 }
120
121 async fn list_table_catalogs_inner(
122 metadata_manager: &MetadataManager,
123 table_type: TableType,
124 ) -> Result<Json<Vec<Table>>> {
125 let tables = metadata_manager
126 .catalog_controller
127 .list_tables_by_type(table_type.into())
128 .await
129 .map_err(err)?;
130
131 Ok(Json(tables))
132 }
133
134 pub async fn list_materialized_views(
135 Extension(srv): Extension<Service>,
136 ) -> Result<Json<Vec<Table>>> {
137 list_table_catalogs_inner(&srv.metadata_manager, TableType::MaterializedView).await
138 }
139
140 pub async fn list_tables(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
141 list_table_catalogs_inner(&srv.metadata_manager, TableType::Table).await
142 }
143
144 pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Table>>> {
145 list_table_catalogs_inner(&srv.metadata_manager, TableType::Index).await
146 }
147
148 pub async fn list_subscription(
149 Extension(srv): Extension<Service>,
150 ) -> Result<Json<Vec<Subscription>>> {
151 let subscriptions = srv
152 .metadata_manager
153 .catalog_controller
154 .list_subscriptions()
155 .await
156 .map_err(err)?;
157
158 Ok(Json(subscriptions))
159 }
160
161 pub async fn list_internal_tables(
162 Extension(srv): Extension<Service>,
163 ) -> Result<Json<Vec<Table>>> {
164 list_table_catalogs_inner(&srv.metadata_manager, TableType::Internal).await
165 }
166
167 pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
168 let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
169
170 Ok(Json(sources))
171 }
172
173 pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
174 let sinks = srv
175 .metadata_manager
176 .catalog_controller
177 .list_sinks()
178 .await
179 .map_err(err)?;
180
181 Ok(Json(sinks))
182 }
183
184 pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
185 let views = srv
186 .metadata_manager
187 .catalog_controller
188 .list_views()
189 .await
190 .map_err(err)?;
191
192 Ok(Json(views))
193 }
194
195 pub async fn list_streaming_jobs(
196 Extension(srv): Extension<Service>,
197 ) -> Result<Json<Vec<StreamingJobInfo>>> {
198 let streaming_jobs = srv
199 .metadata_manager
200 .catalog_controller
201 .list_streaming_job_infos()
202 .await
203 .map_err(err)?;
204
205 Ok(Json(streaming_jobs))
206 }
207
208 pub async fn get_fragment_to_relation_map(
216 Extension(srv): Extension<Service>,
217 ) -> Result<Json<FragmentToRelationMap>> {
218 let table_fragments = srv
219 .metadata_manager
220 .catalog_controller
221 .table_fragments()
222 .await
223 .map_err(err)?;
224 let mut in_map = HashMap::new();
225 let mut out_map = HashMap::new();
226 for (relation_id, tf) in table_fragments {
227 for (fragment_id, fragment) in &tf.fragments {
228 if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 {
229 in_map.insert(*fragment_id, relation_id as u32);
230 }
231 if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
232 out_map.insert(*fragment_id, relation_id as u32);
233 }
234 }
235 }
236 let map = FragmentToRelationMap { in_map, out_map };
237 Ok(Json(map))
238 }
239
240 pub async fn get_relation_id_infos(
242 Extension(srv): Extension<Service>,
243 ) -> Result<Json<RelationIdInfos>> {
244 let table_fragments = srv
245 .metadata_manager
246 .catalog_controller
247 .table_fragments()
248 .await
249 .map_err(err)?;
250 let mut map = HashMap::new();
251 for (id, tf) in table_fragments {
252 let mut fragment_id_to_actor_ids = HashMap::new();
253 for (fragment_id, fragment) in &tf.fragments {
254 let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
255 fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
256 }
257 map.insert(
258 id as u32,
259 FragmentIdToActorIdMap {
260 map: fragment_id_to_actor_ids,
261 },
262 );
263 }
264 let relation_id_infos = RelationIdInfos { map };
265
266 Ok(Json(relation_id_infos))
267 }
268
269 pub async fn list_fragments_by_job_id(
270 Extension(srv): Extension<Service>,
271 Path(job_id): Path<u32>,
272 ) -> Result<Json<PbTableFragments>> {
273 let table_id = TableId::new(job_id);
274 let table_fragments = srv
275 .metadata_manager
276 .get_job_fragments_by_id(&table_id)
277 .await
278 .map_err(err)?;
279 let upstream_fragments = srv
280 .metadata_manager
281 .catalog_controller
282 .upstream_fragments(table_fragments.fragment_ids())
283 .await
284 .map_err(err)?;
285 let dispatchers = srv
286 .metadata_manager
287 .catalog_controller
288 .get_fragment_actor_dispatchers(
289 table_fragments.fragment_ids().map(|id| id as _).collect(),
290 )
291 .await
292 .map_err(err)?;
293 Ok(Json(
294 table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
295 ))
296 }
297
298 pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
299 let users = srv
300 .metadata_manager
301 .catalog_controller
302 .list_users()
303 .await
304 .map_err(err)?;
305
306 Ok(Json(users))
307 }
308
309 pub async fn list_databases(
310 Extension(srv): Extension<Service>,
311 ) -> Result<Json<Vec<PbDatabase>>> {
312 let databases = srv
313 .metadata_manager
314 .catalog_controller
315 .list_databases()
316 .await
317 .map_err(err)?;
318
319 Ok(Json(databases))
320 }
321
322 pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
323 let schemas = srv
324 .metadata_manager
325 .catalog_controller
326 .list_schemas()
327 .await
328 .map_err(err)?;
329
330 Ok(Json(schemas))
331 }
332
333 pub async fn list_object_dependencies(
334 Extension(srv): Extension<Service>,
335 ) -> Result<Json<Vec<PbObjectDependencies>>> {
336 let object_dependencies = srv
337 .metadata_manager
338 .catalog_controller
339 .list_all_object_dependencies()
340 .await
341 .map_err(err)?;
342
343 Ok(Json(object_dependencies))
344 }
345
346 async fn dump_await_tree_inner(
347 worker_nodes: impl IntoIterator<Item = &WorkerNode>,
348 compute_clients: &ComputeClientPool,
349 ) -> Result<Json<StackTraceResponse>> {
350 let mut all = StackTraceResponse::default();
351
352 for worker_node in worker_nodes {
353 let client = compute_clients.get(worker_node).await.map_err(err)?;
354 let result = client.stack_trace().await.map_err(err)?;
355
356 all.merge_other(result);
357 }
358
359 Ok(all.into())
360 }
361
362 pub async fn dump_await_tree_all(
363 Extension(srv): Extension<Service>,
364 ) -> Result<Json<StackTraceResponse>> {
365 let worker_nodes = srv
366 .metadata_manager
367 .list_worker_node(Some(WorkerType::ComputeNode), None)
368 .await
369 .map_err(err)?;
370
371 dump_await_tree_inner(&worker_nodes, &srv.compute_clients).await
372 }
373
374 pub async fn dump_await_tree(
375 Path(worker_id): Path<WorkerId>,
376 Extension(srv): Extension<Service>,
377 ) -> Result<Json<StackTraceResponse>> {
378 let worker_node = srv
379 .metadata_manager
380 .get_worker_by_id(worker_id)
381 .await
382 .map_err(err)?
383 .context("worker node not found")
384 .map_err(err)?;
385
386 dump_await_tree_inner(std::iter::once(&worker_node), &srv.compute_clients).await
387 }
388
389 pub async fn heap_profile(
390 Path(worker_id): Path<WorkerId>,
391 Extension(srv): Extension<Service>,
392 ) -> Result<Json<HeapProfilingResponse>> {
393 let worker_node = srv
394 .metadata_manager
395 .get_worker_by_id(worker_id)
396 .await
397 .map_err(err)?
398 .context("worker node not found")
399 .map_err(err)?;
400
401 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
402
403 let result = client.heap_profile("".to_owned()).await.map_err(err)?;
404
405 Ok(result.into())
406 }
407
408 pub async fn list_heap_profile(
409 Path(worker_id): Path<WorkerId>,
410 Extension(srv): Extension<Service>,
411 ) -> Result<Json<ListHeapProfilingResponse>> {
412 let worker_node = srv
413 .metadata_manager
414 .get_worker_by_id(worker_id)
415 .await
416 .map_err(err)?
417 .context("worker node not found")
418 .map_err(err)?;
419
420 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
421
422 let result = client.list_heap_profile().await.map_err(err)?;
423 Ok(result.into())
424 }
425
426 pub async fn analyze_heap(
427 Path((worker_id, file_path)): Path<(WorkerId, String)>,
428 Extension(srv): Extension<Service>,
429 ) -> Result<Response> {
430 let file_path =
431 String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
432
433 let file_name = FilePath::new(&file_path)
434 .file_name()
435 .unwrap()
436 .to_string_lossy()
437 .to_string();
438
439 let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
440
441 let worker_node = srv
442 .metadata_manager
443 .get_worker_by_id(worker_id)
444 .await
445 .map_err(err)?
446 .context("worker node not found")
447 .map_err(err)?;
448
449 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
450
451 let collapsed_bin = client
452 .analyze_heap(file_path.clone())
453 .await
454 .map_err(err)?
455 .result;
456 let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
457
458 let response = Response::builder()
459 .header("Content-Type", "application/octet-stream")
460 .header("Content-Disposition", collapsed_file_name)
461 .body(collapsed_str.into());
462
463 response.map_err(err)
464 }
465
466 pub async fn diagnose(Extension(srv): Extension<Service>) -> Result<String> {
467 Ok(srv.diagnose_command.report().await)
468 }
469
470 pub async fn get_streaming_stats(
476 Extension(srv): Extension<Service>,
477 ) -> Result<Json<GetStreamingStatsResponse>> {
478 let worker_nodes = srv
479 .metadata_manager
480 .list_active_streaming_compute_nodes()
481 .await
482 .map_err(err)?;
483
484 let mut futures = Vec::new();
485
486 for worker_node in worker_nodes {
487 let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
488 let client = Arc::new(client);
489 let fut = async move {
490 let result = client.get_streaming_stats().await.map_err(err)?;
491 Ok::<_, DashboardError>(result)
492 };
493 futures.push(fut);
494 }
495 let results = join_all(futures).await;
496
497 let mut all = GetStreamingStatsResponse::default();
498
499 for result in results {
500 let result = result
501 .map_err(|_| anyhow!("Failed to get back pressure"))
502 .map_err(err)?;
503
504 for (fragment_id, fragment_stats) in result.fragment_stats {
506 if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
507 s.actor_count += fragment_stats.actor_count;
508 s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
509 } else {
510 all.fragment_stats.insert(fragment_id, fragment_stats);
511 }
512 }
513
514 for (relation_id, relation_stats) in result.relation_stats {
516 if let Some(s) = all.relation_stats.get_mut(&relation_id) {
517 s.actor_count += relation_stats.actor_count;
518 s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
519 } else {
520 all.relation_stats.insert(relation_id, relation_stats);
521 }
522 }
523
524 for (key, channel_stats) in result.channel_stats {
526 if let Some(s) = all.channel_stats.get_mut(&key) {
527 s.actor_count += channel_stats.actor_count;
528 s.output_blocking_duration += channel_stats.output_blocking_duration;
529 s.recv_row_count += channel_stats.recv_row_count;
530 s.send_row_count += channel_stats.send_row_count;
531 } else {
532 all.channel_stats.insert(key, channel_stats);
533 }
534 }
535 }
536
537 Ok(all.into())
538 }
539
540 pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
541 Ok(Json(risingwave_common::current_cluster_version()))
542 }
543}
544
545impl DashboardService {
546 pub async fn serve(self) -> Result<()> {
547 use handlers::*;
548 let srv = Arc::new(self);
549
550 let cors_layer = CorsLayer::new()
551 .allow_origin(cors::Any)
552 .allow_methods(vec![Method::GET]);
553
554 let api_router = Router::new()
555 .route("/version", get(get_version))
556 .route("/clusters/:ty", get(list_clusters))
557 .route("/streaming_jobs", get(list_streaming_jobs))
558 .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
559 .route("/relation_id_infos", get(get_relation_id_infos))
560 .route(
561 "/fragment_to_relation_map",
562 get(get_fragment_to_relation_map),
563 )
564 .route("/views", get(list_views))
565 .route("/materialized_views", get(list_materialized_views))
566 .route("/tables", get(list_tables))
567 .route("/indexes", get(list_indexes))
568 .route("/subscriptions", get(list_subscription))
569 .route("/internal_tables", get(list_internal_tables))
570 .route("/sources", get(list_sources))
571 .route("/sinks", get(list_sinks))
572 .route("/users", get(list_users))
573 .route("/databases", get(list_databases))
574 .route("/schemas", get(list_schemas))
575 .route("/object_dependencies", get(list_object_dependencies))
576 .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
577 .route("/metrics/streaming_stats", get(get_streaming_stats))
578 .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
579 .route("/monitor/await_tree/", get(dump_await_tree_all))
580 .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
581 .route(
582 "/monitor/list_heap_profile/:worker_id",
583 get(list_heap_profile),
584 )
585 .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
586 .route("/monitor/diagnose/", get(diagnose))
587 .layer(
588 ServiceBuilder::new()
589 .layer(AddExtensionLayer::new(srv.clone()))
590 .into_inner(),
591 )
592 .layer(cors_layer);
593
594 let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
595 let dashboard_router = risingwave_meta_dashboard::router();
596
597 let app = Router::new()
598 .fallback_service(dashboard_router)
599 .nest("/api", api_router)
600 .nest("/trace", trace_ui_router)
601 .layer(CompressionLayer::new());
602
603 let listener = TcpListener::bind(&srv.dashboard_addr)
604 .await
605 .context("failed to bind dashboard address")?;
606 axum::serve(listener, app)
607 .await
608 .context("failed to serve dashboard service")?;
609
610 Ok(())
611 }
612}