risingwave_meta/dashboard/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use anyhow::{Context as _, Result, anyhow};
21use axum::Router;
22use axum::extract::{Extension, Path};
23use axum::http::{Method, StatusCode};
24use axum::response::{IntoResponse, Response};
25use axum::routing::get;
26use risingwave_common_heap_profiling::ProfileServiceImpl;
27use risingwave_rpc_client::MonitorClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::hummock::HummockManagerRef;
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
#[derive(Clone)]
pub struct DashboardService {
    /// Registry used to collect await-tree (async stack trace) dumps for the meta node itself.
    pub await_tree_reg: await_tree::Registry,
    /// Address the dashboard HTTP server listens on.
    pub dashboard_addr: SocketAddr,
    /// Client for querying Prometheus; `None` when no Prometheus endpoint is configured.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    /// Label selector spliced into Prometheus queries to scope them to this cluster.
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub hummock_manager: HummockManagerRef,
    /// Pool of monitor-service clients for reaching individual worker nodes.
    pub monitor_clients: MonitorClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    /// Local profiling service, used when the profiling target is the meta node itself.
    pub profile_service: ProfileServiceImpl,
    /// Shared state of the embedded OTLP trace collector.
    pub trace_state: otlp_embedded::StateRef,
}
51
/// Shared handle to [`DashboardService`], passed to axum handlers via `Extension`.
pub type Service = Arc<DashboardService>;
53
54pub(super) mod handlers {
55    use std::cmp::min;
56    use std::collections::HashMap;
57
58    use anyhow::Context;
59    use axum::Json;
60    use axum::extract::Query;
61    use futures::future::join_all;
62    use itertools::Itertools;
63    use risingwave_common::id::JobId;
64    use risingwave_meta_model::WorkerId;
65    use risingwave_pb::catalog::table::TableType;
66    use risingwave_pb::catalog::{
67        Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
68    };
69    use risingwave_pb::common::{WorkerNode, WorkerType};
70    use risingwave_pb::hummock::TableStats;
71    use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
72    use risingwave_pb::meta::{
73        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
74    };
75    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
76    use risingwave_pb::monitor_service::{
77        AnalyzeHeapRequest, ChannelDeltaStats, GetStreamingPrometheusStatsResponse,
78        GetStreamingStatsResponse, HeapProfilingRequest, HeapProfilingResponse,
79        ListHeapProfilingRequest, ListHeapProfilingResponse, StackTraceResponse,
80    };
81    use risingwave_pb::user::PbUserInfo;
82    use serde::{Deserialize, Serialize};
83    use serde_json::json;
84    use thiserror_ext::AsReport;
85    use tonic::Request;
86
87    use super::*;
88    use crate::controller::fragment::StreamingJobInfo;
89    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
90
    /// A table catalog entry enriched with storage statistics from Hummock.
    #[derive(Serialize)]
    pub struct TableWithStats {
        /// The underlying table catalog, flattened into the same JSON object.
        #[serde(flatten)]
        pub table: Table,
        /// Sum of key and value sizes in bytes (uncompressed).
        pub total_size_bytes: i64,
        pub total_key_count: i64,
        pub total_key_size: i64,
        pub total_value_size: i64,
        /// Total compressed size in bytes, as reported by version stats.
        pub compressed_size: u64,
    }
101
102    impl TableWithStats {
103        pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
104            match stats {
105                Some(stats) => Self {
106                    total_size_bytes: stats.total_key_size + stats.total_value_size,
107                    total_key_count: stats.total_key_count,
108                    total_key_size: stats.total_key_size,
109                    total_value_size: stats.total_value_size,
110                    compressed_size: stats.total_compressed_size,
111                    table,
112                },
113                None => Self {
114                    total_size_bytes: 0,
115                    total_key_count: 0,
116                    total_key_size: 0,
117                    total_value_size: 0,
118                    compressed_size: 0,
119                    table,
120                },
121            }
122        }
123    }
124
    /// Error type returned by dashboard handlers; rendered as a JSON `500` response.
    pub struct DashboardError(anyhow::Error);
    /// Handler-level result type using the module-wide error.
    pub type Result<T> = std::result::Result<T, DashboardError>;

    /// Convert any error into a [`DashboardError`].
    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
        DashboardError(err.into())
    }
