1mod prometheus;
16
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use anyhow::{Context as _, Result, anyhow};
21use axum::Router;
22use axum::extract::{Extension, Path};
23use axum::http::{Method, StatusCode};
24use axum::response::{IntoResponse, Response};
25use axum::routing::get;
26use risingwave_common_heap_profiling::ProfileServiceImpl;
27use risingwave_rpc_client::MonitorClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::hummock::HummockManagerRef;
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// Shared state behind the meta dashboard HTTP server; see [`DashboardService::serve`].
#[derive(Clone)]
pub struct DashboardService {
    /// Registry used to dump await-tree traces of the meta node itself.
    pub await_tree_reg: await_tree::Registry,
    /// Address the dashboard HTTP server binds to.
    pub dashboard_addr: SocketAddr,
    /// Optional Prometheus client; `None` disables the Prometheus-backed endpoints.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// Label selector spliced into every Prometheus query.
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub hummock_manager: HummockManagerRef,
    /// Pool of monitor-service RPC clients for talking to worker nodes.
    pub monitor_clients: MonitorClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    /// Local profile service, used when the target worker id is the meta node itself.
    pub profile_service: ProfileServiceImpl,
    /// State backing the embedded OTLP trace UI.
    pub trace_state: otlp_embedded::StateRef,
}
51
/// Shared handle to the dashboard state, cloned into each request handler via `Extension`.
pub type Service = Arc<DashboardService>;
53
54pub(super) mod handlers {
55 use std::cmp::min;
56 use std::collections::HashMap;
57
58 use anyhow::Context;
59 use axum::Json;
60 use axum::extract::Query;
61 use futures::future::join_all;
62 use itertools::Itertools;
63 use risingwave_common::id::JobId;
64 use risingwave_meta_model::WorkerId;
65 use risingwave_pb::catalog::table::TableType;
66 use risingwave_pb::catalog::{
67 Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
68 };
69 use risingwave_pb::common::{WorkerNode, WorkerType};
70 use risingwave_pb::hummock::TableStats;
71 use risingwave_pb::meta::{
72 ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
73 };
74 use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
75 use risingwave_pb::monitor_service::{
76 AnalyzeHeapRequest, ChannelDeltaStats, GetStreamingPrometheusStatsResponse,
77 GetStreamingStatsResponse, HeapProfilingRequest, HeapProfilingResponse,
78 ListHeapProfilingRequest, ListHeapProfilingResponse, StackTraceResponse,
79 };
80 use risingwave_pb::user::PbUserInfo;
81 use serde::{Deserialize, Serialize};
82 use serde_json::json;
83 use thiserror_ext::AsReport;
84 use tonic::Request;
85
86 use super::*;
87 use crate::controller::fragment::StreamingJobInfo;
88 use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
89
    /// A table catalog enriched with its storage statistics from the hummock version stats.
    #[derive(Serialize)]
    pub struct TableWithStats {
        /// The raw table catalog, flattened into the surrounding JSON object.
        #[serde(flatten)]
        pub table: Table,
        /// Uncompressed size: `total_key_size + total_value_size` (see `from_table_and_stats`).
        pub total_size_bytes: i64,
        pub total_key_count: i64,
        pub total_key_size: i64,
        pub total_value_size: i64,
        /// Total compressed size as stored.
        pub compressed_size: u64,
    }
100
101 impl TableWithStats {
102 pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
103 match stats {
104 Some(stats) => Self {
105 total_size_bytes: stats.total_key_size + stats.total_value_size,
106 total_key_count: stats.total_key_count,
107 total_key_size: stats.total_key_size,
108 total_value_size: stats.total_value_size,
109 compressed_size: stats.total_compressed_size,
110 table,
111 },
112 None => Self {
113 total_size_bytes: 0,
114 total_key_count: 0,
115 total_key_size: 0,
116 total_value_size: 0,
117 compressed_size: 0,
118 table,
119 },
120 }
121 }
122 }
123
    /// Opaque handler error: any `anyhow::Error`, rendered as a JSON 500 response
    /// by the `IntoResponse` impl below.
    pub struct DashboardError(anyhow::Error);
    /// Handler result type used throughout this module.
    pub type Result<T> = std::result::Result<T, DashboardError>;
