risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_fragments.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::FragmentTypeFlag;
16use risingwave_common::types::{Fields, JsonbVal};
17use risingwave_frontend_macro::system_catalog;
18use serde_json::json;
19
20use crate::catalog::system_catalog::SysCatalogReaderImpl;
21use crate::error::Result;
22
/// One row of the `rw_catalog.rw_fragments` system table.
///
/// Rows are built by `read_rw_fragment` from the fragment distributions
/// returned by the meta client.
#[derive(Fields)]
struct RwFragment {
    /// Identifier of the fragment; primary key of the table.
    #[primary_key]
    fragment_id: i32,
    /// Taken from the distribution's `table_id`.
    table_id: i32,
    /// String name of the distribution type reported for the fragment.
    distribution_type: String,
    /// Taken from the distribution's `state_table_ids`.
    state_table_ids: Vec<i32>,
    /// Taken from the distribution's `upstream_fragment_ids`.
    upstream_fragment_ids: Vec<i32>,
    /// Names of the `FragmentTypeFlag`s set in the fragment's type mask.
    flags: Vec<String>,
    /// Taken from the distribution's `parallelism`, cast to `i32`.
    parallelism: i32,
    /// Taken from the distribution's `vnode_count`, cast to `i32`.
    max_parallelism: i32,
    /// Taken verbatim from the distribution's `parallelism_policy`.
    parallelism_policy: String,
    /// JSON rendering of the distribution's `node`.
    node: JsonbVal,
}
37
38pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
39    let mut result = vec![];
40    for i in 0..32 {
41        let bit = 1 << i;
42        if mask & bit != 0 {
43            match FragmentTypeFlag::try_from(bit) {
44                Err(_) => continue,
45                Ok(flag) => result.push(flag),
46            };
47        }
48    }
49    result
50}
51
52#[system_catalog(table, "rw_catalog.rw_fragments")]
53async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragment>> {
54    let distributions = reader.meta_client.list_fragment_distribution().await?;
55
56    Ok(distributions
57        .into_iter()
58        .map(|distribution| RwFragment {
59            fragment_id: distribution.fragment_id.as_i32_id(),
60            table_id: distribution.table_id.as_i32_id(),
61            distribution_type: distribution.distribution_type().as_str_name().into(),
62            state_table_ids: distribution
63                .state_table_ids
64                .into_iter()
65                .map(|id| id.as_i32_id())
66                .collect(),
67            upstream_fragment_ids: distribution
68                .upstream_fragment_ids
69                .into_iter()
70                .map(|id| id.as_i32_id())
71                .collect(),
72            flags: extract_fragment_type_flag(distribution.fragment_type_mask)
73                .into_iter()
74                .map(|t| t.as_str_name().to_owned())
75                .collect(),
76            parallelism: distribution.parallelism as i32,
77            max_parallelism: distribution.vnode_count as i32,
78            parallelism_policy: distribution.parallelism_policy,
79            node: json!(distribution.node).into(),
80        })
81        .collect())
82}
83
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::FragmentTypeFlag;

    use super::extract_fragment_type_flag;

    /// A mask built from two flags must decode to exactly those two flags.
    #[test]
    fn test_extract_mask() {
        let expected = [FragmentTypeFlag::Source, FragmentTypeFlag::StreamScan];
        let mask = (FragmentTypeFlag::Source as u32) | (FragmentTypeFlag::StreamScan as u32);

        let decoded = extract_fragment_type_flag(mask);

        assert_eq!(decoded.len(), 2);
        for flag in &expected {
            assert!(decoded.contains(flag));
        }
    }
}