risingwave_frontend/catalog/
index_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{Field, IndexId, Schema};
22use risingwave_common::util::epoch::Epoch;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus};
25
26use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, TableCatalog};
27use crate::expr::{Expr, ExprDisplay, ExprImpl, ExprRewriter as _, FunctionCall};
28use crate::user::UserId;
29
/// Frontend catalog entry describing a secondary index: the index itself, the table that backs
/// it, and the primary table it is defined on.
///
/// Equality and hashing are derived through `educe`; `function_mapping` is excluded from both
/// (see the note on that field).
#[derive(Clone, Debug, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct IndexCatalog {
    /// Identifier of the index.
    pub id: IndexId,

    /// Name of the index.
    pub name: String,

    /// Only `InputRef` and `FunctionCall` index items are supported for now.
    /// The index of an `InputRef` is the column index in the primary table.
    /// The number of `index_item`s is equal to the number of columns of the index table.
    /// The input args of a `FunctionCall` also refer to column indices of the primary table.
    pub index_item: Vec<ExprImpl>,

    /// The properties of the index columns.
    /// <https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-INDEX-COLUMN-PROPS>
    pub index_column_properties: Vec<PbIndexColumnProperties>,

    /// Catalog of the index table.
    pub index_table: Arc<TableCatalog>,

    /// Catalog of the primary table.
    pub primary_table: Arc<TableCatalog>,

    /// Maps a column index of the primary table to the position of the `InputRef` item in
    /// `index_item` that refers to it. `FunctionCall` items do not appear here.
    pub primary_to_secondary_mapping: BTreeMap<usize, usize>,

    /// The inverse of `primary_to_secondary_mapping`: position in `index_item` to column index
    /// of the primary table.
    pub secondary_to_primary_mapping: BTreeMap<usize, usize>,

    /// Maps a function call over the primary table to its position in `index_item`.
    /// Use `HashMap` instead of `BTreeMap`, because `FunctionCall` can't be used as the key for
    /// `BTreeMap`. Also, the trait `std::hash::Hash` is not implemented for
    /// `HashMap<function_call::FunctionCall, usize>`, so we need to ignore it. This does not
    /// affect correctness, since the map can be derived from `index_item`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub function_mapping: HashMap<FunctionCall, usize>,

    /// Copied verbatim from `PbIndex::index_columns_len`.
    /// NOTE(review): presumably the number of index key columns, excluding included columns
    /// — confirm against the protobuf definition.
    pub index_columns_len: u32,

    /// Copied from `PbIndex::created_at_epoch`, if present.
    pub created_at_epoch: Option<Epoch>,

    /// Copied from `PbIndex::initialized_at_epoch`, if present.
    pub initialized_at_epoch: Option<Epoch>,

    /// Copied from `PbIndex::created_at_cluster_version`, if present.
    pub created_at_cluster_version: Option<String>,

    /// Copied from `PbIndex::initialized_at_cluster_version`, if present.
    pub initialized_at_cluster_version: Option<String>,
}
74
75impl IndexCatalog {
76    pub fn build_from(
77        index_prost: &PbIndex,
78        index_table: &TableCatalog,
79        primary_table: &TableCatalog,
80    ) -> Self {
81        let index_item: Vec<ExprImpl> = index_prost
82            .index_item
83            .iter()
84            .map(|expr| ExprImpl::from_expr_proto(expr).unwrap())
85            .map(|expr| item_rewriter::CompositeCastEliminator.rewrite_expr(expr))
86            .collect();
87
88        let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
89            .iter()
90            .enumerate()
91            .filter_map(|(i, expr)| match expr {
92                ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
93                ExprImpl::FunctionCall(_) => None,
94                _ => unreachable!(),
95            })
96            .collect();
97
98        let secondary_to_primary_mapping = BTreeMap::from_iter(
99            primary_to_secondary_mapping
100                .clone()
101                .into_iter()
102                .map(|(x, y)| (y, x)),
103        );
104
105        let function_mapping: HashMap<FunctionCall, usize> = index_item
106            .iter()
107            .enumerate()
108            .filter_map(|(i, expr)| match expr {
109                ExprImpl::InputRef(_) => None,
110                ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
111                _ => unreachable!(),
112            })
113            .collect();
114
115        IndexCatalog {
116            id: index_prost.id.into(),
117            name: index_prost.name.clone(),
118            index_item,
119            index_column_properties: index_prost.index_column_properties.clone(),
120            index_table: Arc::new(index_table.clone()),
121            primary_table: Arc::new(primary_table.clone()),
122            primary_to_secondary_mapping,
123            secondary_to_primary_mapping,
124            function_mapping,
125            index_columns_len: index_prost.index_columns_len,
126            created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
127            initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
128            created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
129            initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
130        }
131    }
132
133    pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
134        let mapping = self.primary_to_secondary_mapping();
135
136        self.primary_table
137            .pk
138            .iter()
139            .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
140            .collect_vec()
141    }
142
143    pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
144        let mapping = self.primary_to_secondary_mapping();
145
146        self.primary_table
147            .distribution_key
148            .iter()
149            .map(|x| *mapping.get(x).unwrap())
150            .collect_vec()
151    }
152
153    pub fn full_covering(&self) -> bool {
154        self.index_table.columns.len() == self.primary_table.columns.len()
155    }
156
157    /// A mapping maps the column index of the secondary index to the column index of the primary
158    /// table.
159    pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
160        &self.secondary_to_primary_mapping
161    }
162
163    /// A mapping maps the column index of the primary table to the column index of the secondary
164    /// index.
165    pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
166        &self.primary_to_secondary_mapping
167    }
168
169    pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
170        &self.function_mapping
171    }
172
173    pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbIndex {
174        PbIndex {
175            id: self.id.index_id,
176            schema_id,
177            database_id,
178            name: self.name.clone(),
179            owner: self.index_table.owner,
180            index_table_id: self.index_table.id.table_id,
181            primary_table_id: self.primary_table.id.table_id,
182            index_item: self
183                .index_item
184                .iter()
185                .map(|expr| expr.to_expr_proto())
186                .collect_vec(),
187            index_column_properties: self.index_column_properties.clone(),
188            index_columns_len: self.index_columns_len,
189            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
190            created_at_epoch: self.created_at_epoch.map(|e| e.0),
191            stream_job_status: PbStreamJobStatus::Creating.into(),
192            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
193            created_at_cluster_version: self.created_at_cluster_version.clone(),
194        }
195    }
196
197    /// Get the column properties of the index column.
198    pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
199        self.index_column_properties.get(column_idx).cloned()
200    }
201
202    pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
203        if let Some(col) = self.index_table.columns.get(column_idx) {
204            if col.is_hidden {
205                return None;
206            }
207        } else {
208            return None;
209        }
210        let expr_display = ExprDisplay {
211            expr: &self.index_item[column_idx],
212            input_schema: &Schema::new(
213                self.primary_table
214                    .columns
215                    .iter()
216                    .map(|col| Field::from(&col.column_desc))
217                    .collect_vec(),
218            ),
219        };
220
221        // TODO(Kexiang): Currently, extra info like ":Int32" introduced by `ExprDisplay` is kept for simplity.
222        // We'd better remove it in the future.
223        Some(expr_display.to_string())
224    }
225
226    pub fn display(&self) -> IndexDisplay {
227        let index_table = self.index_table.clone();
228        let index_columns_with_ordering = index_table
229            .pk
230            .iter()
231            .filter(|x| !index_table.columns[x.column_index].is_hidden)
232            .map(|x| {
233                let index_column_name = index_table.columns[x.column_index].name().to_owned();
234                format!("{} {}", index_column_name, x.order_type)
235            })
236            .collect_vec();
237
238        let pk_column_index_set = index_table
239            .pk
240            .iter()
241            .map(|x| x.column_index)
242            .collect::<HashSet<_>>();
243
244        let include_columns = index_table
245            .columns
246            .iter()
247            .enumerate()
248            .filter(|(i, _)| !pk_column_index_set.contains(i))
249            .filter(|(_, x)| !x.is_hidden)
250            .map(|(_, x)| x.name().to_owned())
251            .collect_vec();
252
253        let distributed_by_columns = index_table
254            .distribution_key
255            .iter()
256            .map(|&x| index_table.columns[x].name().to_owned())
257            .collect_vec();
258
259        IndexDisplay {
260            index_columns_with_ordering,
261            include_columns,
262            distributed_by_columns,
263        }
264    }
265}
266
/// Human-readable description of an index, as produced by `IndexCatalog::display`.
pub struct IndexDisplay {
    /// One `"<column name> <order type>"` entry per non-hidden primary-key column of the index
    /// table.
    pub index_columns_with_ordering: Vec<String>,
    /// Names of the non-hidden index table columns that are not part of its primary key.
    pub include_columns: Vec<String>,
    /// Names of the columns in the index table's distribution key.
    pub distributed_by_columns: Vec<String>,
}
272
273impl OwnedByUserCatalog for IndexCatalog {
274    fn owner(&self) -> UserId {
275        self.index_table.owner
276    }
277}
278
279mod item_rewriter {
280    use risingwave_pb::expr::expr_node;
281
282    use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall};
283
    /// Rewrites the expression of an index item to eliminate `CompositeCast`, if any. This is
    /// needed if the type of a column was changed and there's a functional index on it.
    ///
    /// # Example
    ///
    /// Imagine there's a table created with `CREATE TABLE t (v struct<a int, b int>)`.
    /// Then we create an index on it with `CREATE INDEX idx ON t ((v).a)`, which will create an
    /// index item `Field(InputRef(0), 0)`.
    ///
    /// If we alter the column with `ALTER TABLE t ALTER COLUMN v TYPE struct<x varchar, a int>`,
    /// the meta service will wrap the `InputRef(0)` with a `CompositeCast` to maintain the correct
    /// return type. The index item will now become `Field(CompositeCast(InputRef(0)), 0)`.
    ///
    /// `CompositeCast` is for internal use only, and cannot be constructed or executed. To allow
    /// this functional index to work and be matched with user queries, we need to eliminate it
    /// here. By comparing the input and output types of `CompositeCast` and matching the field id,
    /// we can find the real `Field` index and rewrite it to `Field(InputRef(0), 1)`.
    ///
    /// Note that if the field is dropped, we will leave the index item as is. This makes the index
    /// item invalid, and it will never be matched and used.
    pub struct CompositeCastEliminator;
