mod prometheus;

use std::net::SocketAddr;
use std::path::Path as FilePath;
use std::sync::Arc;

use anyhow::{Context as _, Result, anyhow};
use axum::Router;
use axum::extract::{Extension, Path};
use axum::http::{Method, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use risingwave_rpc_client::ComputeClientPool;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::add_extension::AddExtensionLayer;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{self, CorsLayer};

use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::diagnose::DiagnoseCommandRef;

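/// State shared by the dashboard's HTTP handlers. It is wrapped in an [`Arc`]
/// (see [`Service`]) and injected into every handler as an axum extension.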
#[derive(Clone)]
pub struct DashboardService {
    pub await_tree_reg: await_tree::Registry,
    pub dashboard_addr: SocketAddr,
    pub prometheus_client: Option<prometheus_http_query::Client>,
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub hummock_manager: HummockManagerRef,
    pub compute_clients: ComputeClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    pub trace_state: otlp_embedded::StateRef,
}

pub type Service = Arc<DashboardService>;

pub(super) mod handlers {
    use std::cmp::min;
    use std::collections::HashMap;

    use anyhow::Context;
    use axum::Json;
    use axum::extract::Query;
    use futures::future::join_all;
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
    use risingwave_meta_model::WorkerId;
    use risingwave_pb::catalog::table::TableType;
    use risingwave_pb::catalog::{
        Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
    };
    use risingwave_pb::common::{WorkerNode, WorkerType};
    use risingwave_pb::hummock::TableStats;
    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
    use risingwave_pb::meta::{
        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
    };
    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
    use risingwave_pb::monitor_service::{
        ChannelDeltaStats, GetStreamingPrometheusStatsResponse, GetStreamingStatsResponse,
        HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse,
    };
    use risingwave_pb::user::PbUserInfo;
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use thiserror_ext::AsReport;

    use super::*;
    use crate::controller::fragment::StreamingJobInfo;
    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};

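    /// A table catalog entry flattened together with its storage statistics
    /// from the current Hummock version.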
    #[derive(Serialize)]
    pub struct TableWithStats {
        #[serde(flatten)]
        pub table: Table,
        pub total_size_bytes: i64,
        pub total_key_count: i64,
        pub total_key_size: i64,
        pub total_value_size: i64,
        pub compressed_size: u64,
    }

    impl TableWithStats {
        pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
            match stats {
                Some(stats) => Self {
                    total_size_bytes: stats.total_key_size + stats.total_value_size,
                    total_key_count: stats.total_key_count,
                    total_key_size: stats.total_key_size,
                    total_value_size: stats.total_value_size,
                    compressed_size: stats.total_compressed_size,
                    table,
                },
                None => Self {
                    total_size_bytes: 0,
                    total_key_count: 0,
                    total_key_size: 0,
                    total_value_size: 0,
                    compressed_size: 0,
                    table,
                },
            }
        }
    }

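    /// Error type shared by the dashboard handlers: any [`anyhow::Error`] is
    /// rendered as a JSON `{"error": ...}` body with HTTP 500.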
    pub struct DashboardError(anyhow::Error);
    pub type Result<T> = std::result::Result<T, DashboardError>;

    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
        DashboardError(err.into())
    }

    impl From<anyhow::Error> for DashboardError {
        fn from(value: anyhow::Error) -> Self {
            DashboardError(value)
        }
    }

    impl IntoResponse for DashboardError {
        fn into_response(self) -> axum::response::Response {
            let mut resp = Json(json!({
                "error": self.0.to_report_string(),
            }))
            .into_response();
            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
            resp
        }
    }

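    /// Lists worker nodes of the type given by the numeric `ty` path segment,
    /// sorted by worker id.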
    pub async fn list_clusters(
        Path(ty): Path<i32>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<WorkerNode>>> {
        let worker_type = WorkerType::try_from(ty)
            .map_err(|_| anyhow!("invalid worker type"))
            .map_err(err)?;
        let mut result = srv
            .metadata_manager
            .list_worker_node(Some(worker_type), None)
            .await
            .map_err(err)?;
        result.sort_unstable_by_key(|n| n.id);
        Ok(result.into())
    }

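    /// Shared implementation of the table-listing endpoints: fetches all
    /// catalogs of the given [`TableType`] and joins each with its stats from
    /// the current Hummock version.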
    async fn list_table_catalogs_inner(
        metadata_manager: &MetadataManager,
        hummock_manager: &HummockManagerRef,
        table_type: TableType,
    ) -> Result<Json<Vec<TableWithStats>>> {
        let tables = metadata_manager
            .catalog_controller
            .list_tables_by_type(table_type.into())
            .await
            .map_err(err)?;

        let version_stats = hummock_manager.get_version_stats().await;

        let tables_with_stats = tables
            .into_iter()
            .map(|table| {
                let stats = version_stats.table_stats.get(&table.id);
                TableWithStats::from_table_and_stats(table, stats)
            })
            .collect();

        Ok(Json(tables_with_stats))
    }

    pub async fn list_materialized_views(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::MaterializedView,
        )
        .await
    }

    pub async fn list_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Table,
        )
        .await
    }

    pub async fn list_index_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Index,
        )
        .await
    }

    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
        let indexes = srv
            .metadata_manager
            .catalog_controller
            .list_indexes()
            .await
            .map_err(err)?;

        Ok(Json(indexes))
    }

    pub async fn list_subscription(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<Subscription>>> {
        let subscriptions = srv
            .metadata_manager
            .catalog_controller
            .list_subscriptions()
            .await
            .map_err(err)?;

        Ok(Json(subscriptions))
    }

    pub async fn list_internal_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Internal,
        )
        .await
    }

    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;

        Ok(Json(sources))
    }

    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
        let sinks = srv
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await
            .map_err(err)?;

        Ok(Json(sinks))
    }

    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
        let views = srv
            .metadata_manager
            .catalog_controller
            .list_views()
            .await
            .map_err(err)?;

        Ok(Json(views))
    }

    pub async fn list_functions(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbFunction>>> {
        let functions = srv
            .metadata_manager
            .catalog_controller
            .list_functions()
            .await
            .map_err(err)?;

        Ok(Json(functions))
    }

    pub async fn list_streaming_jobs(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<StreamingJobInfo>>> {
        let streaming_jobs = srv
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await
            .map_err(err)?;

        Ok(Json(streaming_jobs))
    }

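    /// Maps every fragment id to the id of the relation (streaming job) it
    /// belongs to.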
    pub async fn get_fragment_to_relation_map(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<FragmentToRelationMap>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut fragment_to_relation_map = HashMap::new();
        for (relation_id, tf) in table_fragments {
            for fragment_id in tf.fragments.keys() {
                fragment_to_relation_map.insert(*fragment_id, relation_id as u32);
            }
        }
        let map = FragmentToRelationMap {
            fragment_to_relation_map,
        };
        Ok(Json(map))
    }

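    /// For every relation, maps each of its fragment ids to the ids of the
    /// actors running that fragment.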
    pub async fn get_relation_id_infos(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<RelationIdInfos>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut map = HashMap::new();
        for (id, tf) in table_fragments {
            let mut fragment_id_to_actor_ids = HashMap::new();
            for (fragment_id, fragment) in &tf.fragments {
                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
            }
            map.insert(
                id as u32,
                FragmentIdToActorIdMap {
                    map: fragment_id_to_actor_ids,
                },
            );
        }
        let relation_id_infos = RelationIdInfos { map };

        Ok(Json(relation_id_infos))
    }

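    /// Returns the fragment graph of the streaming job with the given id,
    /// including its upstream fragments and actor dispatchers.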
    pub async fn list_fragments_by_job_id(
        Extension(srv): Extension<Service>,
        Path(job_id): Path<u32>,
    ) -> Result<Json<PbTableFragments>> {
        let table_id = TableId::new(job_id);
        let table_fragments = srv
            .metadata_manager
            .get_job_fragments_by_id(&table_id)
            .await
            .map_err(err)?;
        let upstream_fragments = srv
            .metadata_manager
            .catalog_controller
            .upstream_fragments(table_fragments.fragment_ids())
            .await
            .map_err(err)?;
        let dispatchers = srv
            .metadata_manager
            .catalog_controller
            .get_fragment_actor_dispatchers(
                table_fragments.fragment_ids().map(|id| id as _).collect(),
            )
            .await
            .map_err(err)?;
        Ok(Json(
            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
        ))
    }

    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
        let users = srv
            .metadata_manager
            .catalog_controller
            .list_users()
            .await
            .map_err(err)?;

        Ok(Json(users))
    }

    pub async fn list_databases(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbDatabase>>> {
        let databases = srv
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await
            .map_err(err)?;

        Ok(Json(databases))
    }

    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
        let schemas = srv
            .metadata_manager
            .catalog_controller
            .list_schemas()
            .await
            .map_err(err)?;

        Ok(Json(schemas))
    }

    pub async fn list_object_dependencies(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbObjectDependencies>>> {
        let object_dependencies = srv
            .metadata_manager
            .catalog_controller
            .list_all_object_dependencies()
            .await
            .map_err(err)?;

        Ok(Json(object_dependencies))
    }

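    /// Query parameters for the await-tree dump endpoints, e.g.
    /// `GET /api/monitor/await_tree/{worker_id}?format=json`.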
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        #[serde(default = "await_tree_default_format")]
        format: String,
    }

    impl AwaitTreeDumpParams {
        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
            Ok(match self.format.as_str() {
                "text" => ActorTracesFormat::Text,
                "json" => ActorTracesFormat::Json,
                _ => {
                    return Err(err(anyhow!(
                        "Unsupported format `{}`, only `text` and `json` are supported for now",
                        self.format
                    )));
                }
            })
        }
    }

    fn await_tree_default_format() -> String {
        "text".to_owned()
    }

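    /// Dumps await-tree traces for the whole cluster, covering both the worker
    /// nodes and the meta node's own await-tree registry.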
    pub async fn dump_await_tree_all(
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let actor_traces_format = params.actor_traces_format()?;

        let res = dump_cluster_await_tree(
            &srv.metadata_manager,
            &srv.await_tree_reg,
            actor_traces_format,
        )
        .await
        .map_err(err)?;

        Ok(res.into())
    }

    pub async fn dump_await_tree(
        Path(worker_id): Path<WorkerId>,
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let actor_traces_format = params.actor_traces_format()?;

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
            .await
            .map_err(err)?;

        Ok(res.into())
    }

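    /// Triggers an ad-hoc heap profiling dump on the given compute worker.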
    pub async fn heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<HeapProfilingResponse>> {
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let result = client.heap_profile("".to_owned()).await.map_err(err)?;

        Ok(result.into())
    }

    pub async fn list_heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<ListHeapProfilingResponse>> {
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let result = client.list_heap_profile().await.map_err(err)?;
        Ok(result.into())
    }

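    /// Runs heap-profile analysis on the given worker and returns the
    /// collapsed stacks as a downloadable file; the `file_path` path segment
    /// is base64-url-encoded.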
    pub async fn analyze_heap(
        Path((worker_id, file_path)): Path<(WorkerId, String)>,
        Extension(srv): Extension<Service>,
    ) -> Result<Response> {
        let file_path =
            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;

        let file_name = FilePath::new(&file_path)
            .file_name()
            .unwrap()
            .to_string_lossy()
            .to_string();

        let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let collapsed_bin = client
            .analyze_heap(file_path.clone())
            .await
            .map_err(err)?
            .result;
        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();

        let response = Response::builder()
            .header("Content-Type", "application/octet-stream")
            .header("Content-Disposition", collapsed_file_name)
            .body(collapsed_str.into());

        response.map_err(err)
    }

    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }

    pub async fn diagnose(
        Query(params): Query<DiagnoseParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<String> {
        let actor_traces_format = match params.actor_traces_format.as_str() {
            "text" => ActorTracesFormat::Text,
            "json" => ActorTracesFormat::Json,
            _ => {
                return Err(err(anyhow!(
                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
                    params.actor_traces_format
                )));
            }
        };
        Ok(srv.diagnose_command.report(actor_traces_format).await)
    }

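    /// Aggregates streaming stats reported by every active streaming compute
    /// node: actor counts are summed, `current_epoch` is merged by taking the
    /// minimum, and channel counters are summed field-wise.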
    pub async fn get_streaming_stats(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingStatsResponse>> {
        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        let mut all = GetStreamingStatsResponse::default();

        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
                .map_err(err)?;

            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }

            for (key, channel_stats) in result.channel_stats {
                if let Some(s) = all.channel_stats.get_mut(&key) {
                    s.actor_count += channel_stats.actor_count;
                    s.output_blocking_duration += channel_stats.output_blocking_duration;
                    s.recv_row_count += channel_stats.recv_row_count;
                    s.send_row_count += channel_stats.send_row_count;
                } else {
                    all.channel_stats.insert(key, channel_stats);
                }
            }
        }

        Ok(all.into())
    }

    #[derive(Debug, Deserialize)]
    pub struct StreamingStatsPrometheusParams {
        /// Optional evaluation timestamp (UNIX seconds) for the Prometheus
        /// queries; defaults to "now" when unset.
        #[serde(default)]
        at: Option<i64>,
        /// Window of the `rate()` queries, in seconds.
        #[serde(default = "streaming_stats_default_time_offset")]
        time_offset: i64,
    }

    fn streaming_stats_default_time_offset() -> i64 {
        60
    }

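    /// Like [`get_streaming_stats`], but channel throughput and backpressure
    /// are derived from Prometheus `rate()` queries instead of worker-side
    /// counters. Fails if no Prometheus endpoint is configured.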
    pub async fn get_streaming_stats_from_prometheus(
        Query(params): Query<StreamingStatsPrometheusParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
        let mut all = GetStreamingPrometheusStatsResponse::default();

        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
                .map_err(err)?;

            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }
        }

        if let Some(ref client) = srv.prometheus_client {
            // Per-channel input/output throughput in records per second,
            // grouped by (fragment_id, upstream_fragment_id).
            let channel_input_throughput_query = format!(
                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            let channel_output_throughput_query = format!(
                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            // Output-buffer blocking time (ns) per second, averaged over the
            // actors of the upstream fragment.
            let channel_backpressure_query = format!(
                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
                / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
                srv.prometheus_selector, params.time_offset
            );

            let (
                channel_input_throughput_result,
                channel_output_throughput_result,
                channel_backpressure_result,
            ) = {
                let mut input_query = client.query(channel_input_throughput_query);
                let mut output_query = client.query(channel_output_throughput_query);
                let mut backpressure_query = client.query(channel_backpressure_query);

                if let Some(at_time) = params.at {
                    input_query = input_query.at(at_time);
                    output_query = output_query.at(at_time);
                    backpressure_query = backpressure_query.at(at_time);
                }

                tokio::try_join!(
                    input_query.get(),
                    output_query.get(),
                    backpressure_query.get(),
                )
                .map_err(err)?
            };

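            // Merge the three sample vectors into per-channel delta stats,
            // keyed by `"{upstream_fragment_id}_{downstream_fragment_id}"`.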
            let mut channel_data = HashMap::new();

            if let Some(channel_input_throughput_data) =
                channel_input_throughput_result.data().as_vector()
            {
                for sample in channel_input_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .recv_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_output_throughput_data) =
                channel_output_throughput_result.data().as_vector()
            {
                for sample in channel_output_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .send_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
            {
                for sample in channel_backpressure_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(downstream_fragment_id_str) =
                            sample.metric().get("downstream_fragment_id")
                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            downstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            // The query yields blocked nanoseconds per second per
                            // actor; dividing by 1e9 turns it into a fraction of
                            // time spent blocked.
                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0;
                    }
                }
            }

            // Backfill actor counts from the downstream fragment's stats.
            for (key, channel_stats) in &mut channel_data {
                let parts: Vec<&str> = key.split('_').collect();
                if parts.len() == 2
                    && let Ok(fragment_id) = parts[1].parse::<u32>()
                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
                {
                    channel_stats.actor_count = fragment_stats.actor_count;
                }
            }

            all.channel_stats = channel_data;

            Ok(Json(all))
        } else {
            Err(err(anyhow!("Prometheus endpoint is not set")))
        }
    }

    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
        Ok(Json(risingwave_common::current_cluster_version()))
    }
}

impl DashboardService {
    /// Builds the dashboard router (embedded web UI at the root, JSON API
    /// under `/api`, tracing UI under `/trace`) and serves it on
    /// `dashboard_addr`.
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/:ty", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/functions", get(list_functions))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            .route(
                "/metrics/streaming_stats_prometheus",
                get(get_streaming_stats_from_prometheus),
            )
            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/:worker_id",
                get(list_heap_profile),
            )
            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
            .route("/monitor/diagnose/", get(diagnose))
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}