risingwave_frontend/catalog/
index_catalog.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{ColumnDesc, CreateType, Field, IndexId, Schema};
22use risingwave_common::session_config::SessionConfig;
23use risingwave_common::util::epoch::Epoch;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_pb::catalog::{
26    PbCreateType, PbIndex, PbIndexColumnProperties, PbVectorIndexInfo, vector_index_info,
27};
28
29use crate::catalog::table_catalog::TableType;
30use crate::catalog::{OwnedByUserCatalog, TableCatalog};
31use crate::expr::{ExprDisplay, ExprImpl, ExprRewriter as _, FunctionCall};
32use crate::user::UserId;
33
/// Index data for an index whose index table has type `TableType::Index`.
///
/// Instances are assembled by `IndexCatalog::build_from` and shared through
/// `IndexType::Table`.
#[derive(Clone, Debug, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct TableIndex {
    /// Name of the index, copied from `PbIndex::name`.
    pub name: String,
    /// The properties of the index columns.
    /// <https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-INDEX-COLUMN-PROPS>
    pub index_column_properties: Vec<PbIndexColumnProperties>,

    /// Catalog of the index table (the `index_table` argument of `build_from`).
    pub index_table: Arc<TableCatalog>,

    /// Catalog of the table the index is defined on.
    pub primary_table: Arc<TableCatalog>,

    /// Maps a column index of the primary table to the position of the `InputRef` index item
    /// that references it. Function-call index items have no entry here.
    pub primary_to_secondary_mapping: BTreeMap<usize, usize>,

    /// The inverse of `primary_to_secondary_mapping`: index item position to column index of the
    /// primary table.
    pub secondary_to_primary_mapping: BTreeMap<usize, usize>,

    /// Maps a function call on the primary table to the position of the index item holding it.
    ///
    /// A `HashMap` is used instead of a `BTreeMap` because `FunctionCall` can't be used as the
    /// key of a `BTreeMap`. Since `std::hash::Hash` is not implemented for
    /// `HashMap<FunctionCall, usize>`, the field is ignored when hashing and comparing. This
    /// does not affect correctness, since the mapping can be derived from `index_item`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub function_mapping: HashMap<FunctionCall, usize>,

    /// Copied verbatim from `PbIndex::index_columns_len`.
    pub index_columns_len: u32,
}
61
/// Index data for an index whose index table has type `TableType::VectorIndex`.
///
/// Instances are assembled by `IndexCatalog::build_from` and shared through
/// `IndexType::Vector`.
#[derive(Clone, Debug, PartialEq, Eq, Educe)]
#[educe(Hash)]
pub struct VectorIndex {
    /// Catalog of the index table (the `index_table` argument of `build_from`).
    pub index_table: Arc<TableCatalog>,
    /// The first index item (`index_item[0]`), i.e. the expression the vector index is built on.
    pub vector_expr: ExprImpl,
    /// Maps a column index of the primary table to its position in `included_info_columns`.
    /// Ignored for hashing because `HashMap` does not implement `Hash`.
    #[educe(Hash(ignore))]
    pub primary_to_included_info_column_mapping: HashMap<usize, usize>,
    /// For each primary key column of the primary table (in primary key order), its position in
    /// `included_info_columns`.
    pub primary_key_idx_in_info_columns: Vec<usize>,
    /// Column indices of the primary table referenced by the index items after the first one
    /// (`index_item[1..]`), in index item order.
    pub included_info_columns: Vec<usize>,
    /// Vector index settings taken from `TableCatalog::vector_index_info` of the index table.
    pub vector_index_info: PbVectorIndexInfo,
}
73
74impl VectorIndex {
75    pub fn info_column_desc(&self) -> Vec<ColumnDesc> {
76        self.index_table.columns[1..=self.included_info_columns.len()]
77            .iter()
78            .map(|col| col.column_desc.clone())
79            .collect()
80    }
81
82    pub fn resolve_hnsw_ef_search(&self, config: &SessionConfig) -> Option<usize> {
83        match self.vector_index_info.config.as_ref().unwrap() {
84            vector_index_info::Config::Flat(_) => None,
85            vector_index_info::Config::HnswFlat(_) => Some(config.batch_hnsw_ef_search()),
86        }
87    }
88}
89
/// The kind-specific part of an [`IndexCatalog`].
///
/// `IndexCatalog::build_from` selects the variant from the table type of the index table.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum IndexType {
    /// Used when the index table has type `TableType::Index`.
    Table(Arc<TableIndex>),
    /// Used when the index table has type `TableType::VectorIndex`.
    Vector(Arc<VectorIndex>),
}
95
/// Frontend catalog entry of an index, built from a `PbIndex` by `IndexCatalog::build_from`.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct IndexCatalog {
    /// Id of the index, copied from `PbIndex::id`.
    pub id: IndexId,

    /// Name of the index, copied from `PbIndex::name`.
    pub name: String,

    /// Only `InputRef` and `FunctionCall` index items are supported for now.
    /// The index of an `InputRef` is the column index of the primary table.
    /// The number of index items is equal to the number of index table columns.
    /// The input args of a `FunctionCall` are also column indices of the primary table.
    pub index_item: Vec<ExprImpl>,

    /// Kind-specific data: either a table index or a vector index.
    pub index_type: IndexType,

    /// Catalog of the table the index is defined on.
    pub primary_table: Arc<TableCatalog>,

    /// Indicate whether to create index in background or foreground.
    pub create_type: CreateType,

    /// Converted from `PbIndex::created_at_epoch`, if set.
    pub created_at_epoch: Option<Epoch>,

    /// Converted from `PbIndex::initialized_at_epoch`, if set.
    pub initialized_at_epoch: Option<Epoch>,

    /// Copied from `PbIndex::created_at_cluster_version`.
    pub created_at_cluster_version: Option<String>,

    /// Copied from `PbIndex::initialized_at_cluster_version`.
    pub initialized_at_cluster_version: Option<String>,
}
123
124impl IndexCatalog {
125    pub fn build_from(
126        index_prost: &PbIndex,
127        index_table: &Arc<TableCatalog>,
128        primary_table: &Arc<TableCatalog>,
129    ) -> Self {
130        let index_item: Vec<ExprImpl> = index_prost
131            .index_item
132            .iter()
133            .map(|expr| ExprImpl::from_expr_proto(expr).unwrap())
134            .map(|expr| item_rewriter::CompositeCastEliminator.rewrite_expr(expr))
135            .collect();
136
137        let create_type = index_prost
138            .get_create_type()
139            .unwrap_or(PbCreateType::Foreground);
140
141        let index_type = match index_table.table_type {
142            TableType::Index => {
143                let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
144                    .iter()
145                    .enumerate()
146                    .filter_map(|(i, expr)| match expr {
147                        ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
148                        ExprImpl::FunctionCall(_) => None,
149                        _ => unreachable!(),
150                    })
151                    .collect();
152
153                let secondary_to_primary_mapping = BTreeMap::from_iter(
154                    primary_to_secondary_mapping
155                        .clone()
156                        .into_iter()
157                        .map(|(x, y)| (y, x)),
158                );
159
160                let function_mapping: HashMap<FunctionCall, usize> = index_item
161                    .iter()
162                    .enumerate()
163                    .filter_map(|(i, expr)| match expr {
164                        ExprImpl::InputRef(_) => None,
165                        ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
166                        _ => unreachable!(),
167                    })
168                    .collect();
169                IndexType::Table(Arc::new(TableIndex {
170                    name: index_prost.name.clone(),
171                    index_column_properties: index_prost.index_column_properties.clone(),
172                    index_columns_len: index_prost.index_columns_len,
173                    index_table: index_table.clone(),
174                    primary_table: primary_table.clone(),
175                    primary_to_secondary_mapping,
176                    secondary_to_primary_mapping,
177                    function_mapping,
178                }))
179            }
180            TableType::VectorIndex => {
181                assert_eq!(index_prost.index_columns_len, 1);
182                let included_info_columns = index_item[1..].iter().map(|item| {
183                    let ExprImpl::InputRef(input) = item else {
184                        panic!("vector index included columns must be from direct input column, but got: {:?}", item);
185                    };
186                    input.index
187                }).collect_vec();
188                let primary_to_included_info_column_mapping: HashMap<_, _> = included_info_columns
189                    .iter()
190                    .enumerate()
191                    .map(|(included_info_column_idx, primary_column_idx)| {
192                        (*primary_column_idx, included_info_column_idx)
193                    })
194                    .collect();
195                let primary_key_idx_in_info_columns = primary_table
196                    .pk()
197                    .iter()
198                    .map(|order| primary_to_included_info_column_mapping[&order.column_index])
199                    .collect();
200                IndexType::Vector(Arc::new(VectorIndex {
201                    index_table: index_table.clone(),
202                    vector_expr: index_item[0].clone(),
203                    primary_to_included_info_column_mapping,
204                    primary_key_idx_in_info_columns,
205                    included_info_columns,
206                    vector_index_info: index_table
207                        .vector_index_info
208                        .expect("should exist for vector index"),
209                }))
210            }
211            TableType::Table | TableType::MaterializedView | TableType::Internal => {
212                unreachable!()
213            }
214        };
215
216        IndexCatalog {
217            id: index_prost.id,
218            name: index_prost.name.clone(),
219            index_item,
220            index_type,
221            primary_table: primary_table.clone(),
222            created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
223            initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
224            created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
225            initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
226            create_type: CreateType::from_proto(create_type),
227        }
228    }
229}
230
231impl TableIndex {
232    pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
233        let mapping = self.primary_to_secondary_mapping();
234
235        self.primary_table
236            .pk
237            .iter()
238            .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
239            .collect_vec()
240    }
241
242    pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
243        let mapping = self.primary_to_secondary_mapping();
244
245        self.primary_table
246            .distribution_key
247            .iter()
248            .map(|x| *mapping.get(x).unwrap())
249            .collect_vec()
250    }
251
252    pub fn full_covering(&self) -> bool {
253        self.index_table.columns.len() == self.primary_table.columns.len()
254    }
255
256    /// A mapping maps the column index of the secondary index to the column index of the primary
257    /// table.
258    pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
259        &self.secondary_to_primary_mapping
260    }
261
262    /// A mapping maps the column index of the primary table to the column index of the secondary
263    /// index.
264    pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
265        &self.primary_to_secondary_mapping
266    }
267
268    pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
269        &self.function_mapping
270    }
271}
272
273impl IndexCatalog {
274    pub fn index_table(&self) -> &Arc<TableCatalog> {
275        match &self.index_type {
276            IndexType::Table(index) => &index.index_table,
277            IndexType::Vector(index) => &index.index_table,
278        }
279    }
280
281    /// Get the column properties of the index column.
282    pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
283        match &self.index_type {
284            IndexType::Table(index) => index.index_column_properties.get(column_idx).cloned(),
285            IndexType::Vector { .. } => {
286                if column_idx == 0 {
287                    // return with the default value defined in [https://www.postgresql.org/docs/current/sql-createindex.html]
288                    Some(PbIndexColumnProperties {
289                        is_desc: false,
290                        nulls_first: false,
291                    })
292                } else {
293                    None
294                }
295            }
296        }
297    }
298
299    pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
300        if let Some(col) = self.index_table().columns.get(column_idx) {
301            if col.is_hidden {
302                return None;
303            }
304        } else {
305            return None;
306        }
307        let expr_display = ExprDisplay {
308            expr: &self.index_item[column_idx],
309            input_schema: &Schema::new(
310                self.primary_table
311                    .columns
312                    .iter()
313                    .map(|col| Field::from(&col.column_desc))
314                    .collect_vec(),
315            ),
316        };
317
318        // TODO(Kexiang): Currently, extra info like ":Int32" introduced by `ExprDisplay` is kept for simplity.
319        // We'd better remove it in the future.
320        Some(expr_display.to_string())
321    }
322
323    pub fn display(&self) -> IndexDisplay {
324        let index_table = self.index_table().clone();
325        let index_columns_with_ordering = index_table
326            .pk
327            .iter()
328            .filter(|x| !index_table.columns[x.column_index].is_hidden)
329            .map(|x| {
330                let index_column_name = index_table.columns[x.column_index].name().to_owned();
331                format!("{} {}", index_column_name, x.order_type)
332            })
333            .collect_vec();
334
335        let pk_column_index_set = index_table
336            .pk
337            .iter()
338            .map(|x| x.column_index)
339            .collect::<HashSet<_>>();
340
341        let include_columns = index_table
342            .columns
343            .iter()
344            .enumerate()
345            .filter(|(i, _)| !pk_column_index_set.contains(i))
346            .filter(|(_, x)| !x.is_hidden)
347            .map(|(_, x)| x.name().to_owned())
348            .collect_vec();
349
350        let distributed_by_columns = index_table
351            .distribution_key
352            .iter()
353            .map(|&x| index_table.columns[x].name().to_owned())
354            .collect_vec();
355
356        IndexDisplay {
357            index_columns_with_ordering,
358            include_columns,
359            distributed_by_columns,
360        }
361    }
362
363    pub fn is_created(&self) -> bool {
364        self.index_table().is_created()
365    }
366}
367
/// Human-readable column lists of an index, as produced by `IndexCatalog::display`.
pub struct IndexDisplay {
    /// Visible primary key columns of the index table, each formatted as
    /// `"<column name> <order type>"`.
    pub index_columns_with_ordering: Vec<String>,
    /// Names of the visible index table columns that are not part of its primary key.
    pub include_columns: Vec<String>,
    /// Names of the distribution key columns of the index table.
    pub distributed_by_columns: Vec<String>,
}
373
374impl OwnedByUserCatalog for IndexCatalog {
375    fn owner(&self) -> UserId {
376        self.index_table().owner
377    }
378}
379
380mod item_rewriter {
381    use risingwave_pb::expr::expr_node;
382
383    use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall};
384
385    /// Rewrite the expression of index item to eliminate `CompositeCast`, if any. This is needed
386    /// if the type of a column was changed and there's functional index on it.
387    ///
388    /// # Example
389    ///
390    /// Imagine there's a table created with `CREATE TABLE t (v struct<a int, b int>)`.
391    /// Then we create an index on it with `CREATE INDEX idx ON t ((v).a)`, which will create an
392    /// index item `Field(InputRef(0), 0)`.
393    ///
394    /// If we alter the column with `ALTER TABLE t ALTER COLUMN v TYPE struct<x varchar, a int>`,
395    /// the meta service will wrap the `InputRef(0)` with a `CompositeCast` to maintain the correct
396    /// return type. The index item will now become `Field(CompositeCast(InputRef(0)), 0)`.
397    ///
398    /// `CompositeCast` is for internal use only, and cannot be constructed or executed. To allow
399    /// this functional index to work and be matched with user queries, we need to eliminate it
400    /// here. By comparing the input and output types of `CompositeCast` and matching the field id,
401    /// we can find the real `Field` index and rewrite it to `Field(InputRef(0), 1)`.
402    ///
403    /// Note that if the field is dropped, we will leave the index item as is. This makes the index
404    /// item invalid, and it will never be matched and used.
405    pub struct CompositeCastEliminator;
406
407    impl ExprRewriter for CompositeCastEliminator {
408        fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
409            let (func_type, inputs, ret) = func_call.decompose();
410
411            // Flatten consecutive `CompositeCast`.
412            if func_type == expr_node::Type::CompositeCast {
413                let child = inputs[0].clone();
414
415                if let Some(child) = child.as_function_call()
416                    && child.func_type() == expr_node::Type::CompositeCast
417                {
418                    let new_child = child.inputs()[0].clone();
419
420                    // If the type already matches, no need to wrap again.
421                    // Recursively eliminate more composite cast by calling rewrite again.
422                    if new_child.return_type() == ret {
423                        return self.rewrite_expr(new_child);
424                    } else {
425                        let new_composite_cast =
426                            FunctionCall::new_unchecked(func_type, vec![new_child], ret);
427                        return self.rewrite_function_call(new_composite_cast);
428                    }
429                }
430            }
431            // Rewrite `Field(CompositeCast(x), y)` to `Field(x, y')`.
432            // TODO: also support rewriting `ArrayAccess` and `MapAccess`.
433            else if func_type == expr_node::Type::Field {
434                let child = inputs[0].clone();
435
436                if let Some(child) = child.as_function_call()
437                    && child.func_type() == expr_node::Type::CompositeCast
438                {
439                    let index = (inputs[1].clone().into_literal().unwrap())
440                        .get_data()
441                        .clone()
442                        .unwrap()
443                        .into_int32();
444
445                    let struct_type = child.return_type().into_struct();
446                    let field_id = struct_type
447                        .id_at(index as usize)
448                        .expect("ids should be set");
449
450                    // Unwrap the composite cast.
451                    let new_child = child.inputs()[0].clone();
452                    let new_struct_type = new_child.return_type().into_struct();
453
454                    let Some(new_index) = new_struct_type
455                        .ids()
456                        .expect("ids should be set")
457                        .position(|x| x == field_id)
458                    else {
459                        // Previously we have index on this field, but now it's dropped.
460                        // As a result, this entire index item becomes invalid.
461                        // Simply leave it as is. Users cannot construct a `CompositeCast` (which is
462                        // not user-facing), thus this index item will never be matched and used.
463                        return FunctionCall::new_unchecked(func_type, inputs, ret).into();
464                    };
465                    let new_index = ExprImpl::literal_int(new_index as i32);
466
467                    let new_inputs = vec![new_child, new_index];
468                    let new_field_call = FunctionCall::new_unchecked(func_type, new_inputs, ret);
469
470                    // Recursively eliminate more composite cast by calling rewrite again.
471                    return self.rewrite_function_call(new_field_call);
472                }
473            }
474
475            let inputs = inputs
476                .into_iter()
477                .map(|expr| self.rewrite_expr(expr))
478                .collect();
479            FunctionCall::new_unchecked(func_type, inputs, ret).into()
480        }
481    }
482}