risingwave_common/catalog/
internal_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::type_name;
16use std::fmt::Debug;
17
18use anyhow::anyhow;
19use itertools::Itertools;
20use risingwave_pb::id::{FragmentId, TableId};
21
/// The literal function name `rw_table`.
///
/// NOTE(review): judging by the constant's name, this is the SQL-level function used to
/// refer to internal tables; its usages are outside this file, so confirm against callers.
pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
23
24pub fn generate_internal_table_name_with_type(
25    job_name: &str,
26    fragment_id: FragmentId,
27    table_id: TableId,
28    table_type: &str,
29) -> String {
30    format!(
31        "__internal_{}_{}_{}_{}",
32        job_name,
33        fragment_id,
34        table_type.to_lowercase(),
35        table_id
36    )
37}
38
/// Returns `true` when the second-to-last `_`-separated segment of `table_name` is
/// exactly `streamscan`.
///
/// In names built by `generate_internal_table_name_with_type`, that position holds the
/// lowercased table type. Names with fewer than two segments (including the empty string)
/// yield `false`.
pub fn is_backfill_table(table_name: &str) -> bool {
    // Walk the segments from the right so we can stop at the second one instead of
    // collecting every segment into a `Vec`.
    table_name.rsplit('_').nth(1) == Some("streamscan")
}
44
/// Returns `true` when the second-to-last `_`-separated segment of `table_name` is
/// exactly `sourcebackfill`.
///
/// In names built by `generate_internal_table_name_with_type`, that position holds the
/// lowercased table type. Names with fewer than two segments (including the empty string)
/// yield `false`.
pub fn is_source_backfill_table(table_name: &str) -> bool {
    // Walk the segments from the right so we can stop at the second one instead of
    // collecting every segment into a `Vec`.
    table_name.rsplit('_').nth(1) == Some("sourcebackfill")
}
50
51pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
52    dist_key_indices: &[I],
53    pk_indices: &[I],
54) -> anyhow::Result<Vec<O>> {
55    dist_key_indices
56        .iter()
57        .map(|&di| {
58            pk_indices
59                .iter()
60                .position(|&pi| di == pi)
61                .ok_or_else(|| {
62                    anyhow!(
63                        "distribution key {:?} must be a subset of primary key {:?}",
64                        dist_key_indices,
65                        pk_indices
66                    )
67                })
68                .map(|idx| match O::try_from(idx) {
69                    Ok(idx) => idx,
70                    Err(_) => unreachable!("failed to cast {} to {}", idx, type_name::<O>()),
71                })
72        })
73        .try_collect()
74}
75
/// Returns the position in the primary key at which the distribution key starts.
///
/// `dist_key_in_pk_indices` may be given in any order, so the start index is its minimum
/// value.
///
/// Returns `None` if `dist_key_in_pk_indices` is empty, or if its values do not form a
/// contiguous run (after sorting, each value must be exactly one greater than the previous
/// one, so duplicates also yield `None`).
pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
    let mut sorted_indices = dist_key_in_pk_indices.to_vec();
    sorted_indices.sort_unstable();

    // An empty distribution key has no start index.
    let start = *sorted_indices.first()?;

    // The slice is sorted ascending, so `pair[1] - pair[0]` cannot underflow, and unlike
    // `prev + 1` it cannot overflow when a value equals `usize::MAX`.
    let is_contiguous = sorted_indices
        .windows(2)
        .all(|pair| pair[1] - pair[0] == 1);

    is_contiguous.then_some(start)
}