mod prometheus;

use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::{Context as _, Result, anyhow};
use axum::Router;
use axum::extract::{Extension, Path};
use axum::http::{Method, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use risingwave_rpc_client::ComputeClientPool;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::add_extension::AddExtensionLayer;
use tower_http::compression::CompressionLayer;
use tower_http::cors::{self, CorsLayer};

use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::diagnose::DiagnoseCommandRef;

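/// State shared by the dashboard HTTP service: handles to the meta node's
/// managers and clients that the API handlers below rely on.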
#[derive(Clone)]
pub struct DashboardService {
    pub await_tree_reg: await_tree::Registry,
    pub dashboard_addr: SocketAddr,
    pub prometheus_client: Option<prometheus_http_query::Client>,
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub hummock_manager: HummockManagerRef,
    pub compute_clients: ComputeClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    pub trace_state: otlp_embedded::StateRef,
}

pub type Service = Arc<DashboardService>;

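/// Handlers backing the dashboard's `/api` routes. Each handler receives the
/// shared [`Service`] via an [`Extension`] layer.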
pub(super) mod handlers {
    use std::cmp::min;
    use std::collections::HashMap;

    use anyhow::Context;
    use axum::Json;
    use axum::extract::Query;
    use futures::future::join_all;
    use itertools::Itertools;
    use risingwave_common::id::JobId;
    use risingwave_meta_model::WorkerId;
    use risingwave_pb::catalog::table::TableType;
    use risingwave_pb::catalog::{
        Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
    };
    use risingwave_pb::common::{WorkerNode, WorkerType};
    use risingwave_pb::hummock::TableStats;
    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
    use risingwave_pb::meta::{
        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
    };
    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
    use risingwave_pb::monitor_service::{
        ChannelDeltaStats, GetStreamingPrometheusStatsResponse, GetStreamingStatsResponse,
        HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse,
    };
    use risingwave_pb::user::PbUserInfo;
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use thiserror_ext::AsReport;

    use super::*;
    use crate::controller::fragment::StreamingJobInfo;
    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};

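    /// A table catalog entry flattened together with its storage statistics
    /// from the current Hummock version (zeroed when no stats are available).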
    #[derive(Serialize)]
    pub struct TableWithStats {
        #[serde(flatten)]
        pub table: Table,
        pub total_size_bytes: i64,
        pub total_key_count: i64,
        pub total_key_size: i64,
        pub total_value_size: i64,
        pub compressed_size: u64,
    }

    impl TableWithStats {
        pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
            match stats {
                Some(stats) => Self {
                    total_size_bytes: stats.total_key_size + stats.total_value_size,
                    total_key_count: stats.total_key_count,
                    total_key_size: stats.total_key_size,
                    total_value_size: stats.total_value_size,
                    compressed_size: stats.total_compressed_size,
                    table,
                },
                None => Self {
                    total_size_bytes: 0,
                    total_key_count: 0,
                    total_key_size: 0,
                    total_value_size: 0,
                    compressed_size: 0,
                    table,
                },
            }
        }
    }

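    /// Error type for all handlers: wraps an `anyhow::Error` and renders it as
    /// a JSON `{"error": ...}` body with status 500.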
    pub struct DashboardError(anyhow::Error);
    pub type Result<T> = std::result::Result<T, DashboardError>;

    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
        DashboardError(err.into())
    }

    impl From<anyhow::Error> for DashboardError {
        fn from(value: anyhow::Error) -> Self {
            DashboardError(value)
        }
    }

    impl IntoResponse for DashboardError {
        fn into_response(self) -> axum::response::Response {
            let mut resp = Json(json!({
                "error": self.0.to_report_string(),
            }))
            .into_response();
            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
            resp
        }
    }

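    /// Lists worker nodes of the given type (`ty` is a numeric `WorkerType`),
    /// sorted by worker id.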
    pub async fn list_clusters(
        Path(ty): Path<i32>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<WorkerNode>>> {
        let worker_type = WorkerType::try_from(ty)
            .map_err(|_| anyhow!("invalid worker type"))
            .map_err(err)?;
        let mut result = srv
            .metadata_manager
            .list_worker_node(Some(worker_type), None)
            .await
            .map_err(err)?;
        result.sort_unstable_by_key(|n| n.id);
        Ok(result.into())
    }

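    /// Shared implementation for the table-listing handlers: fetches catalogs
    /// of the given type and attaches per-table Hummock version statistics.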
    async fn list_table_catalogs_inner(
        metadata_manager: &MetadataManager,
        hummock_manager: &HummockManagerRef,
        table_type: TableType,
    ) -> Result<Json<Vec<TableWithStats>>> {
        let tables = metadata_manager
            .catalog_controller
            .list_tables_by_type(table_type.into())
            .await
            .map_err(err)?;

        let version_stats = hummock_manager.get_version_stats().await;

        let tables_with_stats = tables
            .into_iter()
            .map(|table| {
                let stats = version_stats.table_stats.get(&table.id);
                TableWithStats::from_table_and_stats(table, stats)
            })
            .collect();

        Ok(Json(tables_with_stats))
    }

    pub async fn list_materialized_views(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::MaterializedView,
        )
        .await
    }

    pub async fn list_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Table,
        )
        .await
    }

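    /// Lists the `Table` catalogs that back indexes (`TableType::Index`), with
    /// storage stats. Served at `/indexes`.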
    pub async fn list_index_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Index,
        )
        .await
    }

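    /// Lists the `Index` catalog entries themselves. Served at `/index_items`.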
    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
        let indexes = srv
            .metadata_manager
            .catalog_controller
            .list_indexes()
            .await
            .map_err(err)?;

        Ok(Json(indexes))
    }

    pub async fn list_subscription(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<Subscription>>> {
        let subscriptions = srv
            .metadata_manager
            .catalog_controller
            .list_subscriptions()
            .await
            .map_err(err)?;

        Ok(Json(subscriptions))
    }

    pub async fn list_internal_tables(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<TableWithStats>>> {
        list_table_catalogs_inner(
            &srv.metadata_manager,
            &srv.hummock_manager,
            TableType::Internal,
        )
        .await
    }

    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;

        Ok(Json(sources))
    }

    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
        let sinks = srv
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await
            .map_err(err)?;

        Ok(Json(sinks))
    }

    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
        let views = srv
            .metadata_manager
            .catalog_controller
            .list_views()
            .await
            .map_err(err)?;

        Ok(Json(views))
    }

    pub async fn list_functions(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbFunction>>> {
        let functions = srv
            .metadata_manager
            .catalog_controller
            .list_functions()
            .await
            .map_err(err)?;

        Ok(Json(functions))
    }

    pub async fn list_streaming_jobs(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<StreamingJobInfo>>> {
        let streaming_jobs = srv
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await
            .map_err(err)?;

        Ok(Json(streaming_jobs))
    }

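    /// Builds a map from fragment id to the id of the relation it belongs to.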
    pub async fn get_fragment_to_relation_map(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<FragmentToRelationMap>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut fragment_to_relation_map = HashMap::new();
        for (relation_id, tf) in table_fragments {
            for fragment_id in tf.fragments.keys() {
                fragment_to_relation_map.insert(*fragment_id, relation_id.as_raw_id());
            }
        }
        let map = FragmentToRelationMap {
            fragment_to_relation_map,
        };
        Ok(Json(map))
    }

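    /// For every relation, returns a map from fragment id to the ids of the
    /// actors running that fragment.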
    pub async fn get_relation_id_infos(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<RelationIdInfos>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut map = HashMap::new();
        for (id, tf) in table_fragments {
            let mut fragment_id_to_actor_ids = HashMap::new();
            for (fragment_id, fragment) in &tf.fragments {
                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
            }
            map.insert(
                id.as_raw_id(),
                FragmentIdToActorIdMap {
                    map: fragment_id_to_actor_ids,
                },
            );
        }
        let relation_id_infos = RelationIdInfos { map };

        Ok(Json(relation_id_infos))
    }

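    /// Returns the fragment graph of a streaming job, including upstream
    /// fragments and actor dispatchers, as protobuf.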
    pub async fn list_fragments_by_job_id(
        Extension(srv): Extension<Service>,
        Path(job_id): Path<u32>,
    ) -> Result<Json<PbTableFragments>> {
        let job_id = JobId::new(job_id);
        let table_fragments = srv
            .metadata_manager
            .get_job_fragments_by_id(job_id)
            .await
            .map_err(err)?;
        let upstream_fragments = srv
            .metadata_manager
            .catalog_controller
            .upstream_fragments(table_fragments.fragment_ids())
            .await
            .map_err(err)?;
        let dispatchers = srv
            .metadata_manager
            .catalog_controller
            .get_fragment_actor_dispatchers(
                table_fragments.fragment_ids().map(|id| id as _).collect(),
            )
            .await
            .map_err(err)?;
        Ok(Json(
            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
        ))
    }

    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
        let users = srv
            .metadata_manager
            .catalog_controller
            .list_users()
            .await
            .map_err(err)?;

        Ok(Json(users))
    }

    pub async fn list_databases(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbDatabase>>> {
        let databases = srv
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await
            .map_err(err)?;

        Ok(Json(databases))
    }

    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
        let schemas = srv
            .metadata_manager
            .catalog_controller
            .list_schemas()
            .await
            .map_err(err)?;

        Ok(Json(schemas))
    }

    pub async fn list_object_dependencies(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<Vec<PbObjectDependencies>>> {
        let object_dependencies = srv
            .metadata_manager
            .catalog_controller
            .list_all_object_dependencies()
            .await
            .map_err(err)?;

        Ok(Json(object_dependencies))
    }

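    /// Query parameters for the await-tree dump endpoints.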
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        #[serde(default = "await_tree_default_format")]
        format: String,
    }

    impl AwaitTreeDumpParams {
        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
            Ok(match self.format.as_str() {
                "text" => ActorTracesFormat::Text,
                "json" => ActorTracesFormat::Json,
                _ => {
                    return Err(err(anyhow!(
                        "Unsupported format `{}`, only `text` and `json` are supported for now",
                        self.format
                    )));
                }
            })
        }
    }

    fn await_tree_default_format() -> String {
        "text".to_owned()
    }

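    /// Dumps await-tree stack traces from every node in the cluster.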
    pub async fn dump_await_tree_all(
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let actor_traces_format = params.actor_traces_format()?;

        let res = dump_cluster_await_tree(
            &srv.metadata_manager,
            &srv.await_tree_reg,
            actor_traces_format,
        )
        .await
        .map_err(err)?;

        Ok(res.into())
    }

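    /// Dumps await-tree stack traces from a single worker node.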
    pub async fn dump_await_tree(
        Path(worker_id): Path<WorkerId>,
        Query(params): Query<AwaitTreeDumpParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<StackTraceResponse>> {
        let actor_traces_format = params.actor_traces_format()?;

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
            .await
            .map_err(err)?;

        Ok(res.into())
    }

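    /// Triggers a heap profiling dump on the given compute node.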
    pub async fn heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<HeapProfilingResponse>> {
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let result = client.heap_profile("".to_owned()).await.map_err(err)?;

        Ok(result.into())
    }

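    /// Lists the heap profile files available on the given compute node.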
    pub async fn list_heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<ListHeapProfilingResponse>> {
        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let result = client.list_heap_profile().await.map_err(err)?;
        Ok(result.into())
    }

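    /// Analyzes a heap profile file on the given compute node and returns the
    /// collapsed stacks as text. `file_path` arrives base64-url-encoded.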
    pub async fn analyze_heap(
        Path((worker_id, file_path)): Path<(WorkerId, String)>,
        Extension(srv): Extension<Service>,
    ) -> Result<Response> {
        let file_path =
            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

        let collapsed_bin = client
            .analyze_heap(file_path.clone())
            .await
            .map_err(err)?
            .result;
        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();

        let response = Response::builder()
            .header("Content-Type", "application/octet-stream")
            .body(collapsed_str.into());

        response.map_err(err)
    }

    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }

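    /// Generates a cluster diagnosis report, with actor traces rendered in the
    /// requested format (`text` by default, or `json`).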
    pub async fn diagnose(
        Query(params): Query<DiagnoseParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<String> {
        let actor_traces_format = match params.actor_traces_format.as_str() {
            "text" => ActorTracesFormat::Text,
            "json" => ActorTracesFormat::Json,
            _ => {
                return Err(err(anyhow!(
                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
                    params.actor_traces_format
                )));
            }
        };
        Ok(srv.diagnose_command.report(actor_traces_format).await)
    }

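    /// Collects streaming stats from all active compute nodes and merges them:
    /// actor counts and channel counters are summed, epochs take the minimum.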
    pub async fn get_streaming_stats(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingStatsResponse>> {
        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        let mut all = GetStreamingStatsResponse::default();

        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get streaming stats"))
                .map_err(err)?;

            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }

            for (key, channel_stats) in result.channel_stats {
                if let Some(s) = all.channel_stats.get_mut(&key) {
                    s.actor_count += channel_stats.actor_count;
                    s.output_blocking_duration += channel_stats.output_blocking_duration;
                    s.recv_row_count += channel_stats.recv_row_count;
                    s.send_row_count += channel_stats.send_row_count;
                } else {
                    all.channel_stats.insert(key, channel_stats);
                }
            }
        }

        Ok(all.into())
    }

    #[derive(Debug, Deserialize)]
    pub struct StreamingStatsPrometheusParams {
        #[serde(default)]
        at: Option<i64>,
        #[serde(default = "streaming_stats_default_time_offset")]
        time_offset: i64,
    }

    fn streaming_stats_default_time_offset() -> i64 {
        60
    }

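    /// Like `get_streaming_stats`, but additionally derives per-channel
    /// throughput and backpressure rates from Prometheus over the last
    /// `time_offset` seconds (optionally evaluated at instant `at`). Fails if
    /// no Prometheus endpoint is configured.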
    pub async fn get_streaming_stats_from_prometheus(
        Query(params): Query<StreamingStatsPrometheusParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
        let mut all = GetStreamingPrometheusStatsResponse::default();

        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
                .map_err(err)?;

            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }
        }

        if let Some(ref client) = srv.prometheus_client {
            let channel_input_throughput_query = format!(
                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            let channel_output_throughput_query = format!(
                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            let channel_backpressure_query = format!(
                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
                / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
                srv.prometheus_selector, params.time_offset
            );

            let (
                channel_input_throughput_result,
                channel_output_throughput_result,
                channel_backpressure_result,
            ) = {
                let mut input_query = client.query(channel_input_throughput_query);
                let mut output_query = client.query(channel_output_throughput_query);
                let mut backpressure_query = client.query(channel_backpressure_query);

                if let Some(at_time) = params.at {
                    input_query = input_query.at(at_time);
                    output_query = output_query.at(at_time);
                    backpressure_query = backpressure_query.at(at_time);
                }

                tokio::try_join!(
                    input_query.get(),
                    output_query.get(),
                    backpressure_query.get(),
                )
                .map_err(err)?
            };

            let mut channel_data = HashMap::new();

            if let Some(channel_input_throughput_data) =
                channel_input_throughput_result.data().as_vector()
            {
                for sample in channel_input_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .recv_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_output_throughput_data) =
                channel_output_throughput_result.data().as_vector()
            {
                for sample in channel_output_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .send_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
            {
                for sample in channel_backpressure_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(downstream_fragment_id_str) =
                            sample.metric().get("downstream_fragment_id")
                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            downstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            // The query yields nanoseconds blocked per second per actor;
                            // divide by 1e9 to get a blocking fraction.
                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0;
                    }
                }
            }

            // Channel keys are `{upstream_fragment_id}_{downstream_fragment_id}`;
            // take the actor count from the downstream fragment's stats.
            for (key, channel_stats) in &mut channel_data {
                let parts: Vec<&str> = key.split('_').collect();
                if parts.len() == 2
                    && let Ok(fragment_id) = parts[1].parse::<u32>()
                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
                {
                    channel_stats.actor_count = fragment_stats.actor_count;
                }
            }

            all.channel_stats = channel_data;

            Ok(Json(all))
        } else {
            Err(err(anyhow!("Prometheus endpoint is not set")))
        }
    }

    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
        Ok(Json(risingwave_common::current_cluster_version()))
    }
}

impl DashboardService {
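    /// Builds the axum app (API routes, embedded dashboard UI, and trace UI)
    /// and serves it on `dashboard_addr`.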
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/:ty", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/functions", get(list_functions))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            .route(
                "/metrics/streaming_stats_prometheus",
                get(get_streaming_stats_from_prometheus),
            )
            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/:worker_id",
                get(list_heap_profile),
            )
            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
            .route("/monitor/diagnose/", get(diagnose))
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}