131
    // Allows `?` to be used on `anyhow::Result` values inside handlers.
    impl From<anyhow::Error> for DashboardError {
        fn from(value: anyhow::Error) -> Self {
            DashboardError(value)
        }
    }
137
138    impl IntoResponse for DashboardError {
139        fn into_response(self) -> axum::response::Response {
140            let mut resp = Json(json!({
141                "error": self.0.to_report_string(),
142            }))
143            .into_response();
144            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
145            resp
146        }
147    }
148
149    pub async fn list_clusters(
150        Path(ty): Path<i32>,
151        Extension(srv): Extension<Service>,
152    ) -> Result<Json<Vec<WorkerNode>>> {
153        let worker_type = WorkerType::try_from(ty)
154            .map_err(|_| anyhow!("invalid worker type"))
155            .map_err(err)?;
156        let mut result = srv
157            .metadata_manager
158            .list_worker_node(Some(worker_type), None)
159            .await
160            .map_err(err)?;
161        result.sort_unstable_by_key(|n| n.id);
162        Ok(result.into())
163    }
164
165    async fn list_table_catalogs_inner(
166        metadata_manager: &MetadataManager,
167        hummock_manager: &HummockManagerRef,
168        table_type: TableType,
169    ) -> Result<Json<Vec<TableWithStats>>> {
170        let tables = metadata_manager
171            .catalog_controller
172            .list_tables_by_type(table_type.into())
173            .await
174            .map_err(err)?;
175
176        // Get table statistics from hummock manager
177        let version_stats = hummock_manager.get_version_stats().await;
178
179        let tables_with_stats = tables
180            .into_iter()
181            .map(|table| {
182                let stats = version_stats.table_stats.get(&table.id);
183                TableWithStats::from_table_and_stats(table, stats)
184            })
185            .collect();
186
187        Ok(Json(tables_with_stats))
188    }
189
190    pub async fn list_materialized_views(
191        Extension(srv): Extension<Service>,
192    ) -> Result<Json<Vec<TableWithStats>>> {
193        list_table_catalogs_inner(
194            &srv.metadata_manager,
195            &srv.hummock_manager,
196            TableType::MaterializedView,
197        )
198        .await
199    }
200
201    pub async fn list_tables(
202        Extension(srv): Extension<Service>,
203    ) -> Result<Json<Vec<TableWithStats>>> {
204        list_table_catalogs_inner(
205            &srv.metadata_manager,
206            &srv.hummock_manager,
207            TableType::Table,
208        )
209        .await
210    }
211
212    pub async fn list_index_tables(
213        Extension(srv): Extension<Service>,
214    ) -> Result<Json<Vec<TableWithStats>>> {
215        list_table_catalogs_inner(
216            &srv.metadata_manager,
217            &srv.hummock_manager,
218            TableType::Index,
219        )
220        .await
221    }
222
223    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
224        let indexes = srv
225            .metadata_manager
226            .catalog_controller
227            .list_indexes()
228            .await
229            .map_err(err)?;
230
231        Ok(Json(indexes))
232    }
233
234    pub async fn list_subscription(
235        Extension(srv): Extension<Service>,
236    ) -> Result<Json<Vec<Subscription>>> {
237        let subscriptions = srv
238            .metadata_manager
239            .catalog_controller
240            .list_subscriptions()
241            .await
242            .map_err(err)?;
243
244        Ok(Json(subscriptions))
245    }
246
247    pub async fn list_internal_tables(
248        Extension(srv): Extension<Service>,
249    ) -> Result<Json<Vec<TableWithStats>>> {
250        list_table_catalogs_inner(
251            &srv.metadata_manager,
252            &srv.hummock_manager,
253            TableType::Internal,
254        )
255        .await
256    }
257
258    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
259        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
260
261        Ok(Json(sources))
262    }
263
264    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
265        let sinks = srv
266            .metadata_manager
267            .catalog_controller
268            .list_sinks()
269            .await
270            .map_err(err)?;
271
272        Ok(Json(sinks))
273    }
274
275    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
276        let views = srv
277            .metadata_manager
278            .catalog_controller
279            .list_views()
280            .await
281            .map_err(err)?;
282
283        Ok(Json(views))
284    }
285
286    pub async fn list_functions(
287        Extension(srv): Extension<Service>,
288    ) -> Result<Json<Vec<PbFunction>>> {
289        let functions = srv
290            .metadata_manager
291            .catalog_controller
292            .list_functions()
293            .await
294            .map_err(err)?;
295
296        Ok(Json(functions))
297    }
298
299    pub async fn list_streaming_jobs(
300        Extension(srv): Extension<Service>,
301    ) -> Result<Json<Vec<StreamingJobInfo>>> {
302        let streaming_jobs = srv
303            .metadata_manager
304            .catalog_controller
305            .list_streaming_job_infos()
306            .await
307            .map_err(err)?;
308
309        Ok(Json(streaming_jobs))
310    }
311
312    /// In the ddl backpressure graph, we want to compute the backpressure between relations.
313    /// So we need to know which are the fragments which are connected to external relations.
314    /// These fragments form the vertices of the graph.
315    /// We can get collection of backpressure values, keyed by vertex_id-vertex_id.
316    /// This function will return a map of fragment vertex id to relation id.
317    /// We can convert `fragment_id-fragment_id` to `relation_id-relation_id` using that.
318    /// Finally, we have a map of `relation_id-relation_id` to backpressure values.
319    pub async fn get_fragment_to_relation_map(
320        Extension(srv): Extension<Service>,
321    ) -> Result<Json<FragmentToRelationMap>> {
322        let table_fragments = srv
323            .metadata_manager
324            .catalog_controller
325            .table_fragments()
326            .await
327            .map_err(err)?;
328        let mut fragment_to_relation_map = HashMap::new();
329        for (relation_id, tf) in table_fragments {
330            for fragment_id in tf.fragments.keys() {
331                fragment_to_relation_map.insert(*fragment_id, relation_id.as_raw_id());
332            }
333        }
334        let map = FragmentToRelationMap {
335            fragment_to_relation_map,
336        };
337        Ok(Json(map))
338    }
339
340    /// Provides a hierarchy of relation ids to fragments to actors.
341    pub async fn get_relation_id_infos(
342        Extension(srv): Extension<Service>,
343    ) -> Result<Json<RelationIdInfos>> {
344        let table_fragments = srv
345            .metadata_manager
346            .catalog_controller
347            .table_fragments()
348            .await
349            .map_err(err)?;
350        let mut map = HashMap::new();
351        for (id, tf) in table_fragments {
352            let mut fragment_id_to_actor_ids = HashMap::new();
353            for (fragment_id, fragment) in &tf.fragments {
354                let actor_ids = fragment.actors.iter().map(|a| a.actor_id).collect_vec();
355                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
356            }
357            map.insert(
358                id.as_raw_id(),
359                FragmentIdToActorIdMap {
360                    map: fragment_id_to_actor_ids,
361                },
362            );
363        }
364        let relation_id_infos = RelationIdInfos { map };
365
366        Ok(Json(relation_id_infos))
367    }
368
369    pub async fn list_fragments_by_job_id(
370        Extension(srv): Extension<Service>,
371        Path(job_id): Path<u32>,
372    ) -> Result<Json<PbTableFragments>> {
373        let job_id = JobId::new(job_id);
374        let table_fragments = srv
375            .metadata_manager
376            .get_job_fragments_by_id(job_id)
377            .await
378            .map_err(err)?;
379        let upstream_fragments = srv
380            .metadata_manager
381            .catalog_controller
382            .upstream_fragments(table_fragments.fragment_ids())
383            .await
384            .map_err(err)?;
385        let dispatchers = srv
386            .metadata_manager
387            .catalog_controller
388            .get_fragment_actor_dispatchers(
389                table_fragments.fragment_ids().map(|id| id as _).collect(),
390            )
391            .await
392            .map_err(err)?;
393        Ok(Json(
394            table_fragments.to_protobuf(&upstream_fragments, &dispatchers),
395        ))
396    }
397
398    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
399        let users = srv
400            .metadata_manager
401            .catalog_controller
402            .list_users()
403            .await
404            .map_err(err)?;
405
406        Ok(Json(users))
407    }
408
409    pub async fn list_databases(
410        Extension(srv): Extension<Service>,
411    ) -> Result<Json<Vec<PbDatabase>>> {
412        let databases = srv
413            .metadata_manager
414            .catalog_controller
415            .list_databases()
416            .await
417            .map_err(err)?;
418
419        Ok(Json(databases))
420    }
421
422    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
423        let schemas = srv
424            .metadata_manager
425            .catalog_controller
426            .list_schemas()
427            .await
428            .map_err(err)?;
429
430        Ok(Json(schemas))
431    }
432
433    pub async fn list_object_dependencies(
434        Extension(srv): Extension<Service>,
435    ) -> Result<Json<Vec<PbObjectDependencies>>> {
436        let object_dependencies = srv
437            .metadata_manager
438            .catalog_controller
439            .list_all_object_dependencies()
440            .await
441            .map_err(err)?;
442
443        Ok(Json(object_dependencies))
444    }
445
    /// Query parameters for the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        // Output format of the dump: `"text"` (default) or `"json"`.
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
451
452    impl AwaitTreeDumpParams {
453        /// Parse the `format` parameter to [`ActorTracesFormat`].
454        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
455            Ok(match self.format.as_str() {
456                "text" => ActorTracesFormat::Text,
457                "json" => ActorTracesFormat::Json,
458                _ => {
459                    return Err(err(anyhow!(
460                        "Unsupported format `{}`, only `text` and `json` are supported for now",
461                        self.format
462                    )));
463                }
464            })
465        }
466    }
467
468    fn await_tree_default_format() -> String {
469        // In dashboard, await tree is usually for engineer to debug, so we use human-readable text format by default here.
470        "text".to_owned()
471    }
472
473    pub async fn dump_await_tree_all(
474        Query(params): Query<AwaitTreeDumpParams>,
475        Extension(srv): Extension<Service>,
476    ) -> Result<Json<StackTraceResponse>> {
477        let actor_traces_format = params.actor_traces_format()?;
478
479        let res = dump_cluster_await_tree(
480            &srv.metadata_manager,
481            &srv.await_tree_reg,
482            actor_traces_format,
483        )
484        .await
485        .map_err(err)?;
486
487        Ok(res.into())
488    }
489
490    pub async fn dump_await_tree(
491        Path(worker_id): Path<WorkerId>,
492        Query(params): Query<AwaitTreeDumpParams>,
493        Extension(srv): Extension<Service>,
494    ) -> Result<Json<StackTraceResponse>> {
495        let actor_traces_format = params.actor_traces_format()?;
496
497        let worker_node = srv
498            .metadata_manager
499            .get_worker_by_id(worker_id)
500            .await
501            .map_err(err)?
502            .context("worker node not found")
503            .map_err(err)?;
504
505        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
506            .await
507            .map_err(err)?;
508
509        Ok(res.into())
510    }
511
512    pub async fn heap_profile(
513        Path(worker_id): Path<WorkerId>,
514        Extension(srv): Extension<Service>,
515    ) -> Result<Json<HeapProfilingResponse>> {
516        if worker_id == crate::manager::META_NODE_ID {
517            let result = srv
518                .profile_service
519                .heap_profiling(Request::new(HeapProfilingRequest { dir: "".to_owned() }))
520                .await
521                .map_err(err)?
522                .into_inner();
523            return Ok(result.into());
524        }
525
526        let worker_node = srv
527            .metadata_manager
528            .get_worker_by_id(worker_id)
529            .await
530            .map_err(err)?
531            .context("worker node not found")
532            .map_err(err)?;
533
534        let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
535
536        let result = client.heap_profile("".to_owned()).await.map_err(err)?;
537
538        Ok(result.into())
539    }
540
541    pub async fn list_heap_profile(
542        Path(worker_id): Path<WorkerId>,
543        Extension(srv): Extension<Service>,
544    ) -> Result<Json<ListHeapProfilingResponse>> {
545        if worker_id == crate::manager::META_NODE_ID {
546            let result = srv
547                .profile_service
548                .list_heap_profiling(Request::new(ListHeapProfilingRequest {}))
549                .await
550                .map_err(err)?
551                .into_inner();
552            return Ok(result.into());
553        }
554
555        let worker_node = srv
556            .metadata_manager
557            .get_worker_by_id(worker_id)
558            .await
559            .map_err(err)?
560            .context("worker node not found")
561            .map_err(err)?;
562
563        let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
564
565        let result = client.list_heap_profile().await.map_err(err)?;
566        Ok(result.into())
567    }
568
569    pub async fn analyze_heap(
570        Path((worker_id, file_path)): Path<(WorkerId, String)>,
571        Extension(srv): Extension<Service>,
572    ) -> Result<Response> {
573        let file_path =
574            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
575
576        let collapsed_bin = if worker_id == crate::manager::META_NODE_ID {
577            srv.profile_service
578                .analyze_heap(Request::new(AnalyzeHeapRequest {
579                    path: file_path.clone(),
580                }))
581                .await
582                .map_err(err)?
583                .into_inner()
584                .result
585        } else {
586            let worker_node = srv
587                .metadata_manager
588                .get_worker_by_id(worker_id)
589                .await
590                .map_err(err)?
591                .context("worker node not found")
592                .map_err(err)?;
593
594            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
595            client
596                .analyze_heap(file_path.clone())
597                .await
598                .map_err(err)?
599                .result
600        };
601        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
602
603        let response = Response::builder()
604            .header("Content-Type", "application/octet-stream")
605            .body(collapsed_str.into());
606
607        response.map_err(err)
608    }
609
    /// Query parameters for the diagnose report endpoint.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        // Format for actor traces embedded in the report: `"text"` (default) or `"json"`.
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
615
616    pub async fn diagnose(
617        Query(params): Query<DiagnoseParams>,
618        Extension(srv): Extension<Service>,
619    ) -> Result<String> {
620        let actor_traces_format = match params.actor_traces_format.as_str() {
621            "text" => ActorTracesFormat::Text,
622            "json" => ActorTracesFormat::Json,
623            _ => {
624                return Err(err(anyhow!(
625                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
626                    params.actor_traces_format
627                )));
628            }
629        };
630        Ok(srv.diagnose_command.report(actor_traces_format).await)
631    }
632
633    /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API,
634    /// the workload should be reasonable.
635    /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join).
636    /// In such a scenario, the number of edges is linear to the number of nodes.
637    /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
638    pub async fn get_streaming_stats(
639        Extension(srv): Extension<Service>,
640    ) -> Result<Json<GetStreamingStatsResponse>> {
641        let worker_nodes = srv
642            .metadata_manager
643            .list_active_streaming_compute_nodes()
644            .await
645            .map_err(err)?;
646
647        let mut futures = Vec::new();
648
649        for worker_node in worker_nodes {
650            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
651            let client = Arc::new(client);
652            let fut = async move {
653                let result = client.get_streaming_stats().await.map_err(err)?;
654                Ok::<_, DashboardError>(result)
655            };
656            futures.push(fut);
657        }
658        let results = join_all(futures).await;
659
660        let mut all = GetStreamingStatsResponse::default();
661
662        for result in results {
663            let result = result
664                .map_err(|_| anyhow!("Failed to get back pressure"))
665                .map_err(err)?;
666
667            // Aggregate fragment_stats
668            for (fragment_id, fragment_stats) in result.fragment_stats {
669                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
670                    s.actor_count += fragment_stats.actor_count;
671                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
672                } else {
673                    all.fragment_stats.insert(fragment_id, fragment_stats);
674                }
675            }
676
677            // Aggregate relation_stats
678            for (relation_id, relation_stats) in result.relation_stats {
679                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
680                    s.actor_count += relation_stats.actor_count;
681                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
682                } else {
683                    all.relation_stats.insert(relation_id, relation_stats);
684                }
685            }
686
687            // Aggregate channel_stats
688            for (key, channel_stats) in result.channel_stats {
689                if let Some(s) = all.channel_stats.get_mut(&key) {
690                    s.actor_count += channel_stats.actor_count;
691                    s.output_blocking_duration += channel_stats.output_blocking_duration;
692                    s.recv_row_count += channel_stats.recv_row_count;
693                    s.send_row_count += channel_stats.send_row_count;
694                } else {
695                    all.channel_stats.insert(key, channel_stats);
696                }
697            }
698        }
699
700        Ok(all.into())
701    }
702
    /// Query parameters for the Prometheus-backed streaming stats endpoint.
    #[derive(Debug, Deserialize)]
    pub struct StreamingStatsPrometheusParams {
        /// Unix timestamp in seconds for the evaluation time. If not set, defaults to current Prometheus server time.
        #[serde(default)]
        at: Option<i64>,
        /// Time offset for throughput and backpressure rate calculation in seconds. If not set, defaults to 60s.
        #[serde(default = "streaming_stats_default_time_offset")]
        time_offset: i64,
    }
