risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_fragments.rs1use risingwave_common::types::{Fields, JsonbVal};
16use risingwave_frontend_macro::system_catalog;
17use risingwave_pb::stream_plan::FragmentTypeFlag;
18use serde_json::json;
19
20use crate::catalog::system_catalog::SysCatalogReaderImpl;
21use crate::error::Result;
22
23#[derive(Fields)]
24struct RwFragment {
25 #[primary_key]
26 fragment_id: i32,
27 table_id: i32,
28 distribution_type: String,
29 state_table_ids: Vec<i32>,
30 upstream_fragment_ids: Vec<i32>,
31 flags: Vec<String>,
32 parallelism: i32,
33 max_parallelism: i32,
34 node: JsonbVal,
35}
36
37pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
38 let mut result = vec![];
39 for i in 0..32 {
40 let bit = 1 << i;
41 if mask & bit != 0 {
42 match FragmentTypeFlag::try_from(bit as i32) {
43 Err(_) => continue,
44 Ok(flag) => result.push(flag),
45 };
46 }
47 }
48 result
49}
50
51#[system_catalog(table, "rw_catalog.rw_fragments")]
52async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragment>> {
53 let distributions = reader.meta_client.list_fragment_distribution().await?;
54
55 Ok(distributions
56 .into_iter()
57 .map(|distribution| RwFragment {
58 fragment_id: distribution.fragment_id as i32,
59 table_id: distribution.table_id as i32,
60 distribution_type: distribution.distribution_type().as_str_name().into(),
61 state_table_ids: distribution
62 .state_table_ids
63 .into_iter()
64 .map(|id| id as i32)
65 .collect(),
66 upstream_fragment_ids: distribution
67 .upstream_fragment_ids
68 .into_iter()
69 .map(|id| id as i32)
70 .collect(),
71 flags: extract_fragment_type_flag(distribution.fragment_type_mask)
72 .into_iter()
73 .flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_"))
74 .map(|s| s.into())
75 .collect(),
76 parallelism: distribution.parallelism as i32,
77 max_parallelism: distribution.vnode_count as i32,
78 node: json!(distribution.node).into(),
79 })
80 .collect())
81}
82
83#[cfg(test)]
84mod tests {
85 use risingwave_pb::stream_plan::FragmentTypeFlag;
86
87 use super::extract_fragment_type_flag;
88
89 #[test]
90 fn test_extract_mask() {
91 let mask = (FragmentTypeFlag::Source as u32) | (FragmentTypeFlag::StreamScan as u32);
92 let result = extract_fragment_type_flag(mask);
93 assert_eq!(result.len(), 2);
94 assert!(result.contains(&FragmentTypeFlag::Source));
95 assert!(result.contains(&FragmentTypeFlag::StreamScan))
96 }
97}