risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_fragments.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::FragmentTypeFlag;
16use risingwave_common::types::{Fields, JsonbVal};
17use risingwave_frontend_macro::system_catalog;
18use serde_json::json;
19
20use crate::catalog::system_catalog::SysCatalogReaderImpl;
21use crate::error::Result;
22
23#[derive(Fields)]
24struct RwFragment {
25    #[primary_key]
26    fragment_id: i32,
27    table_id: i32,
28    distribution_type: String,
29    state_table_ids: Vec<i32>,
30    upstream_fragment_ids: Vec<i32>,
31    flags: Vec<String>,
32    parallelism: i32,
33    max_parallelism: i32,
34    node: JsonbVal,
35}
36
37pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
38    let mut result = vec![];
39    for i in 0..32 {
40        let bit = 1 << i;
41        if mask & bit != 0 {
42            match FragmentTypeFlag::try_from(bit) {
43                Err(_) => continue,
44                Ok(flag) => result.push(flag),
45            };
46        }
47    }
48    result
49}
50
51#[system_catalog(table, "rw_catalog.rw_fragments")]
52async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragment>> {
53    let distributions = reader.meta_client.list_fragment_distribution().await?;
54
55    Ok(distributions
56        .into_iter()
57        .map(|distribution| RwFragment {
58            fragment_id: distribution.fragment_id as i32,
59            table_id: distribution.table_id as i32,
60            distribution_type: distribution.distribution_type().as_str_name().into(),
61            state_table_ids: distribution
62                .state_table_ids
63                .into_iter()
64                .map(|id| id as i32)
65                .collect(),
66            upstream_fragment_ids: distribution
67                .upstream_fragment_ids
68                .into_iter()
69                .map(|id| id as i32)
70                .collect(),
71            flags: extract_fragment_type_flag(distribution.fragment_type_mask)
72                .into_iter()
73                .map(|t| t.as_str_name().to_owned())
74                .collect(),
75            parallelism: distribution.parallelism as i32,
76            max_parallelism: distribution.vnode_count as i32,
77            node: json!(distribution.node).into(),
78        })
79        .collect())
80}
81
82#[cfg(test)]
83mod tests {
84    use risingwave_common::catalog::FragmentTypeFlag;
85
86    use super::extract_fragment_type_flag;
87
88    #[test]
89    fn test_extract_mask() {
90        let mask = (FragmentTypeFlag::Source as u32) | (FragmentTypeFlag::StreamScan as u32);
91        let result = extract_fragment_type_flag(mask);
92        assert_eq!(result.len(), 2);
93        assert!(result.contains(&FragmentTypeFlag::Source));
94        assert!(result.contains(&FragmentTypeFlag::StreamScan))
95    }
96}