126
127 pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
128 DashboardError(err.into())
129 }
130
131 impl From<anyhow::Error> for DashboardError {
132 fn from(value: anyhow::Error) -> Self {
133 DashboardError(value)
134 }
135 }
136
137 impl IntoResponse for DashboardError {
138 fn into_response(self) -> axum::response::Response {
139 let mut resp = Json(json!({
140 "error": self.0.to_report_string(),
141 }))
142 .into_response();
143 *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
144 resp
145 }
146 }
147
148 pub async fn list_clusters(
149 Path(ty): Path<i32>,
150 Extension(srv): Extension<Service>,
151 ) -> Result<Json<Vec<WorkerNode>>> {
152 let worker_type = WorkerType::try_from(ty)
153 .map_err(|_| anyhow!("invalid worker type"))
154 .map_err(err)?;
155 let mut result = srv
156 .metadata_manager
157 .list_worker_node(Some(worker_type), None)
158 .await
159 .map_err(err)?;
160 result.sort_unstable_by_key(|n| n.id);
161 Ok(result.into())
162 }
163
164 async fn list_table_catalogs_inner(
165 metadata_manager: &MetadataManager,
166 hummock_manager: &HummockManagerRef,
167 table_type: TableType,
168 ) -> Result<Json<Vec<TableWithStats>>> {
169 let tables = metadata_manager
170 .catalog_controller
171 .list_tables_by_type(table_type.into())
172 .await
173 .map_err(err)?;
174
175 let version_stats = hummock_manager.get_version_stats().await;
177
178 let tables_with_stats = tables
179 .into_iter()
180 .map(|table| {
181 let stats = version_stats.table_stats.get(&table.id);
182 TableWithStats::from_table_and_stats(table, stats)
183 })
184 .collect();
185
186 Ok(Json(tables_with_stats))
187 }
188
189 pub async fn list_materialized_views(
190 Extension(srv): Extension<Service>,
191 ) -> Result<Json<Vec<TableWithStats>>> {
192 list_table_catalogs_inner(
193 &srv.metadata_manager,
194 &srv.hummock_manager,
195 TableType::MaterializedView,
196 )
197 .await
198 }
199
200 pub async fn list_tables(
201 Extension(srv): Extension<Service>,
202 ) -> Result<Json<Vec<TableWithStats>>> {
203 list_table_catalogs_inner(
204 &srv.metadata_manager,
205 &srv.hummock_manager,
206 TableType::Table,
207 )
208 .await
209 }
210
211 pub async fn list_index_tables(
212 Extension(srv): Extension<Service>,
213 ) -> Result<Json<Vec<TableWithStats>>> {
214 list_table_catalogs_inner(
215 &srv.metadata_manager,
216 &srv.hummock_manager,
217 TableType::Index,
218 )
219 .await
220 }
221
222 pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
223 let indexes = srv
224 .metadata_manager
225 .catalog_controller
226 .list_indexes()
227 .await
228 .map_err(err)?;
229
230 Ok(Json(indexes))
231 }
232
233 pub async fn list_subscription(
234 Extension(srv): Extension<Service>,
235 ) -> Result<Json<Vec<Subscription>>> {
236 let subscriptions = srv
237 .metadata_manager
238 .catalog_controller
239 .list_subscriptions()
240 .await
241 .map_err(err)?;
242
243 Ok(Json(subscriptions))
244 }
245
246 pub async fn list_internal_tables(
247 Extension(srv): Extension<Service>,
248 ) -> Result<Json<Vec<TableWithStats>>> {
249 list_table_catalogs_inner(
250 &srv.metadata_manager,
251 &srv.hummock_manager,
252 TableType::Internal,
253 )
254 .await
255 }
256
257 pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
258 let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
259
260 Ok(Json(sources))
261 }
262
263 pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
264 let sinks = srv
265 .metadata_manager
266 .catalog_controller
267 .list_sinks()
268 .await
269 .map_err(err)?;
270
271 Ok(Json(sinks))
272 }
273
274 pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
275 let views = srv
276 .metadata_manager
277 .catalog_controller
278 .list_views()
279 .await
280 .map_err(err)?;
281
282 Ok(Json(views))
283 }
284
285 pub async fn list_functions(
286 Extension(srv): Extension<Service>,
287 ) -> Result<Json<Vec<PbFunction>>> {
288 let functions = srv
289 .metadata_manager
290 .catalog_controller
291 .list_functions()
292 .await
293 .map_err(err)?;
294
295 Ok(Json(functions))
296 }
297
298 pub async fn list_streaming_jobs(
299 Extension(srv): Extension<Service>,
300 ) -> Result<Json<Vec<StreamingJobInfo>>> {
301 let streaming_jobs = srv
302 .metadata_manager
303 .catalog_controller
304 .list_streaming_job_infos()
305 .await
306 .map_err(err)?;
307
308 Ok(Json(streaming_jobs))
309 }
310
311 pub async fn get_fragment_to_relation_map(
319 Extension(srv): Extension<Service>,
320 ) -> Result<Json<FragmentToRelationMap>> {
321 let table_fragments = srv
322 .metadata_manager
323 .catalog_controller
324 .table_fragments()
325 .await
326 .map_err(err)?;
327 let mut fragment_to_relation_map = HashMap::new();
328 for (relation_id, tf) in table_fragments {
329 for fragment_id in tf.fragments.keys() {
330 fragment_to_relation_map.insert(*fragment_id, relation_id);
331 }
332 }
333 let map = FragmentToRelationMap {
334 fragment_to_relation_map,
335 };
336 Ok(Json(map))
337 }
338
339 pub async fn get_relation_id_infos(
341 Extension(srv): Extension<Service>,
342 ) -> Result<Json<RelationIdInfos>> {
343 let table_fragments = srv
344 .metadata_manager
345 .catalog_controller
346 .table_fragments()
347 .await
348 .map_err(err)?;
349 let mut map = HashMap::new();
350 for (id, tf) in table_fragments {
351 let mut fragment_id_to_actor_ids = HashMap::new();
352 for (fragment_id, fragment) in &tf.fragments {
353 let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
354 fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
355 }
356 map.insert(
357 id.as_raw_id(),
358 FragmentIdToActorIdMap {
359 map: fragment_id_to_actor_ids,
360 },
361 );
362 }
363 let relation_id_infos = RelationIdInfos { map };
364
365 Ok(Json(relation_id_infos))
366 }
367
368 pub async fn list_fragments_by_job_id(
369 Extension(srv): Extension<Service>,
370 Path(job_id): Path<u32>,
371 ) -> Result<Json<PbTableFragments>> {
372 let job_id = JobId::new(job_id);
373 let table_fragments = srv
374 .metadata_manager
375 .get_job_fragments_by_id(job_id)
376 .await
377 .map_err(err)?;
378 let upstream_fragments = srv
379 .metadata_manager
380 .catalog_controller
381 .upstream_fragments(table_fragments.fragment_ids())
382 .await
383 .map_err(err)?;
384 let dispatchers = srv
385 .metadata_manager
386 .catalog_controller
387 .get_fragment_actor_dispatchers(
388 table_fragments.fragment_ids().map(|id| id as _).collect(),
389 )
390 .await
391 .map_err(err)?;
392 Ok(Json(
393 table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
394 ))
395 }
396
397 pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
398 let users = srv
399 .metadata_manager
400 .catalog_controller
401 .list_users()
402 .await
403 .map_err(err)?;
404
405 Ok(Json(users))
406 }
407
408 pub async fn list_databases(
409 Extension(srv): Extension<Service>,
410 ) -> Result<Json<Vec<PbDatabase>>> {
411 let databases = srv
412 .metadata_manager
413 .catalog_controller
414 .list_databases()
415 .await
416 .map_err(err)?;
417
418 Ok(Json(databases))
419 }
420
421 pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
422 let schemas = srv
423 .metadata_manager
424 .catalog_controller
425 .list_schemas()
426 .await
427 .map_err(err)?;
428
429 Ok(Json(schemas))
430 }
431
    /// One edge of the object-dependency graph, with ids exposed as raw `u32`s
    /// for the frontend.
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct DashboardObjectDependency {
        pub object_id: u32,
        /// Id of the object that `object_id` references.
        pub referenced_object_id: u32,
    }
