risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_fragments.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{Fields, JsonbVal};
16use risingwave_frontend_macro::system_catalog;
17use risingwave_pb::stream_plan::FragmentTypeFlag;
18use serde_json::json;
19
20use crate::catalog::system_catalog::SysCatalogReaderImpl;
21use crate::error::Result;
22
/// One row of the `rw_catalog.rw_fragments` system table.
///
/// Rows are produced by `read_rw_fragment` from the fragment distributions
/// returned by the meta client. The field order below is the field order of the
/// derived `Fields` implementation, so do not reorder it casually.
#[derive(Fields)]
struct RwFragment {
    /// Fragment id (primary key), cast from the distribution's `fragment_id`.
    #[primary_key]
    fragment_id: i32,
    /// Table id reported for this fragment, cast from the distribution's `table_id`.
    table_id: i32,
    /// String name of the fragment's distribution type enum value.
    distribution_type: String,
    /// Ids of the state tables listed for this fragment.
    state_table_ids: Vec<i32>,
    /// Ids of the fragments listed as upstream of this fragment.
    upstream_fragment_ids: Vec<i32>,
    /// Names of the fragment type flags set in the fragment's type mask, with the
    /// `FRAGMENT_TYPE_FLAG_` prefix stripped.
    flags: Vec<String>,
    /// Parallelism reported for the fragment.
    parallelism: i32,
    /// Filled from the distribution's `vnode_count`.
    max_parallelism: i32,
    /// JSON rendering of the distribution's `node` field.
    node: JsonbVal,
}
36
37pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
38    let mut result = vec![];
39    for i in 0..32 {
40        let bit = 1 << i;
41        if mask & bit != 0 {
42            match FragmentTypeFlag::try_from(bit as i32) {
43                Err(_) => continue,
44                Ok(flag) => result.push(flag),
45            };
46        }
47    }
48    result
49}
50
51#[system_catalog(table, "rw_catalog.rw_fragments")]
52async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragment>> {
53    let distributions = reader.meta_client.list_fragment_distribution().await?;
54
55    Ok(distributions
56        .into_iter()
57        .map(|distribution| RwFragment {
58            fragment_id: distribution.fragment_id as i32,
59            table_id: distribution.table_id as i32,
60            distribution_type: distribution.distribution_type().as_str_name().into(),
61            state_table_ids: distribution
62                .state_table_ids
63                .into_iter()
64                .map(|id| id as i32)
65                .collect(),
66            upstream_fragment_ids: distribution
67                .upstream_fragment_ids
68                .into_iter()
69                .map(|id| id as i32)
70                .collect(),
71            flags: extract_fragment_type_flag(distribution.fragment_type_mask)
72                .into_iter()
73                .flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_"))
74                .map(|s| s.into())
75                .collect(),
76            parallelism: distribution.parallelism as i32,
77            max_parallelism: distribution.vnode_count as i32,
78            node: json!(distribution.node).into(),
79        })
80        .collect())
81}
82
#[cfg(test)]
mod tests {
    use risingwave_pb::stream_plan::FragmentTypeFlag;

    use super::extract_fragment_type_flag;

    /// A mask built from two flags must decode to exactly those two flags.
    #[test]
    fn test_extract_mask() {
        let expected = [FragmentTypeFlag::Source, FragmentTypeFlag::StreamScan];
        let mask = expected
            .iter()
            .fold(0u32, |acc, flag| acc | (*flag as u32));

        let decoded = extract_fragment_type_flag(mask);

        assert_eq!(decoded.len(), expected.len());
        for flag in &expected {
            assert!(decoded.contains(flag));
        }
    }
}