risingwave_meta/dashboard/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prometheus;
16
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use anyhow::{Context as _, Result, anyhow};
21use axum::Router;
22use axum::extract::{Extension, Path};
23use axum::http::{Method, StatusCode};
24use axum::response::{IntoResponse, Response};
25use axum::routing::get;
26use risingwave_common_heap_profiling::ProfileServiceImpl;
27use risingwave_rpc_client::MonitorClientPool;
28use tokio::net::TcpListener;
29use tower::ServiceBuilder;
30use tower_http::add_extension::AddExtensionLayer;
31use tower_http::compression::CompressionLayer;
32use tower_http::cors::{self, CorsLayer};
33
34use crate::hummock::HummockManagerRef;
35use crate::manager::MetadataManager;
36use crate::manager::diagnose::DiagnoseCommandRef;
37
/// Shared state for the meta dashboard HTTP service.
///
/// Wrapped in an [`Arc`] (see [`Service`] below) and injected into every axum
/// handler through an `Extension` layer.
#[derive(Clone)]
pub struct DashboardService {
    // Registry used to dump the meta node's own await-tree (async stack traces).
    pub await_tree_reg: await_tree::Registry,
    // Socket address for the dashboard HTTP server.
    pub dashboard_addr: SocketAddr,
    // Optional Prometheus client; when `None`, Prometheus-backed stats are skipped.
    pub prometheus_client: Option<prometheus_http_query::Client>,
    // Label selector spliced into Prometheus queries to scope them to this cluster.
    pub prometheus_selector: String,
    pub metadata_manager: MetadataManager,
    pub hummock_manager: HummockManagerRef,
    // Pool of monitor-service RPC clients for worker nodes (profiling, traces, stats).
    pub monitor_clients: MonitorClientPool,
    pub diagnose_command: DiagnoseCommandRef,
    // Heap-profiling service for the meta node itself (worker id META_NODE_ID).
    pub profile_service: ProfileServiceImpl,
    // State of the embedded OTLP trace collector (usage not shown in this file chunk).
    pub trace_state: otlp_embedded::StateRef,
}
51
/// Shared handle to the dashboard state, cloned into each request handler.
pub type Service = Arc<DashboardService>;
53
54pub(super) mod handlers {
55    use std::cmp::min;
56    use std::collections::HashMap;
57
58    use anyhow::Context;
59    use axum::Json;
60    use axum::extract::Query;
61    use futures::future::join_all;
62    use itertools::Itertools;
63    use risingwave_common::id::JobId;
64    use risingwave_meta_model::WorkerId;
65    use risingwave_pb::catalog::table::TableType;
66    use risingwave_pb::catalog::{
67        Index, PbDatabase, PbFunction, PbSchema, Sink, Source, Subscription, Table, View,
68    };
69    use risingwave_pb::common::{WorkerNode, WorkerType};
70    use risingwave_pb::hummock::TableStats;
71    use risingwave_pb::meta::{
72        ActorIds, FragmentIdToActorIdMap, FragmentToRelationMap, PbTableFragments, RelationIdInfos,
73    };
74    use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
75    use risingwave_pb::monitor_service::{
76        AnalyzeHeapRequest, ChannelDeltaStats, GetStreamingPrometheusStatsResponse,
77        GetStreamingStatsResponse, HeapProfilingRequest, HeapProfilingResponse,
78        ListHeapProfilingRequest, ListHeapProfilingResponse, StackTraceResponse,
79    };
80    use risingwave_pb::user::PbUserInfo;
81    use serde::{Deserialize, Serialize};
82    use serde_json::json;
83    use thiserror_ext::AsReport;
84    use tonic::Request;
85
86    use super::*;
87    use crate::controller::fragment::StreamingJobInfo;
88    use crate::rpc::await_tree::{dump_cluster_await_tree, dump_worker_node_await_tree};
89
    /// A table catalog enriched with storage statistics from hummock.
    ///
    /// The inner `table` is flattened, so its fields appear alongside the
    /// statistic fields in the serialized JSON.
    #[derive(Serialize)]
    pub struct TableWithStats {
        #[serde(flatten)]
        pub table: Table,
        // Sum of key and value sizes (uncompressed), in bytes.
        pub total_size_bytes: i64,
        pub total_key_count: i64,
        pub total_key_size: i64,
        pub total_value_size: i64,
        // Total compressed size as reported by hummock version stats.
        pub compressed_size: u64,
    }
100
101    impl TableWithStats {
102        pub fn from_table_and_stats(table: Table, stats: Option<&TableStats>) -> Self {
103            match stats {
104                Some(stats) => Self {
105                    total_size_bytes: stats.total_key_size + stats.total_value_size,
106                    total_key_count: stats.total_key_count,
107                    total_key_size: stats.total_key_size,
108                    total_value_size: stats.total_value_size,
109                    compressed_size: stats.total_compressed_size,
110                    table,
111                },
112                None => Self {
113                    total_size_bytes: 0,
114                    total_key_count: 0,
115                    total_key_size: 0,
116                    total_value_size: 0,
117                    compressed_size: 0,
118                    table,
119                },
120            }
121        }
122    }
123
    /// Wrapper that turns any `anyhow::Error` into an HTTP 500 JSON response.
    pub struct DashboardError(anyhow::Error);
    /// Handler result type; errors are rendered by `DashboardError::into_response`.
    pub type Result<T> = std::result::Result<T, DashboardError>;
126
127    pub fn err(err: impl Into<anyhow::Error>) -> DashboardError {
128        DashboardError(err.into())
129    }
130
131    impl From<anyhow::Error> for DashboardError {
132        fn from(value: anyhow::Error) -> Self {
133            DashboardError(value)
134        }
135    }
136
137    impl IntoResponse for DashboardError {
138        fn into_response(self) -> axum::response::Response {
139            let mut resp = Json(json!({
140                "error": self.0.to_report_string(),
141            }))
142            .into_response();
143            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
144            resp
145        }
146    }
147
148    pub async fn list_clusters(
149        Path(ty): Path<i32>,
150        Extension(srv): Extension<Service>,
151    ) -> Result<Json<Vec<WorkerNode>>> {
152        let worker_type = WorkerType::try_from(ty)
153            .map_err(|_| anyhow!("invalid worker type"))
154            .map_err(err)?;
155        let mut result = srv
156            .metadata_manager
157            .list_worker_node(Some(worker_type), None)
158            .await
159            .map_err(err)?;
160        result.sort_unstable_by_key(|n| n.id);
161        Ok(result.into())
162    }
163
164    async fn list_table_catalogs_inner(
165        metadata_manager: &MetadataManager,
166        hummock_manager: &HummockManagerRef,
167        table_type: TableType,
168    ) -> Result<Json<Vec<TableWithStats>>> {
169        let tables = metadata_manager
170            .catalog_controller
171            .list_tables_by_type(table_type.into())
172            .await
173            .map_err(err)?;
174
175        // Get table statistics from hummock manager
176        let version_stats = hummock_manager.get_version_stats().await;
177
178        let tables_with_stats = tables
179            .into_iter()
180            .map(|table| {
181                let stats = version_stats.table_stats.get(&table.id);
182                TableWithStats::from_table_and_stats(table, stats)
183            })
184            .collect();
185
186        Ok(Json(tables_with_stats))
187    }
188
189    pub async fn list_materialized_views(
190        Extension(srv): Extension<Service>,
191    ) -> Result<Json<Vec<TableWithStats>>> {
192        list_table_catalogs_inner(
193            &srv.metadata_manager,
194            &srv.hummock_manager,
195            TableType::MaterializedView,
196        )
197        .await
198    }
199
200    pub async fn list_tables(
201        Extension(srv): Extension<Service>,
202    ) -> Result<Json<Vec<TableWithStats>>> {
203        list_table_catalogs_inner(
204            &srv.metadata_manager,
205            &srv.hummock_manager,
206            TableType::Table,
207        )
208        .await
209    }
210
211    pub async fn list_index_tables(
212        Extension(srv): Extension<Service>,
213    ) -> Result<Json<Vec<TableWithStats>>> {
214        list_table_catalogs_inner(
215            &srv.metadata_manager,
216            &srv.hummock_manager,
217            TableType::Index,
218        )
219        .await
220    }
221
222    pub async fn list_indexes(Extension(srv): Extension<Service>) -> Result<Json<Vec<Index>>> {
223        let indexes = srv
224            .metadata_manager
225            .catalog_controller
226            .list_indexes()
227            .await
228            .map_err(err)?;
229
230        Ok(Json(indexes))
231    }
232
233    pub async fn list_subscription(
234        Extension(srv): Extension<Service>,
235    ) -> Result<Json<Vec<Subscription>>> {
236        let subscriptions = srv
237            .metadata_manager
238            .catalog_controller
239            .list_subscriptions()
240            .await
241            .map_err(err)?;
242
243        Ok(Json(subscriptions))
244    }
245
246    pub async fn list_internal_tables(
247        Extension(srv): Extension<Service>,
248    ) -> Result<Json<Vec<TableWithStats>>> {
249        list_table_catalogs_inner(
250            &srv.metadata_manager,
251            &srv.hummock_manager,
252            TableType::Internal,
253        )
254        .await
255    }
256
257    pub async fn list_sources(Extension(srv): Extension<Service>) -> Result<Json<Vec<Source>>> {
258        let sources = srv.metadata_manager.list_sources().await.map_err(err)?;
259
260        Ok(Json(sources))
261    }
262
263    pub async fn list_sinks(Extension(srv): Extension<Service>) -> Result<Json<Vec<Sink>>> {
264        let sinks = srv
265            .metadata_manager
266            .catalog_controller
267            .list_sinks()
268            .await
269            .map_err(err)?;
270
271        Ok(Json(sinks))
272    }
273
274    pub async fn list_views(Extension(srv): Extension<Service>) -> Result<Json<Vec<View>>> {
275        let views = srv
276            .metadata_manager
277            .catalog_controller
278            .list_views()
279            .await
280            .map_err(err)?;
281
282        Ok(Json(views))
283    }
284
285    pub async fn list_functions(
286        Extension(srv): Extension<Service>,
287    ) -> Result<Json<Vec<PbFunction>>> {
288        let functions = srv
289            .metadata_manager
290            .catalog_controller
291            .list_functions()
292            .await
293            .map_err(err)?;
294
295        Ok(Json(functions))
296    }
297
298    pub async fn list_streaming_jobs(
299        Extension(srv): Extension<Service>,
300    ) -> Result<Json<Vec<StreamingJobInfo>>> {
301        let streaming_jobs = srv
302            .metadata_manager
303            .catalog_controller
304            .list_streaming_job_infos()
305            .await
306            .map_err(err)?;
307
308        Ok(Json(streaming_jobs))
309    }
310
311    /// In the ddl backpressure graph, we want to compute the backpressure between relations.
312    /// So we need to know which are the fragments which are connected to external relations.
313    /// These fragments form the vertices of the graph.
314    /// We can get collection of backpressure values, keyed by vertex_id-vertex_id.
315    /// This function will return a map of fragment vertex id to relation id.
316    /// We can convert `fragment_id-fragment_id` to `relation_id-relation_id` using that.
317    /// Finally, we have a map of `relation_id-relation_id` to backpressure values.
318    pub async fn get_fragment_to_relation_map(
319        Extension(srv): Extension<Service>,
320    ) -> Result<Json<FragmentToRelationMap>> {
321        let table_fragments = srv
322            .metadata_manager
323            .catalog_controller
324            .table_fragments()
325            .await
326            .map_err(err)?;
327        let mut fragment_to_relation_map = HashMap::new();
328        for (relation_id, (tf, _, _)) in table_fragments {
329            for fragment_id in tf.fragments.keys() {
330                fragment_to_relation_map.insert(*fragment_id, relation_id);
331            }
332        }
333        let map = FragmentToRelationMap {
334            fragment_to_relation_map,
335        };
336        Ok(Json(map))
337    }
338
    /// Provides a hierarchy of relation ids to fragments to actors.
    pub async fn get_relation_id_infos(
        Extension(srv): Extension<Service>,
    ) -> Result<Json<RelationIdInfos>> {
        let table_fragments = srv
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await
            .map_err(err)?;
        let mut map = HashMap::new();
        for (id, (tf, fragment_actors, _actor_status)) in table_fragments {
            // For each job, collect the ids of the actors running each fragment.
            let mut fragment_id_to_actor_ids = HashMap::new();
            for fragment_id in tf.fragments.keys() {
                // A fragment missing from `fragment_actors` yields an empty actor list.
                let actor_ids = fragment_actors
                    .get(fragment_id)
                    .into_iter()
                    .flat_map(|actors| actors.iter().map(|a| a.actor_id))
                    .collect_vec();
                fragment_id_to_actor_ids.insert(*fragment_id, ActorIds { ids: actor_ids });
            }
            map.insert(
                id.as_raw_id(),
                FragmentIdToActorIdMap {
                    map: fragment_id_to_actor_ids,
                },
            );
        }
        let relation_id_infos = RelationIdInfos { map };

        Ok(Json(relation_id_infos))
    }
371
    /// Fetch the fragment graph of a single streaming job in protobuf form.
    pub async fn list_fragments_by_job_id(
        Extension(srv): Extension<Service>,
        Path(job_id): Path<u32>,
    ) -> Result<Json<PbTableFragments>> {
        let job_id = JobId::new(job_id);
        // Fragment info plus per-fragment actors and per-actor statuses.
        let (table_fragments, fragment_actors, actor_status) = srv
            .metadata_manager
            .catalog_controller
            .get_job_fragments_by_id(job_id)
            .await
            .map_err(err)?;
        // Upstream fragment ids and dispatchers are fetched separately and
        // handed to `to_protobuf` below to assemble the full message.
        let upstream_fragments = srv
            .metadata_manager
            .catalog_controller
            .upstream_fragments(table_fragments.fragment_ids())
            .await
            .map_err(err)?;
        let dispatchers = srv
            .metadata_manager
            .catalog_controller
            .get_fragment_actor_dispatchers(
                table_fragments.fragment_ids().map(|id| id as _).collect(),
            )
            .await
            .map_err(err)?;
        Ok(Json(table_fragments.to_protobuf(
            &fragment_actors,
            &upstream_fragments,
            &dispatchers,
            actor_status,
        )))
    }
404
405    pub async fn list_users(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbUserInfo>>> {
406        let users = srv
407            .metadata_manager
408            .catalog_controller
409            .list_users()
410            .await
411            .map_err(err)?;
412
413        Ok(Json(users))
414    }
415
416    pub async fn list_databases(
417        Extension(srv): Extension<Service>,
418    ) -> Result<Json<Vec<PbDatabase>>> {
419        let databases = srv
420            .metadata_manager
421            .catalog_controller
422            .list_databases()
423            .await
424            .map_err(err)?;
425
426        Ok(Json(databases))
427    }
428
429    pub async fn list_schemas(Extension(srv): Extension<Service>) -> Result<Json<Vec<PbSchema>>> {
430        let schemas = srv
431            .metadata_manager
432            .catalog_controller
433            .list_schemas()
434            .await
435            .map_err(err)?;
436
437        Ok(Json(schemas))
438    }
439
    /// A single edge of the object-dependency graph, serialized in camelCase
    /// for the dashboard frontend.
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct DashboardObjectDependency {
        // Raw id of the dependent object.
        pub object_id: u32,
        // Raw id of the object it references.
        pub referenced_object_id: u32,
    }
446
447    pub async fn list_object_dependencies(
448        Extension(srv): Extension<Service>,
449    ) -> Result<Json<Vec<DashboardObjectDependency>>> {
450        let object_dependencies = srv
451            .metadata_manager
452            .catalog_controller
453            .list_all_object_dependencies()
454            .await
455            .map_err(err)?;
456
457        let result = object_dependencies
458            .into_iter()
459            .map(|dependency| DashboardObjectDependency {
460                object_id: dependency.object_id.as_raw_id(),
461                referenced_object_id: dependency.referenced_object_id.as_raw_id(),
462            })
463            .collect();
464
465        Ok(Json(result))
466    }
467
    /// Query parameters for the await-tree dump endpoints.
    #[derive(Debug, Deserialize)]
    pub struct AwaitTreeDumpParams {
        // Dump format: `"text"` (default, see `await_tree_default_format`) or `"json"`.
        #[serde(default = "await_tree_default_format")]
        format: String,
    }
473
474    impl AwaitTreeDumpParams {
475        /// Parse the `format` parameter to [`ActorTracesFormat`].
476        pub fn actor_traces_format(&self) -> Result<ActorTracesFormat> {
477            Ok(match self.format.as_str() {
478                "text" => ActorTracesFormat::Text,
479                "json" => ActorTracesFormat::Json,
480                _ => {
481                    return Err(err(anyhow!(
482                        "Unsupported format `{}`, only `text` and `json` are supported for now",
483                        self.format
484                    )));
485                }
486            })
487        }
488    }
489
490    fn await_tree_default_format() -> String {
491        // In dashboard, await tree is usually for engineer to debug, so we use human-readable text format by default here.
492        "text".to_owned()
493    }
494
495    pub async fn dump_await_tree_all(
496        Query(params): Query<AwaitTreeDumpParams>,
497        Extension(srv): Extension<Service>,
498    ) -> Result<Json<StackTraceResponse>> {
499        let actor_traces_format = params.actor_traces_format()?;
500
501        let res = dump_cluster_await_tree(
502            &srv.metadata_manager,
503            &srv.await_tree_reg,
504            actor_traces_format,
505        )
506        .await
507        .map_err(err)?;
508
509        Ok(res.into())
510    }
511
512    pub async fn dump_await_tree(
513        Path(worker_id): Path<WorkerId>,
514        Query(params): Query<AwaitTreeDumpParams>,
515        Extension(srv): Extension<Service>,
516    ) -> Result<Json<StackTraceResponse>> {
517        let actor_traces_format = params.actor_traces_format()?;
518
519        let worker_node = srv
520            .metadata_manager
521            .get_worker_by_id(worker_id)
522            .await
523            .map_err(err)?
524            .context("worker node not found")
525            .map_err(err)?;
526
527        let res = dump_worker_node_await_tree(std::iter::once(&worker_node), actor_traces_format)
528            .await
529            .map_err(err)?;
530
531        Ok(res.into())
532    }
533
    /// Trigger a heap-profiling dump on the given worker node.
    ///
    /// The special id `META_NODE_ID` profiles the meta node itself via the
    /// local profile service; any other id is forwarded to that worker's
    /// monitor service over RPC.
    pub async fn heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<HeapProfilingResponse>> {
        if worker_id == crate::manager::META_NODE_ID {
            // Empty `dir` — presumably lets the service pick its default dump
            // directory; confirm against `ProfileServiceImpl`.
            let result = srv
                .profile_service
                .heap_profiling(Request::new(HeapProfilingRequest { dir: "".to_owned() }))
                .await
                .map_err(err)?
                .into_inner();
            return Ok(result.into());
        }

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;

        // Same empty-dir convention as the meta-node branch above.
        let result = client.heap_profile("".to_owned()).await.map_err(err)?;

        Ok(result.into())
    }
562
    /// List the heap-profile dumps available on the given worker node.
    ///
    /// Mirrors [`heap_profile`]: `META_NODE_ID` is served by the local profile
    /// service, other ids are forwarded to the worker's monitor service.
    pub async fn list_heap_profile(
        Path(worker_id): Path<WorkerId>,
        Extension(srv): Extension<Service>,
    ) -> Result<Json<ListHeapProfilingResponse>> {
        if worker_id == crate::manager::META_NODE_ID {
            let result = srv
                .profile_service
                .list_heap_profiling(Request::new(ListHeapProfilingRequest {}))
                .await
                .map_err(err)?
                .into_inner();
            return Ok(result.into());
        }

        let worker_node = srv
            .metadata_manager
            .get_worker_by_id(worker_id)
            .await
            .map_err(err)?
            .context("worker node not found")
            .map_err(err)?;

        let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;

        let result = client.list_heap_profile().await.map_err(err)?;
        Ok(result.into())
    }
590
591    pub async fn analyze_heap(
592        Path((worker_id, file_path)): Path<(WorkerId, String)>,
593        Extension(srv): Extension<Service>,
594    ) -> Result<Response> {
595        let file_path =
596            String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
597
598        let collapsed_bin = if worker_id == crate::manager::META_NODE_ID {
599            srv.profile_service
600                .analyze_heap(Request::new(AnalyzeHeapRequest {
601                    path: file_path.clone(),
602                }))
603                .await
604                .map_err(err)?
605                .into_inner()
606                .result
607        } else {
608            let worker_node = srv
609                .metadata_manager
610                .get_worker_by_id(worker_id)
611                .await
612                .map_err(err)?
613                .context("worker node not found")
614                .map_err(err)?;
615
616            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
617            client
618                .analyze_heap(file_path.clone())
619                .await
620                .map_err(err)?
621                .result
622        };
623        let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
624
625        let response = Response::builder()
626            .header("Content-Type", "application/octet-stream")
627            .body(collapsed_str.into());
628
629        response.map_err(err)
630    }
631
    /// Query parameters for the `diagnose` endpoint.
    #[derive(Debug, Deserialize)]
    pub struct DiagnoseParams {
        // Await-tree format used inside the report: `"text"` (default) or `"json"`.
        #[serde(default = "await_tree_default_format")]
        actor_traces_format: String,
    }
637
638    pub async fn diagnose(
639        Query(params): Query<DiagnoseParams>,
640        Extension(srv): Extension<Service>,
641    ) -> Result<String> {
642        let actor_traces_format = match params.actor_traces_format.as_str() {
643            "text" => ActorTracesFormat::Text,
644            "json" => ActorTracesFormat::Json,
645            _ => {
646                return Err(err(anyhow!(
647                    "Unsupported actor_traces_format `{}`, only `text` and `json` are supported for now",
648                    params.actor_traces_format
649                )));
650            }
651        };
652        Ok(srv.diagnose_command.report(actor_traces_format).await)
653    }
654
655    /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API,
656    /// the workload should be reasonable.
657    /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join).
658    /// In such a scenario, the number of edges is linear to the number of nodes.
659    /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
660    pub async fn get_streaming_stats(
661        Extension(srv): Extension<Service>,
662    ) -> Result<Json<GetStreamingStatsResponse>> {
663        let worker_nodes = srv
664            .metadata_manager
665            .list_active_streaming_compute_nodes()
666            .await
667            .map_err(err)?;
668
669        let mut futures = Vec::new();
670
671        for worker_node in worker_nodes {
672            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
673            let client = Arc::new(client);
674            let fut = async move {
675                let result = client.get_streaming_stats().await.map_err(err)?;
676                Ok::<_, DashboardError>(result)
677            };
678            futures.push(fut);
679        }
680        let results = join_all(futures).await;
681
682        let mut all = GetStreamingStatsResponse::default();
683
684        for result in results {
685            let result = result
686                .map_err(|_| anyhow!("Failed to get back pressure"))
687                .map_err(err)?;
688
689            // Aggregate fragment_stats
690            for (fragment_id, fragment_stats) in result.fragment_stats {
691                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
692                    s.actor_count += fragment_stats.actor_count;
693                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
694                } else {
695                    all.fragment_stats.insert(fragment_id, fragment_stats);
696                }
697            }
698
699            // Aggregate relation_stats
700            for (relation_id, relation_stats) in result.relation_stats {
701                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
702                    s.actor_count += relation_stats.actor_count;
703                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
704                } else {
705                    all.relation_stats.insert(relation_id, relation_stats);
706                }
707            }
708
709            // Aggregate channel_stats
710            for (key, channel_stats) in result.channel_stats {
711                if let Some(s) = all.channel_stats.get_mut(&key) {
712                    s.actor_count += channel_stats.actor_count;
713                    s.output_blocking_duration += channel_stats.output_blocking_duration;
714                    s.recv_row_count += channel_stats.recv_row_count;
715                    s.send_row_count += channel_stats.send_row_count;
716                } else {
717                    all.channel_stats.insert(key, channel_stats);
718                }
719            }
720        }
721
722        Ok(all.into())
723    }
724
    /// Query parameters for `get_streaming_stats_from_prometheus`.
    #[derive(Debug, Deserialize)]
    pub struct StreamingStatsPrometheusParams {
        /// Unix timestamp in seconds for the evaluation time. If not set, defaults to current Prometheus server time.
        #[serde(default)]
        at: Option<i64>,
        /// Time offset for throughput and backpressure rate calculation in seconds. If not set, defaults to 60s.
        #[serde(default = "streaming_stats_default_time_offset")]
        time_offset: i64,
    }