438
439 pub async fn list_object_dependencies(
440 Extension(srv): Extension<Service>,
441 ) -> Result<Json<Vec<DashboardObjectDependency>>> {
442 let object_dependencies = srv
443 .metadata_manager
444 .catalog_controller
445 .list_all_object_dependencies()
446 .await
447 .map_err(err)?;
448
449 let result = object_dependencies
450 .into_iter()
451 .map(|dependency| DashboardObjectDependency {
452 object_id: dependency.object_id.as_raw_id(),
453 referenced_object_id: dependency.referenced_object_id.as_raw_id(),
454 })
455 .collect();
456
457 Ok(Json(result))
458 }
459
    /// Query parameters shared by the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        // Requested trace format: "text" (default) or "json".
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
465
466 impl AwaitTreeDumpParams {
467 pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
469 Ok(match self.format.as_str() {
470 "text" => ActorTracesFormat::Text,
471 "json" => ActorTracesFormat::Json,
472 _ => {
473 return Err(err(anyhow!(
474 "Unsupported format `{}`, only `text` and `json` are supported for now",
475 self.format
476 )));
477 }
478 })
479 }
480 }
481
482 fn await_tree_default_format() -> String {
483 "text".to_owned()
485 }
486
487 pub async fn dump_await_tree_all(
488 Query(params): Query<AwaitTreeDumpParams>,
489 Extension(srv): Extension<Service>,
490 ) -> Result<Json<StackTraceResponse>> {
491 let actor_traces_format = params.actor_traces_format()?;
492
493 let res = dump_cluster_await_tree(
494 &srv.metadata_manager,
495 &srv.await_tree_reg,
496 actor_traces_format,
497 )
498 .await
499 .map_err(err)?;
500
501 Ok(res.into())
502 }
503
504 pub async fn dump_await_tree(
505 Path(worker_id): Path<WorkerId>,
506 Query(params): Query<AwaitTreeDumpParams>,
507 Extension(srv): Extension<Service>,
508 ) -> Result<Json<StackTraceResponse>> {
509 let actor_traces_format = params.actor_traces_format()?;
510
511 let worker_node = srv
512 .metadata_manager
513 .get_worker_by_id(worker_id)
514 .await
515 .map_err(err)?
516 .context("worker node not found")
517 .map_err(err)?;
518
519 let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
520 .await
521 .map_err(err)?;
522
523 Ok(res.into())
524 }
525
526 pub async fn heap_profile(
527 Path(worker_id): Path<WorkerId>,
528 Extension(srv): Extension<Service>,
529 ) -> Result<Json<HeapProfilingResponse>> {
530 if worker_id == crate::manager::META_NODE_ID {
531 let result = srv
532 .profile_service
533 .heap_profiling(Request::new(HeapProfilingRequest { dir: "".to_owned() }))
534 .await
535 .map_err(err)?
536 .into_inner();
537 return Ok(result.into());
538 }
539
540 let worker_node = srv
541 .metadata_manager
542 .get_worker_by_id(worker_id)
543 .await
544 .map_err(err)?
545 .context("worker node not found")
546 .map_err(err)?;
547
548 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
549
550 let result = client.heap_profile("".to_owned()).await.map_err(err)?;
551
552 Ok(result.into())
553 }
554
555 pub async fn list_heap_profile(
556 Path(worker_id): Path<WorkerId>,
557 Extension(srv): Extension<Service>,
558 ) -> Result<Json<ListHeapProfilingResponse>> {
559 if worker_id == crate::manager::META_NODE_ID {
560 let result = srv
561 .profile_service
562 .list_heap_profiling(Request::new(ListHeapProfilingRequest {}))
563 .await
564 .map_err(err)?
565 .into_inner();
566 return Ok(result.into());
567 }
568
569 let worker_node = srv
570 .metadata_manager
571 .get_worker_by_id(worker_id)
572 .await
573 .map_err(err)?
574 .context("worker node not found")
575 .map_err(err)?;
576
577 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
578
579 let result = client.list_heap_profile().await.map_err(err)?;
580 Ok(result.into())
581 }
582
583 pub async fn analyze_heap(
584 Path((worker_id, file_path)): Path<(WorkerId, String)>,
585 Extension(srv): Extension<Service>,
586 ) -> Result<Response> {
587 let file_path =
588 String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
589
590 let collapsed_bin = if worker_id == crate::manager::META_NODE_ID {
591 srv.profile_service
592 .analyze_heap(Request::new(AnalyzeHeapRequest {
593 path: file_path.clone(),
594 }))
595 .await
596 .map_err(err)?
597 .into_inner()
598 .result
599 } else {
600 let worker_node = srv
601 .metadata_manager
602 .get_worker_by_id(worker_id)
603 .await
604 .map_err(err)?
605 .context("worker node not found")
606 .map_err(err)?;
607
608 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
609 client
610 .analyze_heap(file_path.clone())
611 .await
612 .map_err(err)?
613 .result
614 };
615 let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
616
617 let response = Response::builder()
618 .header("Content-Type", "application/octet-stream")
619 .body(collapsed_str.into());
620
621 response.map_err(err)
622 }
623
    /// Query parameters for [`diagnose`].
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        // Format for the actor traces section: "text" (default) or "json".
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
629
630 pub async fn diagnose(
631 Query(params): Query<DiagnoseParams>,
632 Extension(srv): Extension<Service>,
633 ) -> Result<String> {
634 let actor_traces_format = match params.actor_traces_format.as_str() {
635 "text" => ActorTracesFormat::Text,
636 "json" => ActorTracesFormat::Json,
637 _ => {
638 return Err(err(anyhow!(
639 "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
640 params.actor_traces_format
641 )));
642 }
643 };
644 Ok(srv.diagnose_command.report(actor_traces_format).await)
645 }
646
647 pub async fn get_streaming_stats(
653 Extension(srv): Extension<Service>,
654 ) -> Result<Json<GetStreamingStatsResponse>> {
655 let worker_nodes = srv
656 .metadata_manager
657 .list_active_streaming_compute_nodes()
658 .await
659 .map_err(err)?;
660
661 let mut futures = Vec::new();
662
663 for worker_node in worker_nodes {
664 let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
665 let client = Arc::new(client);
666 let fut = async move {
667 let result = client.get_streaming_stats().await.map_err(err)?;
668 Ok::<_, DashboardError>(result)
669 };
670 futures.push(fut);
671 }
672 let results = join_all(futures).await;
673
674 let mut all = GetStreamingStatsResponse::default();
675
676 for result in results {
677 let result = result
678 .map_err(|_| anyhow!("Failed to get back pressure"))
679 .map_err(err)?;
680
681 for (fragment_id, fragment_stats) in result.fragment_stats {
683 if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
684 s.actor_count += fragment_stats.actor_count;
685 s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
686 } else {
687 all.fragment_stats.insert(fragment_id, fragment_stats);
688 }
689 }
690
691 for (relation_id, relation_stats) in result.relation_stats {
693 if let Some(s) = all.relation_stats.get_mut(&relation_id) {
694 s.actor_count += relation_stats.actor_count;
695 s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
696 } else {
697 all.relation_stats.insert(relation_id, relation_stats);
698 }
699 }
700
701 for (key, channel_stats) in result.channel_stats {
703 if let Some(s) = all.channel_stats.get_mut(&key) {
704 s.actor_count += channel_stats.actor_count;
705 s.output_blocking_duration += channel_stats.output_blocking_duration;
706 s.recv_row_count += channel_stats.recv_row_count;
707 s.send_row_count += channel_stats.send_row_count;
708 } else {
709 all.channel_stats.insert(key, channel_stats);
710 }
711 }
712 }
713
714 Ok(all.into())
715 }
716
    /// Query parameters for [`get_streaming_stats_from_prometheus`].
    #[derive(Debug, Deserialize)]
    pub struct StreamingStatsPrometheusParams {
        // Optional evaluation time passed to the Prometheus queries' `at(...)`;
        // `None` means "now". Presumably a unix timestamp in seconds — confirm
        // against `prometheus_http_query`'s `at` semantics.
        #[serde(default)]
        at: Option<i64>,
        // Range-window width in seconds used inside the `rate(...[Ns])` queries;
        // defaults to 60.
        #[serde(default = "streaming_stats_default_time_offset")]
        time_offset: i64,
    }
