risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_fragments.rs1use risingwave_common::catalog::FragmentTypeFlag;
16use risingwave_common::types::{Fields, JsonbVal};
17use risingwave_frontend_macro::system_catalog;
18use serde_json::json;
19
20use crate::catalog::system_catalog::SysCatalogReaderImpl;
21use crate::error::Result;
22
// Row type of the `rw_catalog.rw_fragments` system table. One value is built per
// fragment distribution in `read_rw_fragment`.
#[derive(Fields)]
struct RwFragment {
    // Fragment identifier; marked as the primary key of the row.
    #[primary_key]
    fragment_id: i32,
    // Filled from the distribution's `table_id`.
    table_id: i32,
    // String name of the distribution type enum value (`as_str_name`).
    distribution_type: String,
    // Filled from the distribution's `state_table_ids`, converted to `i32`.
    state_table_ids: Vec<i32>,
    // Filled from the distribution's `upstream_fragment_ids`, converted to `i32`.
    upstream_fragment_ids: Vec<i32>,
    // String names of the flags decoded from the distribution's `fragment_type_mask`.
    flags: Vec<String>,
    // Filled from the distribution's `parallelism`, cast to `i32`.
    parallelism: i32,
    // Filled from the distribution's `vnode_count`, cast to `i32`.
    max_parallelism: i32,
    // Filled verbatim from the distribution's `parallelism_policy` string.
    parallelism_policy: String,
    // The distribution's `node` value converted to JSON via `json!`.
    node: JsonbVal,
}
37
38pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
39 let mut result = vec![];
40 for i in 0..32 {
41 let bit = 1 << i;
42 if mask & bit != 0 {
43 match FragmentTypeFlag::try_from(bit) {
44 Err(_) => continue,
45 Ok(flag) => result.push(flag),
46 };
47 }
48 }
49 result
50}
51
52#[system_catalog(table, "rw_catalog.rw_fragments")]
53async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragment>> {
54 let distributions = reader.meta_client.list_fragment_distribution().await?;
55
56 Ok(distributions
57 .into_iter()
58 .map(|distribution| RwFragment {
59 fragment_id: distribution.fragment_id.as_i32_id(),
60 table_id: distribution.table_id.as_i32_id(),
61 distribution_type: distribution.distribution_type().as_str_name().into(),
62 state_table_ids: distribution
63 .state_table_ids
64 .into_iter()
65 .map(|id| id.as_i32_id())
66 .collect(),
67 upstream_fragment_ids: distribution
68 .upstream_fragment_ids
69 .into_iter()
70 .map(|id| id.as_i32_id())
71 .collect(),
72 flags: extract_fragment_type_flag(distribution.fragment_type_mask)
73 .into_iter()
74 .map(|t| t.as_str_name().to_owned())
75 .collect(),
76 parallelism: distribution.parallelism as i32,
77 max_parallelism: distribution.vnode_count as i32,
78 parallelism_policy: distribution.parallelism_policy,
79 node: json!(distribution.node).into(),
80 })
81 .collect())
82}
83
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::FragmentTypeFlag;

    use super::extract_fragment_type_flag;

    // A mask with exactly two bits set must decode to exactly those two flags.
    #[test]
    fn test_extract_mask() {
        let source_bit = FragmentTypeFlag::Source as u32;
        let stream_scan_bit = FragmentTypeFlag::StreamScan as u32;

        let flags = extract_fragment_type_flag(source_bit | stream_scan_bit);

        assert_eq!(flags.len(), 2);
        for expected in [FragmentTypeFlag::Source, FragmentTypeFlag::StreamScan] {
            assert!(flags.contains(&expected));
        }
    }
}