734
735    fn streaming_stats_default_time_offset() -> i64 {
736        60
737    }
738
739    pub async fn get_streaming_stats_from_prometheus(
740        Query(params): Query<StreamingStatsPrometheusParams>,
741        Extension(srv): Extension<Service>,
742    ) -> Result<Json<GetStreamingPrometheusStatsResponse>> {
743        let mut all = GetStreamingPrometheusStatsResponse::default();
744
745        // Get fragment and relation stats from workers
746        let worker_nodes = srv
747            .metadata_manager
748            .list_active_streaming_compute_nodes()
749            .await
750            .map_err(err)?;
751
752        let mut futures = Vec::new();
753
754        for worker_node in worker_nodes {
755            let client = srv.monitor_clients.get(&worker_node).await.map_err(err)?;
756            let client = Arc::new(client);
757            let fut = async move {
758                let result = client.get_streaming_stats().await.map_err(err)?;
759                Ok::<_, DashboardError>(result)
760            };
761            futures.push(fut);
762        }
763        let results = join_all(futures).await;
764
765        for result in results {
766            let result = result
767                .map_err(|_| anyhow!("Failed to get streaming stats from worker"))
768                .map_err(err)?;
769
770            // Aggregate fragment_stats
771            for (fragment_id, fragment_stats) in result.fragment_stats {
772                if let Some(s) = all.fragment_stats.get_mut(&fragment_id) {
773                    s.actor_count += fragment_stats.actor_count;
774                    s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch);
775                } else {
776                    all.fragment_stats.insert(fragment_id, fragment_stats);
777                }
778            }
779
780            // Aggregate relation_stats
781            for (relation_id, relation_stats) in result.relation_stats {
782                if let Some(s) = all.relation_stats.get_mut(&relation_id) {
783                    s.actor_count += relation_stats.actor_count;
784                    s.current_epoch = min(s.current_epoch, relation_stats.current_epoch);
785                } else {
786                    all.relation_stats.insert(relation_id, relation_stats);
787                }
788            }
789        }
790
791        // Get channel delta stats from Prometheus
792        if let Some(ref client) = srv.prometheus_client {
793            // Query channel delta stats: throughput and backpressure rate
794            let channel_input_throughput_query = format!(
795                "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
796                srv.prometheus_selector, params.time_offset
797            );
798            let channel_output_throughput_query = format!(
799                "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
800                srv.prometheus_selector, params.time_offset
801            );
802            let channel_backpressure_query = format!(
803                "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
804                 / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
805                srv.prometheus_selector, params.time_offset
806            );
807
808            // Execute all queries concurrently with optional time parameter
809            let (
810                channel_input_throughput_result,
811                channel_output_throughput_result,
812                channel_backpressure_result,
813            ) = {
814                let mut input_query = client.query(channel_input_throughput_query);
815                let mut output_query = client.query(channel_output_throughput_query);
816                let mut backpressure_query = client.query(channel_backpressure_query);
817
818                // Set the evaluation time if provided
819                if let Some(at_time) = params.at {
820                    input_query = input_query.at(at_time);
821                    output_query = output_query.at(at_time);
822                    backpressure_query = backpressure_query.at(at_time);
823                }
824
825                tokio::try_join!(
826                    input_query.get(),
827                    output_query.get(),
828                    backpressure_query.get(),
829                )
830                .map_err(err)?
831            };
832
833            // Process channel delta stats
834            let mut channel_data = HashMap::new();
835
836            // Collect input throughput
837            if let Some(channel_input_throughput_data) =
838                channel_input_throughput_result.data().as_vector()
839            {
840                for sample in channel_input_throughput_data {
841                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
842                        && let Some(upstream_fragment_id_str) =
843                            sample.metric().get("upstream_fragment_id")
844                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
845                            fragment_id_str.parse::<u32>(),
846                            upstream_fragment_id_str.parse::<u32>(),
847                        )
848                    {
849                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
850                        channel_data
851                            .entry(key)
852                            .or_insert_with(|| ChannelDeltaStats {
853                                actor_count: 0,
854                                backpressure_rate: 0.0,
855                                recv_throughput: 0.0,
856                                send_throughput: 0.0,
857                            })
858                            .recv_throughput = sample.sample().value();
859                    }
860                }
861            }
862
863            // Collect output throughput
864            if let Some(channel_output_throughput_data) =
865                channel_output_throughput_result.data().as_vector()
866            {
867                for sample in channel_output_throughput_data {
868                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
869                        && let Some(upstream_fragment_id_str) =
870                            sample.metric().get("upstream_fragment_id")
871                        && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
872                            fragment_id_str.parse::<u32>(),
873                            upstream_fragment_id_str.parse::<u32>(),
874                        )
875                    {
876                        let key = format!("{}_{}", upstream_fragment_id, fragment_id);
877                        channel_data
878                            .entry(key)
879                            .or_insert_with(|| ChannelDeltaStats {
880                                actor_count: 0,
881                                backpressure_rate: 0.0,
882                                recv_throughput: 0.0,
883                                send_throughput: 0.0,
884                            })
885                            .send_throughput = sample.sample().value();
886                    }
887                }
888            }
889
890            // Collect backpressure rate
891            if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector()
892            {
893                for sample in channel_backpressure_data {
894                    if let Some(fragment_id_str) = sample.metric().get("fragment_id")
895                        && let Some(downstream_fragment_id_str) =
896                            sample.metric().get("downstream_fragment_id")
897                        && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
898                            fragment_id_str.parse::<u32>(),
899                            downstream_fragment_id_str.parse::<u32>(),
900                        )
901                    {
902                        let key = format!("{}_{}", fragment_id, downstream_fragment_id);
903                        channel_data
904                            .entry(key)
905                            .or_insert_with(|| ChannelDeltaStats {
906                                actor_count: 0,
907                                backpressure_rate: 0.0,
908                                recv_throughput: 0.0,
909                                send_throughput: 0.0,
910                            })
911                            .backpressure_rate = sample.sample().value() / 1_000_000_000.0; // Convert ns to seconds
912                    }
913                }
914            }
915
916            // Set actor count for channels (using fragment actor count as approximation)
917            for (key, channel_stats) in &mut channel_data {
918                let parts: Vec<&str> = key.split('_').collect();
919                if parts.len() == 2
920                    && let Ok(fragment_id) = parts[1].parse::<u32>()
921                    && let Some(fragment_stats) = all.fragment_stats.get(&fragment_id)
922                {
923                    channel_stats.actor_count = fragment_stats.actor_count;
924                }
925            }
926
927            all.channel_stats = channel_data;
928
929            Ok(Json(all))
930        } else {
931            Err(err(anyhow!("Prometheus endpoint is not set")))
932        }
933    }
934
935    pub async fn get_version(Extension(_srv): Extension<Service>) -> Result<Json<String>> {
936        Ok(Json(risingwave_common::current_cluster_version()))
937    }
938}
939
940impl DashboardService {
941    pub async fn serve(self) -> Result<()> {
942        use handlers::*;
943        let srv = Arc::new(self);
944
945        let cors_layer = CorsLayer::new()
946            .allow_origin(cors::Any)
947            .allow_methods(vec![Method::GET]);
948
949        let api_router = Router::new()
950            .route("/version", get(get_version))
951            .route("/clusters/{ty}", get(list_clusters))
952            .route("/streaming_jobs", get(list_streaming_jobs))
953            .route("/fragments/job_id/{job_id}", get(list_fragments_by_job_id))
954            .route("/relation_id_infos", get(get_relation_id_infos))
955            .route(
956                "/fragment_to_relation_map",
957                get(get_fragment_to_relation_map),
958            )
959            .route("/views", get(list_views))
960            .route("/functions", get(list_functions))
961            .route("/materialized_views", get(list_materialized_views))
962            .route("/tables", get(list_tables))
963            .route("/indexes", get(list_index_tables))
964            .route("/index_items", get(list_indexes))
965            .route("/subscriptions", get(list_subscription))
966            .route("/internal_tables", get(list_internal_tables))
967            .route("/sources", get(list_sources))
968            .route("/sinks", get(list_sinks))
969            .route("/users", get(list_users))
970            .route("/databases", get(list_databases))
971            .route("/schemas", get(list_schemas))
972            .route("/object_dependencies", get(list_object_dependencies))
973            .route("/metrics/cluster", get(prometheus::list_prometheus_cluster))
974            .route("/metrics/streaming_stats", get(get_streaming_stats))
975            .route(
976                "/metrics/streaming_stats_prometheus",
977                get(get_streaming_stats_from_prometheus),
978            )
979            // /monitor/await_tree/{worker_id}/?format={text or json}
980            .route("/monitor/await_tree/{worker_id}", get(dump_await_tree))
981            // /monitor/await_tree/?format={text or json}
982            .route("/monitor/await_tree/", get(dump_await_tree_all))
983            .route("/monitor/dump_heap_profile/{worker_id}", get(heap_profile))
984            .route(
985                "/monitor/list_heap_profile/{worker_id}",
986                get(list_heap_profile),
987            )
988            .route("/monitor/analyze/{worker_id}/{*path}", get(analyze_heap))
989            // /monitor/diagnose/?format={text or json}
990            .route("/monitor/diagnose/", get(diagnose))
991            .layer(
992                ServiceBuilder::new()
993                    .layer(AddExtensionLayer::new(srv.clone()))
994                    .into_inner(),
995            )
996            .layer(cors_layer);
997
998        let trace_ui_router = otlp_embedded::ui_app(srv.trace_state.clone(), "/trace/");
999        let dashboard_router = risingwave_meta_dashboard::router();
1000
1001        let app = Router::new()
1002            .fallback_service(dashboard_router)
1003            .nest("/api", api_router)
1004            .nest("/trace", trace_ui_router)
1005            .layer(CompressionLayer::new());
1006
1007        let listener = TcpListener::bind(&srv.dashboard_addr)
1008            .await
1009            .context("failed to bind dashboard address")?;
1010        axum::serve(listener, app)
1011            .await
1012            .context("failed to serve dashboard service")?;
1013
1014        Ok(())
1015    }
1016}