risingwave_common/catalog/
internal_table.rs1use std::any::type_name;
16use std::fmt::Debug;
17
18use anyhow::anyhow;
19use itertools::Itertools;
20
21pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
22
23pub fn generate_internal_table_name_with_type(
24 job_name: &str,
25 fragment_id: u32,
26 table_id: u32,
27 table_type: &str,
28) -> String {
29 format!(
30 "__internal_{}_{}_{}_{}",
31 job_name,
32 fragment_id,
33 table_type.to_lowercase(),
34 table_id
35 )
36}
37
38pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
39 dist_key_indices: &[I],
40 pk_indices: &[I],
41) -> anyhow::Result<Vec<O>> {
42 dist_key_indices
43 .iter()
44 .map(|&di| {
45 pk_indices
46 .iter()
47 .position(|&pi| di == pi)
48 .ok_or_else(|| {
49 anyhow!(
50 "distribution key {:?} must be a subset of primary key {:?}",
51 dist_key_indices,
52 pk_indices
53 )
54 })
55 .map(|idx| match O::try_from(idx) {
56 Ok(idx) => idx,
57 Err(_) => unreachable!("failed to cast {} to {}", idx, type_name::<O>()),
58 })
59 })
60 .try_collect()
61}
62
63pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
68 let mut sorted_dist_key = dist_key_in_pk_indices.iter().sorted();
69 if let Some(min_idx) = sorted_dist_key.next() {
70 let mut prev_idx = min_idx;
71 for idx in sorted_dist_key {
72 if *idx != prev_idx + 1 {
73 return None;
74 }
75 prev_idx = idx;
76 }
77 Some(*min_idx)
78 } else {
79 None
80 }
81}