726
727 fn streaming_stats_default_time_offset() -> i64 {
728 60
729 }
730
    /// Like `get_streaming_stats`, but sources the per-channel metrics (throughput,
    /// backpressure) from Prometheus instead of the workers' in-process counters.
    ///
    /// Fragment / relation stats are still merged from worker RPCs; channel stats are
    /// computed from three Prometheus instant queries and keyed as
    /// `"{upstream_fragment_id}_{fragment_id}"`. Fails if no Prometheus endpoint is set.
    pub async fn get_streaming_stats_from_prometheus(
        Query(params): Query<StreamingStatsPrometheusParams>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
        let mut all = GetStreamingPrometheusStatsResponse::default();

        let worker_nodes = srv
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .map_err(err)?;

        // Fan out one stats RPC per worker and await them all concurrently.
        let mut futures = Vec::new();

        for worker_node in worker_nodes {
            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
            // NOTE(review): the Arc looks unnecessary — the client is moved into
            // exactly one future and never cloned.
            let client = Arc::new(client);
            let fut = async move {
                let result = client.get_streaming_stats().await.map_err(err)?;
                Ok::<_, DashboardError>(result)
            };
            futures.push(fut);
        }
        let results = join_all(futures).await;

        // Merge fragment/relation stats: actor counts add up, epochs take the minimum
        // (the slowest node determines the fragment's progress).
        for result in results {
            let result = result
                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
                .map_err(err)?;

            for (fragment_id, fragment_stats) in result.fragment_stats {
                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
                    s.actor_count += fragment_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
                } else {
                    all.fragment_stats.insert(fragment_id, fragment_stats);
                }
            }

            for (relation_id, relation_stats) in result.relation_stats {
                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
                    s.actor_count += relation_stats.actor_count;
                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
                } else {
                    all.relation_stats.insert(relation_id, relation_stats);
                }
            }
        }

        if let Some(ref client) = srv.prometheus_client {
            // Rows/sec received by each (upstream_fragment_id -> fragment_id) channel.
            let channel_input_throughput_query = format!(
                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            // Rows/sec sent by each channel.
            let channel_output_throughput_query = format!(
                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
                srv.prometheus_selector, params.time_offset
            );
            // Blocked output-buffer time (ns/s), normalized by the fragment's actor count.
            let channel_backpressure_query = format!(
                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
                / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
                srv.prometheus_selector, params.time_offset
            );

            // Run the three instant queries concurrently; any failure aborts.
            let (
                channel_input_throughput_result,
                channel_output_throughput_result,
                channel_backpressure_result,
            ) = {
                let mut input_query = client.query(channel_input_throughput_query);
                let mut output_query = client.query(channel_output_throughput_query);
                let mut backpressure_query = client.query(channel_backpressure_query);

                // Evaluate at the requested timestamp when given, otherwise "now".
                if let Some(at_time) = params.at {
                    input_query = input_query.at(at_time);
                    output_query = output_query.at(at_time);
                    backpressure_query = backpressure_query.at(at_time);
                }

                tokio::try_join!(
                    input_query.get(),
                    output_query.get(),
                    backpressure_query.get(),
                )
                .map_err(err)?
            };

            // Channel stats keyed by "{upstream_fragment_id}_{fragment_id}"; samples with
            // unparsable fragment-id labels are silently skipped.
            let mut channel_data = HashMap::new();

            if let Some(channel_input_throughput_data) =
                channel_input_throughput_result.data().as_vector()
            {
                for sample in channel_input_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .recv_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_output_throughput_data) =
                channel_output_throughput_result.data().as_vector()
            {
                for sample in channel_output_throughput_data {
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(upstream_fragment_id_str) =
                            sample.metric().get("upstream_fragment_id")
                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            upstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            .send_throughput = sample.sample().value();
                    }
                }
            }

            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
            {
                for sample in channel_backpressure_data {
                    // Backpressure is labeled (fragment_id, downstream_fragment_id), so here
                    // the key's first part is the sender, matching the throughput keys.
                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                        && let Some(downstream_fragment_id_str) =
                            sample.metric().get("downstream_fragment_id")
                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
                            fragment_id_str.parse::<u32>(),
                            downstream_fragment_id_str.parse::<u32>(),
                        )
                    {
                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
                        channel_data
                            .entry(key)
                            .or_insert_with(|| ChannelDeltaStats {
                                actor_count: 0,
                                backpressure_rate: 0.0,
                                recv_throughput: 0.0,
                                send_throughput: 0.0,
                            })
                            // ns-per-second blocked -> fraction of time blocked.
                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0;
                    }
                }
            }

            // Fill in actor counts from the merged fragment stats: parts[1] is the
            // receiving (downstream) fragment of the channel key.
            for (key, channel_stats) in &mut channel_data {
                let parts: Vec<&str> = key.split('_').collect();
                if parts.len() == 2
                    && let Ok(fragment_id) = parts[1].parse::<u32>()
                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
                {
                    channel_stats.actor_count = fragment_stats.actor_count;
                }
            }

            all.channel_stats = channel_data;

            Ok(Json(all))
        } else {
            Err(err(anyhow!("Prometheus endpoint is not set")))
        }
    }
