risingwave_common/catalog/
internal_table.rs1use std::any::type_name;
16use std::fmt::Debug;
17
18use anyhow::anyhow;
19use itertools::Itertools;
20
/// The string `"rw_table"`, exposed as a shared constant.
///
/// NOTE(review): judging by the identifier, this is presumably the name of the
/// table function used to read internal tables; no use site is visible in this
/// file, so confirm against the callers.
21pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
22
/// Builds the name of an internal table.
///
/// The result has the shape
/// `__internal_{job_name}_{fragment_id}_{table_type}_{table_id}`, where
/// `table_type` is converted to lowercase before being embedded. The other
/// components are inserted unchanged.
pub fn generate_internal_table_name_with_type(
    job_name: &str,
    fragment_id: u32,
    table_id: u32,
    table_type: &str,
) -> String {
    // Only the type component is case-normalized; `job_name` keeps its casing.
    let normalized_type = table_type.to_lowercase();
    format!(
        "__internal_{}_{}_{}_{}",
        job_name, fragment_id, normalized_type, table_id
    )
}
37
/// Returns `true` when the second-to-last `_`-separated segment of
/// `table_name` is exactly `streamscan`.
///
/// In names produced by [`generate_internal_table_name_with_type`] that
/// position holds the lowercased table type. Names with fewer than two
/// segments yield `false`.
pub fn is_backfill_table(table_name: &str) -> bool {
    // Walking the segments from the right, index 1 is the second-to-last one;
    // `nth` returns `None` if the name has no such segment.
    table_name.rsplit('_').nth(1) == Some("streamscan")
}
43
44pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
45 dist_key_indices: &[I],
46 pk_indices: &[I],
47) -> anyhow::Result<Vec<O>> {
48 dist_key_indices
49 .iter()
50 .map(|&di| {
51 pk_indices
52 .iter()
53 .position(|&pi| di == pi)
54 .ok_or_else(|| {
55 anyhow!(
56 "distribution key {:?} must be a subset of primary key {:?}",
57 dist_key_indices,
58 pk_indices
59 )
60 })
61 .map(|idx| match O::try_from(idx) {
62 Ok(idx) => idx,
63 Err(_) => unreachable!("failed to cast {} to {}", idx, type_name::<O>()),
64 })
65 })
66 .try_collect()
67}
68
/// Returns the smallest index in `dist_key_in_pk_indices` when the indices,
/// once sorted, form a run of consecutive integers (each exactly one greater
/// than the previous).
///
/// Returns `None` if the slice is empty, or if the sorted indices contain a
/// gap or a duplicate.
pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
    // Work on a sorted copy so the caller's slice is left untouched.
    let mut ordered = dist_key_in_pk_indices.to_vec();
    ordered.sort_unstable();

    // Every adjacent pair must step by exactly one; this holds trivially for
    // zero or one element.
    let is_contiguous = ordered.windows(2).all(|pair| pair[1] == pair[0] + 1);

    if is_contiguous {
        ordered.first().copied()
    } else {
        None
    }
}