risingwave_common/catalog/
internal_table.rs1use std::any::type_name;
16use std::fmt::Debug;
17
18use anyhow::anyhow;
19use itertools::Itertools;
20use risingwave_pb::id::{FragmentId, TableId};
21
22pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
23
24pub fn generate_internal_table_name_with_type(
25 job_name: &str,
26 fragment_id: FragmentId,
27 table_id: TableId,
28 table_type: &str,
29) -> String {
30 format!(
31 "__internal_{}_{}_{}_{}",
32 job_name,
33 fragment_id,
34 table_type.to_lowercase(),
35 table_id
36 )
37}
38
/// Returns `true` when the second-to-last `_`-separated segment of `table_name` is exactly
/// `"streamscan"`.
///
/// Names with fewer than two segments (including the empty string) yield `false`.
pub fn is_backfill_table(table_name: &str) -> bool {
    // Walking the segments from the right, index 1 is the second-to-last one.
    table_name.rsplit('_').nth(1) == Some("streamscan")
}
44
/// Returns `true` when the second-to-last `_`-separated segment of `table_name` is exactly
/// `"sourcebackfill"`.
///
/// Names with fewer than two segments (including the empty string) yield `false`.
pub fn is_source_backfill_table(table_name: &str) -> bool {
    // `rsplit` yields segments right-to-left, so `nth(1)` is the one before the last.
    matches!(table_name.rsplit('_').nth(1), Some("sourcebackfill"))
}
50
51pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
52 dist_key_indices: &[I],
53 pk_indices: &[I],
54) -> anyhow::Result<Vec<O>> {
55 dist_key_indices
56 .iter()
57 .map(|&di| {
58 pk_indices
59 .iter()
60 .position(|&pi| di == pi)
61 .ok_or_else(|| {
62 anyhow!(
63 "distribution key {:?} must be a subset of primary key {:?}",
64 dist_key_indices,
65 pk_indices
66 )
67 })
68 .map(|idx| match O::try_from(idx) {
69 Ok(idx) => idx,
70 Err(_) => unreachable!("failed to cast {} to {}", idx, type_name::<O>()),
71 })
72 })
73 .try_collect()
74}
75
/// Returns the smallest entry of `dist_key_in_pk_indices` if, once sorted, the entries form a
/// run of consecutive integers; returns `None` otherwise.
///
/// The input may be in any order. An empty slice, a gap between entries, or a duplicated
/// entry all result in `None`.
pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
    let mut ordered = dist_key_in_pk_indices.to_vec();
    ordered.sort_unstable();

    // Each neighbouring pair must differ by exactly one for the run to be contiguous.
    let contiguous = ordered.windows(2).all(|pair| pair[1] == pair[0] + 1);
    if contiguous {
        // After sorting, the first element is the minimum; `None` for an empty input.
        ordered.first().copied()
    } else {
        None
    }
}