926
927 pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
928 Ok(Json(risingwave_common::current_cluster_version()))
929 }
930}
931
impl DashboardService {
    /// Runs the dashboard HTTP server forever on `self.dashboard_addr`.
    ///
    /// Serves the static dashboard UI as the fallback, the JSON API under `/api`,
    /// and the embedded trace UI under `/trace`. Only returns on bind/serve errors.
    pub async fn serve(self) -> Result<()> {
        use handlers::*;
        let srv = Arc::new(self);

        // The dashboard is read-only: allow any origin but only GET.
        let cors_layer = CorsLayer::new()
            .allow_origin(cors::Any)
            .allow_methods(vec![Method::GET]);

        let api_router = Router::new()
            .route("/version", get(get_version))
            .route("/clusters/{ty}", get(list_clusters))
            .route("/streaming_jobs", get(list_streaming_jobs))
            .route("/fragments/job_id/{job_id}", get(list_fragments_by_job_id))
            .route("/relation_id_infos", get(get_relation_id_infos))
            .route(
                "/fragment_to_relation_map",
                get(get_fragment_to_relation_map),
            )
            .route("/views", get(list_views))
            .route("/functions", get(list_functions))
            .route("/materialized_views", get(list_materialized_views))
            .route("/tables", get(list_tables))
            // Note: `/indexes` serves the index *tables* (with stats); the index
            // catalogs themselves are under `/index_items`.
            .route("/indexes", get(list_index_tables))
            .route("/index_items", get(list_indexes))
            .route("/subscriptions", get(list_subscription))
            .route("/internal_tables", get(list_internal_tables))
            .route("/sources", get(list_sources))
            .route("/sinks", get(list_sinks))
            .route("/users", get(list_users))
            .route("/databases", get(list_databases))
            .route("/schemas", get(list_schemas))
            .route("/object_dependencies", get(list_object_dependencies))
            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
            .route("/metrics/streaming_stats", get(get_streaming_stats))
            .route(
                "/metrics/streaming_stats_prometheus",
                get(get_streaming_stats_from_prometheus),
            )
            .route("/monitor/await_tree/{worker_id}", get(dump_await_tree))
            .route("/monitor/await_tree/", get(dump_await_tree_all))
            .route("/monitor/dump_heap_profile/{worker_id}", get(heap_profile))
            .route(
                "/monitor/list_heap_profile/{worker_id}",
                get(list_heap_profile),
            )
            .route("/monitor/analyze/{worker_id}/{*path}", get(analyze_heap))
            .route("/monitor/diagnose/", get(diagnose))
            // Make the shared service state available to every handler.
            .layer(
                ServiceBuilder::new()
                    .layer(AddExtensionLayer::new(srv.clone()))
                    .into_inner(),
            )
            .layer(cors_layer);

        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
        let dashboard_router = risingwave_meta_dashboard::router();

        let app = Router::new()
            .fallback_service(dashboard_router)
            .nest("/api", api_router)
            .nest("/trace", trace_ui_router)
            .layer(CompressionLayer::new());

        let listener = TcpListener::bind(&srv.dashboard_addr)
            .await
            .context("failed to bind dashboard address")?;
        axum::serve(listener, app)
            .await
            .context("failed to serve dashboard service")?;

        Ok(())
    }
}