risingwave_common/catalog/
internal_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::type_name;
16use std::fmt::Debug;
17
18use anyhow::anyhow;
19use itertools::Itertools;
20
/// The literal name `"rw_table"`.
///
/// NOTE(review): judging by the constant's name this is the SQL-visible name of the
/// function used to reference internal tables; its use sites are outside this file —
/// confirm before relying on that.
21pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
22
/// Builds the name of an internal table.
///
/// The result has the shape
/// `__internal_<job_name>_<fragment_id>_<table_type>_<table_id>`, where
/// `table_type` is converted to lowercase and every other component is
/// inserted verbatim.
pub fn generate_internal_table_name_with_type(
    job_name: &str,
    fragment_id: u32,
    table_id: u32,
    table_type: &str,
) -> String {
    // Only the table type is normalized; the job name keeps its original casing.
    let lowered_type = table_type.to_lowercase();
    format!("__internal_{job_name}_{fragment_id}_{lowered_type}_{table_id}")
}
37
/// Returns `true` when the second-to-last `_`-separated segment of `table_name`
/// is exactly `"streamscan"`.
///
/// A name with fewer than two segments (i.e. one that contains no `_`) never
/// matches. The comparison is case-sensitive.
pub fn is_backfill_table(table_name: &str) -> bool {
    // Walking the segments from the right, position 0 is the last segment and
    // position 1 is the second-to-last one.
    matches!(table_name.rsplit('_').nth(1), Some("streamscan"))
}
43
44pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
45    dist_key_indices: &[I],
46    pk_indices: &[I],
47) -> anyhow::Result<Vec<O>> {
48    dist_key_indices
49        .iter()
50        .map(|&di| {
51            pk_indices
52                .iter()
53                .position(|&pi| di == pi)
54                .ok_or_else(|| {
55                    anyhow!(
56                        "distribution key {:?} must be a subset of primary key {:?}",
57                        dist_key_indices,
58                        pk_indices
59                    )
60                })
61                .map(|idx| match O::try_from(idx) {
62                    Ok(idx) => idx,
63                    Err(_) => unreachable!("failed to cast {} to {}", idx, type_name::<O>()),
64                })
65        })
66        .try_collect()
67}
68
/// Returns the position in the primary key at which the distribution key starts.
///
/// Returns `None` if `dist_key_in_pk_indices` is empty, or if its values do not
/// form a continuous run (each value exactly one greater than the previous one
/// after sorting; duplicates therefore also yield `None`).
///
/// Note that `dist_key_in_pk_indices` may be shuffled, so the start index is the
/// minimum value rather than the first element.
pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
    // Work on a sorted copy so that shuffled input is handled.
    let mut sorted_indices = dist_key_in_pk_indices.to_vec();
    sorted_indices.sort_unstable();

    // Empty input has no start index.
    let start = *sorted_indices.first()?;

    // `checked_add` keeps a value of `usize::MAX` from overflowing; such a value
    // can never have a successor, so the run is simply not continuous.
    let is_continuous = sorted_indices
        .windows(2)
        .all(|pair| pair[0].checked_add(1) == Some(pair[1]));

    is_continuous.then_some(start)
}