risingwave_common/catalog/
internal_table.rs1use std::any::type_name;
16use std::fmt::Debug;
17
18use anyhow::anyhow;
19use itertools::Itertools;
20
/// Function name associated with internal tables: `rw_table`.
// NOTE(review): the call sites that register or match this name are outside
// this file — confirm usage before renaming or changing the value.
pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
22
/// Builds the name of an internal table.
///
/// The result has the shape
/// `__internal_{job_name}_{fragment_id}_{table_type}_{table_id}`, where
/// `table_type` is lower-cased before being embedded. `job_name` is inserted
/// verbatim, so any underscores it contains end up in the generated name.
pub fn generate_internal_table_name_with_type(
    job_name: &str,
    fragment_id: u32,
    table_id: u32,
    table_type: &str,
) -> String {
    // Only the type segment is normalised; the other parts are used as given.
    let kind = table_type.to_lowercase();
    format!("__internal_{job_name}_{fragment_id}_{kind}_{table_id}")
}
37
/// Returns `true` when the second-to-last `_`-separated segment of
/// `table_name` is exactly `"streamscan"`.
///
/// That is the position the table type occupies in names produced by
/// [`generate_internal_table_name_with_type`]. The comparison is
/// case-sensitive, and names with fewer than two segments never match.
pub fn is_backfill_table(table_name: &str) -> bool {
    // Walking from the end, index 0 is the last segment and index 1 the one
    // before it; `nth` yields `None` when there is no such segment.
    table_name.rsplit('_').nth(1) == Some("streamscan")
}
43
/// Returns `true` when the second-to-last `_`-separated segment of
/// `table_name` is exactly `"sourcebackfill"`.
///
/// That is the position the table type occupies in names produced by
/// [`generate_internal_table_name_with_type`]. The comparison is
/// case-sensitive, and names with fewer than two segments never match.
pub fn is_source_backfill_table(table_name: &str) -> bool {
    // Walking from the end, index 0 is the last segment and index 1 the one
    // before it; `nth` yields `None` when there is no such segment.
    table_name.rsplit('_').nth(1) == Some("sourcebackfill")
}
49
50pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
51 dist_key_indices: &[I],
52 pk_indices: &[I],
53) -> anyhow::Result<Vec<O>> {
54 dist_key_indices
55 .iter()
56 .map(|&di| {
57 pk_indices
58 .iter()
59 .position(|&pi| di == pi)
60 .ok_or_else(|| {
61 anyhow!(
62 "distribution key {:?} must be a subset of primary key {:?}",
63 dist_key_indices,
64 pk_indices
65 )
66 })
67 .map(|idx| match O::try_from(idx) {
68 Ok(idx) => idx,
69 Err(_) => unreachable!("failed to cast {} to {}", idx, type_name::<O>()),
70 })
71 })
72 .try_collect()
73}
74
/// Returns the smallest index in `dist_key_in_pk_indices` when the indices
/// form one contiguous run, and `None` otherwise.
///
/// The input may be in any order; it is sorted internally before the check.
/// `None` is returned when the slice is empty, or when the sorted values are
/// not strictly consecutive (a gap or a duplicate breaks the run).
pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
    let mut ordered = dist_key_in_pk_indices.to_vec();
    ordered.sort_unstable();

    // Empty input has no start index.
    let start = *ordered.first()?;

    // `checked_add` keeps the successor computation from overflowing when an
    // index equals `usize::MAX`; such a value simply has no valid successor.
    let contiguous = ordered
        .windows(2)
        .all(|pair| pair[0].checked_add(1) == Some(pair[1]));

    contiguous.then_some(start)
}