risingwave_common/catalog/
internal_table.rs1use std::any::type_name;
16use std::fmt::Debug;
17
18use anyhow::anyhow;
19use itertools::Itertools;
20use risingwave_pb::id::{FragmentId, TableId};
21
/// The function name `rw_table`.
///
/// NOTE(review): judging by the constant's name this is the function used to reference
/// internal tables; its call sites are outside this file — confirm before relying on it.
pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
23
24pub fn generate_internal_table_name_with_type(
25 job_name: &str,
26 fragment_id: FragmentId,
27 table_id: TableId,
28 table_type: &str,
29) -> String {
30 format!(
31 "__internal_{}_{}_{}_{}",
32 job_name,
33 fragment_id,
34 table_type.to_lowercase(),
35 table_id
36 )
37}
38
/// Returns `true` when the second-to-last `_`-separated segment of `table_name` is
/// `streamscan` or `localityproviderprogress`.
///
/// For names built by `generate_internal_table_name_with_type` that segment is the
/// lowercased table type. Names with fewer than two segments yield `false`. The
/// `__internal_` prefix itself is not checked.
pub fn is_backfill_table(table_name: &str) -> bool {
    // Walk the segments from the right so only the one we need is materialised,
    // instead of collecting every segment into a `Vec`.
    matches!(
        table_name.rsplit('_').nth(1),
        Some("streamscan" | "localityproviderprogress")
    )
}
48
/// Returns `true` when the second-to-last `_`-separated segment of `table_name` is
/// exactly `sourcebackfill`.
///
/// Names with fewer than two segments yield `false`. The `__internal_` prefix itself is
/// not checked.
pub fn is_source_backfill_table(table_name: &str) -> bool {
    // `rsplit` yields segments right-to-left, so `nth(1)` is the second-to-last one;
    // no intermediate `Vec` is needed.
    table_name.rsplit('_').nth(1) == Some("sourcebackfill")
}
54
55pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
56 dist_key_indices: &[I],
57 pk_indices: &[I],
58) -> anyhow::Result<Vec<O>> {
59 dist_key_indices
60 .iter()
61 .map(|&di| {
62 pk_indices
63 .iter()
64 .position(|&pi| di == pi)
65 .ok_or_else(|| {
66 anyhow!(
67 "distribution key {:?} must be a subset of primary key {:?}",
68 dist_key_indices,
69 pk_indices
70 )
71 })
72 .map(|idx| match O::try_from(idx) {
73 Ok(idx) => idx,
74 Err(_) => unreachable!("failed to cast {} to {}", idx, type_name::<O>()),
75 })
76 })
77 .try_collect()
78}
79
/// Returns the smallest index in `dist_key_in_pk_indices` when the indices, once sorted,
/// form a run of consecutive integers.
///
/// The input may be in any order. Returns `None` when the input is empty or when the
/// sorted indices are not consecutive, which includes any duplicate.
pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
    let mut sorted = dist_key_in_pk_indices.to_vec();
    sorted.sort_unstable();

    // `checked_add` keeps the comparison well-defined when an index equals `usize::MAX`;
    // a plain `+ 1` would overflow there and panic in debug builds.
    let contiguous = sorted
        .windows(2)
        .all(|pair| pair[0].checked_add(1) == Some(pair[1]));

    if contiguous {
        // `first()` is `None` for an empty input, which is the required result.
        sorted.first().copied()
    } else {
        None
    }
}