305
306    impl ExprRewriter for CompositeCastEliminator {
307        fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
308            let (func_type, inputs, ret) = func_call.decompose();
309
310            // Flatten consecutive `CompositeCast`.
311            if func_type == expr_node::Type::CompositeCast {
312                let child = inputs[0].clone();
313
314                if let Some(child) = child.as_function_call()
315                    && child.func_type() == expr_node::Type::CompositeCast
316                {
317                    let new_child = child.inputs()[0].clone();
318
319                    // If the type already matches, no need to wrap again.
320                    // Recursively eliminate more composite cast by calling rewrite again.
321                    if new_child.return_type() == ret {
322                        return self.rewrite_expr(new_child);
323                    } else {
324                        let new_composite_cast =
325                            FunctionCall::new_unchecked(func_type, vec![new_child], ret);
326                        return self.rewrite_function_call(new_composite_cast);
327                    }
328                }
329            }
330            // Rewrite `Field(CompositeCast(x), y)` to `Field(x, y')`.
331            // TODO: also support rewriting `ArrayAccess` and `MapAccess`.
332            else if func_type == expr_node::Type::Field {
333                let child = inputs[0].clone();
334
335                if let Some(child) = child.as_function_call()
336                    && child.func_type() == expr_node::Type::CompositeCast
337                {
338                    let index = (inputs[1].clone().into_literal().unwrap())
339                        .get_data()
340                        .clone()
341                        .unwrap()
342                        .into_int32();
343
344                    let struct_type = child.return_type().into_struct();
345                    let field_id = struct_type
346                        .id_at(index as usize)
347                        .expect("ids should be set");
348
349                    // Unwrap the composite cast.
350                    let new_child = child.inputs()[0].clone();
351                    let new_struct_type = new_child.return_type().into_struct();
352
353                    let Some(new_index) = new_struct_type
354                        .ids()
355                        .expect("ids should be set")
356                        .position(|x| x == field_id)
357                    else {
358                        // Previously we have index on this field, but now it's dropped.
359                        // As a result, this entire index item becomes invalid.
360                        // Simply leave it as is. Users cannot construct a `CompositeCast` (which is
361                        // not user-facing), thus this index item will never be matched and used.
362                        return FunctionCall::new_unchecked(func_type, inputs, ret).into();
363                    };
364                    let new_index = ExprImpl::literal_int(new_index as i32);
365
366                    let new_inputs = vec![new_child, new_index];
367                    let new_field_call = FunctionCall::new_unchecked(func_type, new_inputs, ret);
368
369                    // Recursively eliminate more composite cast by calling rewrite again.
370                    return self.rewrite_function_call(new_field_call);
371                }
372            }
373
374            let inputs = inputs
375                .into_iter()
376                .map(|expr| self.rewrite_expr(expr))
377                .collect();
378            FunctionCall::new_unchecked(func_type, inputs, ret).into()
379        }
380    }
381}