risingwave_common/catalog/
internal_table.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::type_name;
use std::fmt::Debug;
use std::sync::LazyLock;

use anyhow::anyhow;
use itertools::Itertools;
use regex::Regex;

/// Function name `rw_table`.
///
/// NOTE(review): presumably the name of the table function used to reference internal
/// tables — confirm against the call sites, which are not visible in this file.
pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";

/// Builds the name of an internal table.
///
/// The result has the shape
/// `__internal_{job_name}_{fragment_id}_{table_type}_{table_id}`, where `table_type` is
/// converted to lowercase and the remaining parts are inserted as given.
pub fn generate_internal_table_name_with_type(
    job_name: &str,
    fragment_id: u32,
    table_id: u32,
    table_type: &str,
) -> String {
    // Only the type component is normalized; the job name is kept verbatim.
    let table_type = table_type.to_lowercase();
    format!("__internal_{job_name}_{fragment_id}_{table_type}_{table_id}")
}

/// Returns `true` when `table_name` does not look like an internal table name.
///
/// A name is rejected when any part of it matches the pattern `__internal_.*_\d+`; the
/// pattern is not anchored, so the match may occur anywhere inside the name.
pub fn valid_table_name(table_name: &str) -> bool {
    // Compiled once on first use and shared by all subsequent calls.
    static INTERNAL_TABLE_PATTERN: LazyLock<Regex> = LazyLock::new(|| {
        let pattern = r"__internal_.*_\d+";
        Regex::new(pattern).unwrap()
    });

    let looks_internal = INTERNAL_TABLE_PATTERN.is_match(table_name);
    !looks_internal
}

/// Maps every distribution key index to its position inside `pk_indices`.
///
/// For each element of `dist_key_indices`, the position of its first occurrence in
/// `pk_indices` is looked up and converted to `O`. The output preserves the order of
/// `dist_key_indices`.
///
/// # Errors
///
/// Returns an error if any distribution key is not present in `pk_indices`.
///
/// # Panics
///
/// Panics if a found position cannot be converted from `usize` into `O`.
pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
    dist_key_indices: &[I],
    pk_indices: &[I],
) -> anyhow::Result<Vec<O>> {
    // Resolves a single distribution key to its converted position in the primary key.
    let locate_in_pk = |dist_key: I| -> anyhow::Result<O> {
        let Some(position) = pk_indices.iter().position(|&pk| dist_key == pk) else {
            return Err(anyhow!(
                "distribution key {:?} must be a subset of primary key {:?}",
                dist_key_indices,
                pk_indices
            ));
        };
        // A failed conversion is treated as an invariant violation, not a recoverable error.
        let Ok(converted) = O::try_from(position) else {
            unreachable!("failed to cast {} to {}", position, type_name::<O>())
        };
        Ok(converted)
    };

    dist_key_indices.iter().copied().map(locate_in_pk).try_collect()
}

/// Returns the start index of the distribution key within the primary key.
///
/// `dist_key_in_pk_indices` may be given in any order, so the start index is the minimum
/// value of the slice.
///
/// Returns `None` when the slice is empty, or when its values do not form a contiguous run
/// once sorted (i.e. there is a gap or a duplicate).
pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
    // Sort a private copy so that contiguity can be checked on adjacent pairs.
    let mut sorted_indices = dist_key_in_pk_indices.to_vec();
    sorted_indices.sort_unstable();

    // `checked_add` keeps the comparison well-defined even at `usize::MAX`, where a plain
    // `+ 1` would panic in debug builds and wrap around in release builds.
    let is_contiguous = sorted_indices
        .windows(2)
        .all(|pair| pair[0].checked_add(1) == Some(pair[1]));
    if !is_contiguous {
        return None;
    }

    // After sorting, the first element (if any) is the minimum; empty input yields `None`.
    sorted_indices.first().copied()
}