712
713    fn streaming_stats_default_time_offset() -> i64 {
714        60
715    }
716
717    pub async fn get_streaming_stats_from_prometheus(
718        Query(params): Query<StreamingStatsPrometheusParams>,
719        Extension(srv): Extension<Service>,
720    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
721        let mut all = GetStreamingPrometheusStatsResponse::default();
722
723        // Get fragment and relation stats from workers
724        let worker_nodes = srv
725            .metadata_manager
726            .list_active_streaming_compute_nodes()
727            .await
728            .map_err(err)?;
729
730        let mut futures = Vec::new();
731
732        for worker_node in worker_nodes {
733            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
734            let client = Arc::new(client);
735            let fut = async move {
736                let result = client.get_streaming_stats().await.map_err(err)?;
737                Ok::<_, DashboardError>(result)
738            };
739            futures.push(fut);
740        }
741        let results = join_all(futures).await;
742
743        for result in results {
744            let result = result
745                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
746                .map_err(err)?;
747
748            // Aggregate fragment_stats
749            for (fragment_id, fragment_stats) in result.fragment_stats {
750                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
751                    s.actor_count += fragment_stats.actor_count;
752                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
753                } else {
754                    all.fragment_stats.insert(fragment_id, fragment_stats);
755                }
756            }
757
758            // Aggregate relation_stats
759            for (relation_id, relation_stats) in result.relation_stats {
760                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
761                    s.actor_count += relation_stats.actor_count;
762                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
763                } else {
764                    all.relation_stats.insert(relation_id, relation_stats);
765                }
766            }
767        }
768
769        // Get channel delta stats from Prometheus
770        if let Some(ref client) = srv.prometheus_client {
771            // Query channel delta stats: throughput and backpressure rate
772            let channel_input_throughput_query = format!(
773                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
774                srv.prometheus_selector, params.time_offset
775            );
776            let channel_output_throughput_query = format!(
777                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
778                srv.prometheus_selector, params.time_offset
779            );
780            let channel_backpressure_query = format!(
781                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
782                 / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
783                srv.prometheus_selector, params.time_offset
784            );
785
786            // Execute all queries concurrently with optional time parameter
787            let (
788                channel_input_throughput_result,
789                channel_output_throughput_result,
790                channel_backpressure_result,
791            ) = {
792                let mut input_query = client.query(channel_input_throughput_query);
793                let mut output_query = client.query(channel_output_throughput_query);
794                let mut backpressure_query = client.query(channel_backpressure_query);
795
796                // Set the evaluation time if provided
797                if let Some(at_time) = params.at {
798                    input_query = input_query.at(at_time);
799                    output_query = output_query.at(at_time);
800                    backpressure_query = backpressure_query.at(at_time);
801                }
802
803                tokio::try_join!(
804                    input_query.get(),
805                    output_query.get(),
806                    backpressure_query.get(),
807                )
808                .map_err(err)?
809            };
810
811            // Process channel delta stats
812            let mut channel_data = HashMap::new();
813
814            // Collect input throughput
815            if let Some(channel_input_throughput_data) =
816                channel_input_throughput_result.data().as_vector()
817            {
818                for sample in channel_input_throughput_data {
819                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
820                        && let Some(upstream_fragment_id_str) =
821                            sample.metric().get("upstream_fragment_id")
822                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
823                            fragment_id_str.parse::<u32>(),
824                            upstream_fragment_id_str.parse::<u32>(),
825                        )
826                    {
827                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
828                        channel_data
829                            .entry(key)
830                            .or_insert_with(|| ChannelDeltaStats {
831                                actor_count: 0,
832                                backpressure_rate: 0.0,
833                                recv_throughput: 0.0,
834                                send_throughput: 0.0,
835                            })
836                            .recv_throughput = sample.sample().value();
837                    }
838                }
839            }
840
841            // Collect output throughput
842            if let Some(channel_output_throughput_data) =
843                channel_output_throughput_result.data().as_vector()
844            {
845                for sample in channel_output_throughput_data {
846                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
847                        && let Some(upstream_fragment_id_str) =
848                            sample.metric().get("upstream_fragment_id")
849                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
850                            fragment_id_str.parse::<u32>(),
851                            upstream_fragment_id_str.parse::<u32>(),
852                        )
853                    {
854                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
855                        channel_data
856                            .entry(key)
857                            .or_insert_with(|| ChannelDeltaStats {
858                                actor_count: 0,
859                                backpressure_rate: 0.0,
860                                recv_throughput: 0.0,
861                                send_throughput: 0.0,
862                            })
863                            .send_throughput = sample.sample().value();
864                    }
865                }
866            }
867
868            // Collect backpressure rate
869            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
870            {
871                for sample in channel_backpressure_data {
872                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
873                        && let Some(downstream_fragment_id_str) =
874                            sample.metric().get("downstream_fragment_id")
875                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
876                            fragment_id_str.parse::<u32>(),
877                            downstream_fragment_id_str.parse::<u32>(),
878                        )
879                    {
880                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
881                        channel_data
882                            .entry(key)
883                            .or_insert_with(|| ChannelDeltaStats {
884                                actor_count: 0,
885                                backpressure_rate: 0.0,
886                                recv_throughput: 0.0,
887                                send_throughput: 0.0,
888                            })
889                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0; // Convert ns to seconds
890                    }
891                }
892            }
893
894            // Set actor count for channels (using fragment actor count as approximation)
895            for (key, channel_stats) in &mut channel_data {
896                let parts: Vec<&str> = key.split('_').collect();
897                if parts.len() == 2
898                    && let Ok(fragment_id) = parts[1].parse::<u32>()
899                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
900                {
901                    channel_stats.actor_count = fragment_stats.actor_count;
902                }
903            }
904
905            all.channel_stats = channel_data;
906
907            Ok(Json(all))
908        } else {
909            Err(err(anyhow!("Prometheus endpoint is not set")))
910        }
911    }
912
913    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
914        Ok(Json(risingwave_common::current_cluster_version()))
915    }
916}
917
918impl DashboardService {
919    pub async fn serve(self) -> Result<()> {
920        use handlers::*;
921        let srv = Arc::new(self);
922
923        let cors_layer = CorsLayer::new()
924            .allow_origin(cors::Any)
925            .allow_methods(vec![Method::GET]);
926
927        let api_router = Router::new()
928            .route("/version", get(get_version))
929            .route("/clusters/:ty", get(list_clusters))
930            .route("/streaming_jobs", get(list_streaming_jobs))
931            .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
932            .route("/relation_id_infos", get(get_relation_id_infos))
933            .route(
934                "/fragment_to_relation_map",
935                get(get_fragment_to_relation_map),
936            )
937            .route("/views", get(list_views))
938            .route("/functions", get(list_functions))
939            .route("/materialized_views", get(list_materialized_views))
940            .route("/tables", get(list_tables))
941            .route("/indexes", get(list_index_tables))
942            .route("/index_items", get(list_indexes))
943            .route("/subscriptions", get(list_subscription))
944            .route("/internal_tables", get(list_internal_tables))
945            .route("/sources", get(list_sources))
946            .route("/sinks", get(list_sinks))
947            .route("/users", get(list_users))
948            .route("/databases", get(list_databases))
949            .route("/schemas", get(list_schemas))
950            .route("/object_dependencies", get(list_object_dependencies))
951            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
952            .route("/metrics/streaming_stats", get(get_streaming_stats))
953            .route(
954                "/metrics/streaming_stats_prometheus",
955                get(get_streaming_stats_from_prometheus),
956            )
957            // /monitor/await_tree/{worker_id}/?format={text or json}
958            .route("/monitor/await_tree/:worker_id", get(dump_await_tree))
959            // /monitor/await_tree/?format={text or json}
960            .route("/monitor/await_tree/", get(dump_await_tree_all))
961            .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
962            .route(
963                "/monitor/list_heap_profile/:worker_id",
964                get(list_heap_profile),
965            )
966            .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
967            // /monitor/diagnose/?format={text or json}
968            .route("/monitor/diagnose/", get(diagnose))
969            .layer(
970                ServiceBuilder::new()
971                    .layer(AddExtensionLayer::new(srv.clone()))
972                    .into_inner(),
973            )
974            .layer(cors_layer);
975
976        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
977        let dashboard_router = risingwave_meta_dashboard::router();
978
979        let app = Router::new()
980            .fallback_service(dashboard_router)
981            .nest("/api", api_router)
982            .nest("/trace", trace_ui_router)
983            .layer(CompressionLayer::new());
984
985        let listener = TcpListener::bind(&srv.dashboard_addr)
986            .await
987            .context("failed to bind dashboard address")?;
988        axum::serve(listener, app)
989            .await
990            .context("failed to serve dashboard service")?;
991
992        Ok(())
993    }
994}