mod prometheus;

use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::{Context as _, Result, anyhow};
use axum::Router;
use axum::extract::{Extension, Path};
use axum::http::{Method, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use risingwave_common_heap_profiling::ProfileServiceImpl;
use risingwave_rpc_client::MonitorClientPool;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::add_extension::AddExtensionLayer;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{self, CorsLayer};

use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::diagnose::DiagnoseCommandRef;

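/// State shared by the dashboard web service: handles to the catalog, Hummock,
/// worker monitoring, profiling, diagnostics, and embedded tracing facilities
/// that the `/api` handlers below rely on.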
#[derive(Clone)]
pub struct DashboardService {
    pub await_tree_reg: await_tree::Registry,
    pub dashboard_addr: SocketAddr,
    pub prometheus_client: Option<prometheus_http_query::Client>,
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub hummock_manager: HummockManagerRef,
    pub monitor_clients: MonitorClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    pub profile_service: ProfileServiceImpl,
    pub trace_state: otlp_embedded::StateRef,
}

pub type Service = Arc<DashboardService>;

pub(super) mod handlers {
    use std::cmp::min;
    use std::collections::HashMap;

    use anyhow::Context;
    use axum::Json;
    use axum::extract::Query;
    use futures::future::join_all;
    use itertools::Itertools;
    use risingwave_common::id::JobId;
    use risingwave_meta_model::WorkerId;
    use risingwave_pb::catalog::table::TableType;
    use risingwave_pb::catalog::{
        Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
    };
    use risingwave_pb::common::{WorkerNode, WorkerType};
    use risingwave_pb::hummock::TableStats;
    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
    use risingwave_pb::meta::{
        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
    };
    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
    use risingwave_pb::monitor_service::{
        AnalyzeHeapRequest, ChannelDeltaStats, GetStreamingPrometheusStatsResponse,
        GetStreamingStatsResponse, HeapProfilingRequest, HeapProfilingResponse,
        ListHeapProfilingRequest, ListHeapProfilingResponse, StackTraceResponse,
    };
    use risingwave_pb::user::PbUserInfo;
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use thiserror_ext::AsReport;
    use tonic::Request;

    use super::*;
    use crate::controller::fragment::StreamingJobInfo;
    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};

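    /// A table catalog entry together with its statistics from the current
    /// Hummock version; all counters are zero when no stats are available.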
    #[derive(Serialize)]
    pub struct TableWithStats {
        #[serde(flatten)]
        pub table: Table,
        pub total_size_bytes: i64,
        pub total_key_count: i64,
        pub total_key_size: i64,
        pub total_value_size: i64,
        pub compressed_size: u64,
    }

    impl TableWithStats {
        pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
            match stats {
                Some(stats) => Self {
                    total_size_bytes: stats.total_key_size + stats.total_value_size,
                    total_key_count: stats.total_key_count,
                    total_key_size: stats.total_key_size,
                    total_value_size: stats.total_value_size,
                    compressed_size: stats.total_compressed_size,
                    table,
                },
                None => Self {
                    total_size_bytes: 0,
                    total_key_count: 0,
                    total_key_size: 0,
                    total_value_size: 0,
                    compressed_size: 0,
                    table,
                },
            }
        }
    }

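    /// Error type for all dashboard handlers: renders the wrapped
    /// `anyhow::Error` as a JSON `{ "error": ... }` body with status 500.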
    pub struct DashboardError(anyhow::Error);
    pub type Result<T> = std::result::Result<T, DashboardError>;

    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
        DashboardError(err.into())
    }

    impl From<anyhow::Error> for DashboardError {
        fn from(value: anyhow::Error) -> Self {
            DashboardError(value)
        }
    }

    impl IntoResponse for DashboardError {
        fn into_response(self) -> axum::response::Response {
            let mut resp = Json(json!({
                "error": self.0.to_report_string(),
            }))
            .into_response();
            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
            resp
        }
    }

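    /// Lists worker nodes of the given type (`ty` is a numeric `WorkerType`),
    /// sorted by worker id.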
    pub async fn list_clusters(
        Path(ty): Path<i32>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<WorkerNode>>> {
        let worker_type = WorkerType::try_from(ty)
            .map_err(|_| anyhow!("invalid worker type"))
            .map_err(err)?;
        let mut result = srv
            .metadata_manager
            .list_worker_node(Some(worker_type), None)
            .await
            .map_err(err)?;
        result.sort_unstable_by_key(|n| n.id);
        Ok(result.into())
    }

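    /// Shared implementation of the table-listing handlers: fetches all tables
    /// of the given type and joins them with Hummock version stats.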
    async fn list_table_catalogs_inner(
        metadata_manager: &MetadataManager,
        hummock_manager: &HummockManagerRef,
        table_type: TableType,
    ) -> Result<Json<Vec<TableWithStats>>> {
        let tables = metadata_manager
            .catalog_controller
            .list_tables_by_type(table_type.into())
            .await
            .map_err(err)?;

        let version_stats = hummock_manager.get_version_stats().await;

        let tables_with_stats = tables
            .into_iter()
            .map(|table| {
                let stats = version_stats.table_stats.get(&table.id);
                TableWithStats::from_table_and_stats(table, stats)
            })
            .collect();

        Ok(Json(tables_with_stats))
    }

    pub async fn list_materialized_views(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::MaterializedView,
        )
        .await
    }

    pub async fn list_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Table,
        )
        .await
    }

    pub async fn list_index_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Index,
        )
        .await
    }

    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
        let indexes = srv
            .metadata_manager
            .catalog_controller
            .list_indexes()
            .await
            .map_err(err)?;

        Ok(Json(indexes))
    }

    pub async fn list_subscription(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<Subscription>>> {
        let subscriptions = srv
            .metadata_manager
            .catalog_controller
            .list_subscriptions()
            .await
            .map_err(err)?;

        Ok(Json(subscriptions))
    }

    pub async fn list_internal_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Internal,
        )
        .await
    }

    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;

        Ok(Json(sources))
    }

    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
        let sinks = srv
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await
            .map_err(err)?;

        Ok(Json(sinks))
    }

    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
        let views = srv
            .metadata_manager
            .catalog_controller
            .list_views()
            .await
            .map_err(err)?;

        Ok(Json(views))
    }

    pub async fn list_functions(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbFunction>>> {
        let functions = srv
            .metadata_manager
            .catalog_controller
            .list_functions()
            .await
            .map_err(err)?;

        Ok(Json(functions))
    }

    pub async fn list_streaming_jobs(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<StreamingJobInfo>>> {
        let streaming_jobs = srv
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await
            .map_err(err)?;

        Ok(Json(streaming_jobs))
    }

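    /// Builds a map from fragment id to the id of the relation (streaming job)
    /// that owns the fragment.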
    pub async fn get_fragment_to_relation_map(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<FragmentToRelationMap>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut fragment_to_relation_map = HashMap::new();
        for (relation_id, tf) in table_fragments {
            for fragment_id in tf.fragments.keys() {
                fragment_to_relation_map.insert(*fragment_id, relation_id.as_raw_id());
            }
        }
        let map = FragmentToRelationMap {
            fragment_to_relation_map,
        };
        Ok(Json(map))
    }

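    /// For every relation, returns its fragments together with the ids of the
    /// actors running in each fragment.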
    pub async fn get_relation_id_infos(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<RelationIdInfos>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut map = HashMap::new();
        for (id, tf) in table_fragments {
            let mut fragment_id_to_actor_ids = HashMap::new();
            for (fragment_id, fragment) in &tf.fragments {
                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
            }
            map.insert(
                id.as_raw_id(),
                FragmentIdToActorIdMap {
                    map: fragment_id_to_actor_ids,
                },
            );
        }
        let relation_id_infos = RelationIdInfos { map };

        Ok(Json(relation_id_infos))
    }

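    /// Returns the fragments of a single streaming job in protobuf form,
    /// including upstream fragments and actor dispatchers.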
    pub async fn list_fragments_by_job_id(
        Extension(srv): Extension<Service>,
        Path(job_id): Path<u32>,
    ) -> Result<Json<PbTableFragments>> {
        let job_id = JobId::new(job_id);
        let table_fragments = srv
            .metadata_manager
            .get_job_fragments_by_id(job_id)
            .await
            .map_err(err)?;
        let upstream_fragments = srv
            .metadata_manager
            .catalog_controller
            .upstream_fragments(table_fragments.fragment_ids())
            .await
            .map_err(err)?;
        let dispatchers = srv
            .metadata_manager
            .catalog_controller
            .get_fragment_actor_dispatchers(
                table_fragments.fragment_ids().map(|id| id as _).collect(),
            )
            .await
            .map_err(err)?;
        Ok(Json(
            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
        ))
    }

    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
        let users = srv
            .metadata_manager
            .catalog_controller
            .list_users()
            .await
            .map_err(err)?;

        Ok(Json(users))
    }

    pub async fn list_databases(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbDatabase>>> {
        let databases = srv
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await
            .map_err(err)?;

        Ok(Json(databases))
    }

    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
        let schemas = srv
            .metadata_manager
            .catalog_controller
            .list_schemas()
            .await
            .map_err(err)?;

        Ok(Json(schemas))
    }

    pub async fn list_object_dependencies(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbObjectDependencies>>> {
        let object_dependencies = srv
            .metadata_manager
            .catalog_controller
            .list_all_object_dependencies()
            .await
            .map_err(err)?;

        Ok(Json(object_dependencies))
    }

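    /// Query parameters for the await-tree dump endpoints; `format` must be
    /// `text` (the default) or `json`.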
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        #[serde(default = "await_tree_default_format")]
        format: String,
    }

    impl AwaitTreeDumpParams {
        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
            Ok(match self.format.as_str() {
                "text" => ActorTracesFormat::Text,
                "json" => ActorTracesFormat::Json,
                _ => {
                    return Err(err(anyhow!(
                        "Unsupported format `{}`, only `text` and `json` are supported for now",
                        self.format
                    )));
                }
            })
        }
    }

    fn await_tree_default_format() -> String {
        "text".to_owned()
    }

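    /// Dumps await-tree stack traces for the whole cluster, including the meta
    /// node itself.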
    pub async fn dump_await_tree_all(
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let actor_traces_format = params.actor_traces_format()?;

        let res = dump_cluster_await_tree(
            &srv.metadata_manager,
            &srv.await_tree_reg,
            actor_traces_format,
        )
        .await
        .map_err(err)?;

        Ok(res.into())
    }

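    /// Dumps await-tree stack traces for a single worker node.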
    pub async fn dump_await_tree(
        Path(worker_id): Path<WorkerId>,
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let actor_traces_format = params.actor_traces_format()?;

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
            .await
            .map_err(err)?;

        Ok(res.into())
    }

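    /// Triggers a heap profile dump on the given worker, or on the meta node
    /// itself when `worker_id` equals `META_NODE_ID`.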
    pub async fn heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<HeapProfilingResponse>> {
        if worker_id == crate::manager::META_NODE_ID {
            let result = srv
                .profile_service
                .heap_profiling(Request::new(HeapProfilingRequest { dir: "".to_owned() }))
                .await
                .map_err(err)?
                .into_inner();
            return Ok(result.into());
        }

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;

        let result = client.heap_profile("".to_owned()).await.map_err(err)?;

        Ok(result.into())
    }

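    /// Lists the heap profile dumps available on the given worker (or on the
    /// meta node itself).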
    pub async fn list_heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<ListHeapProfilingResponse>> {
        if worker_id == crate::manager::META_NODE_ID {
            let result = srv
                .profile_service
                .list_heap_profiling(Request::new(ListHeapProfilingRequest {}))
                .await
                .map_err(err)?
                .into_inner();
            return Ok(result.into());
        }

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;

        let result = client.list_heap_profile().await.map_err(err)?;
        Ok(result.into())
    }

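    /// Analyzes a previously dumped heap profile, identified by its
    /// base64-url-encoded `file_path`, and returns the collapsed stack text.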
    pub async fn analyze_heap(
        Path((worker_id, file_path)): Path<(WorkerId, String)>,
        Extension(srv): Extension<Service>,
    ) -> Result<Response> {
        let file_path =
            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;

        let collapsed_bin = if worker_id == crate::manager::META_NODE_ID {
            srv.profile_service
                .analyze_heap(Request::new(AnalyzeHeapRequest {
                    path: file_path.clone(),
                }))
                .await
                .map_err(err)?
                .into_inner()
                .result
        } else {
            let worker_node = srv
                .metadata_manager
                .get_worker_by_id(worker_id)
                .await
                .map_err(err)?
                .context("worker node not found")
                .map_err(err)?;

            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
            client
                .analyze_heap(file_path.clone())
                .await
                .map_err(err)?
                .result
        };
        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();

        let response = Response::builder()
            .header("Content-Type", "application/octet-stream")
            .body(collapsed_str.into());

        response.map_err(err)
    }

    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }

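    /// Generates a cluster diagnosis report, with actor traces rendered in the
    /// requested format.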
    pub async fn diagnose(
        Query(params): Query<DiagnoseParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<String> {
        let actor_traces_format = match params.actor_traces_format.as_str() {
            "text" => ActorTracesFormat::Text,
            "json" => ActorTracesFormat::Json,
            _ => {
                return Err(err(anyhow!(
                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
                    params.actor_traces_format
                )));
            }
        };
        Ok(srv.diagnose_command.report(actor_traces_format).await)
    }

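    /// Collects streaming stats from every active compute node and merges them:
    /// actor counts and channel counters are summed, while epochs take the
    /// minimum across workers.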
    pub async fn get_streaming_stats(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingStatsResponse>> {
        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        let mut all = GetStreamingStatsResponse::default();

        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
                .map_err(err)?;

            // Merge fragment-level stats.
            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            // Merge relation-level stats.
            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }

            // Merge channel-level stats.
            for (key, channel_stats) in result.channel_stats {
                if let Some(s) = all.channel_stats.get_mut(&key) {
                    s.actor_count += channel_stats.actor_count;
                    s.output_blocking_duration += channel_stats.output_blocking_duration;
                    s.recv_row_count += channel_stats.recv_row_count;
                    s.send_row_count += channel_stats.send_row_count;
                } else {
                    all.channel_stats.insert(key, channel_stats);
                }
            }
        }

        Ok(all.into())
    }

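    /// Query parameters for the Prometheus-backed stats endpoint: an optional
    /// evaluation timestamp `at` for the queries, and the rate window
    /// `time_offset` in seconds (default 60).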
    #[derive(Debug, Deserialize)]
    pub struct StreamingStatsPrometheusParams {
        #[serde(default)]
        at: Option<i64>,
        #[serde(default = "streaming_stats_default_time_offset")]
        time_offset: i64,
    }

    fn streaming_stats_default_time_offset() -> i64 {
        60
    }

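    /// Like `get_streaming_stats`, but derives channel-level throughput and
    /// backpressure rates from Prometheus instead of per-worker deltas; fails
    /// if no Prometheus endpoint is configured.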
    pub async fn get_streaming_stats_from_prometheus(
        Query(params): Query<StreamingStatsPrometheusParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
        let mut all = GetStreamingPrometheusStatsResponse::default();

        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
                .map_err(err)?;

            // Merge fragment-level stats.
            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            // Merge relation-level stats.
            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }
        }

        if let Some(ref client) = srv.prometheus_client {
            let channel_input_throughput_query = format!(
                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            let channel_output_throughput_query = format!(
                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            let channel_backpressure_query = format!(
                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
                / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
                srv.prometheus_selector, params.time_offset
            );

            let (
                channel_input_throughput_result,
                channel_output_throughput_result,
                channel_backpressure_result,
            ) = {
                let mut input_query = client.query(channel_input_throughput_query);
                let mut output_query = client.query(channel_output_throughput_query);
                let mut backpressure_query = client.query(channel_backpressure_query);

                if let Some(at_time) = params.at {
                    input_query = input_query.at(at_time);
                    output_query = output_query.at(at_time);
                    backpressure_query = backpressure_query.at(at_time);
                }

                tokio::try_join!(
                    input_query.get(),
                    output_query.get(),
                    backpressure_query.get(),
                )
                .map_err(err)?
            };

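            // Merge the three query result vectors into a single map keyed by
            // `<upstream_fragment_id>_<downstream_fragment_id>`.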
            let mut channel_data = HashMap::new();

            if let Some(channel_input_throughput_data) =
                channel_input_throughput_result.data().as_vector()
            {
                for sample in channel_input_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .recv_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_output_throughput_data) =
                channel_output_throughput_result.data().as_vector()
            {
                for sample in channel_output_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .send_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
            {
                for sample in channel_backpressure_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(downstream_fragment_id_str) =
                            sample.metric().get("downstream_fragment_id")
                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            downstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            // The query yields blocked nanoseconds per second per actor;
                            // dividing by 1e9 turns it into a fraction of time blocked.
                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0;
                    }
                }
            }

            // Backfill per-channel actor counts from the fragment stats
            // gathered above, keyed by the downstream fragment id.
            for (key, channel_stats) in &mut channel_data {
                let parts: Vec<&str> = key.split('_').collect();
                if parts.len() == 2
                    && let Ok(fragment_id) = parts[1].parse::<u32>()
                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
                {
                    channel_stats.actor_count = fragment_stats.actor_count;
                }
            }

            all.channel_stats = channel_data;

            Ok(Json(all))
        } else {
            Err(err(anyhow!("Prometheus endpoint is not set")))
        }
    }

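    /// Returns the version of the cluster this meta node belongs to.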
    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
        Ok(Json(risingwave_common::current_cluster_version()))
    }
}

impl DashboardService {
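    /// Binds the dashboard address and serves the embedded dashboard UI, the
    /// `/api` routes, and the `/trace` UI until the server exits.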
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/:ty", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/functions", get(list_functions))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            .route(
                "/metrics/streaming_stats_prometheus",
                get(get_streaming_stats_from_prometheus),
            )
            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/:worker_id",
                get(list_heap_profile),
            )
            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
            .route("/monitor/diagnose/", get(diagnose))
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}