risingwave_frontend/catalog/
index_catalog.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{ColumnDesc, Field, IndexId, Schema};
22use risingwave_common::session_config::SessionConfig;
23use risingwave_common::util::epoch::Epoch;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_pb::catalog::{
26 PbIndex, PbIndexColumnProperties, PbVectorIndexInfo, vector_index_info,
27};
28
29use crate::catalog::table_catalog::TableType;
30use crate::catalog::{OwnedByUserCatalog, TableCatalog};
31use crate::expr::{ExprDisplay, ExprImpl, ExprRewriter as _, FunctionCall};
32use crate::user::UserId;
33
34#[derive(Clone, Debug, Educe)]
35#[educe(PartialEq, Eq, Hash)]
36pub struct TableIndex {
37 pub name: String,
38 pub index_column_properties: Vec<PbIndexColumnProperties>,
41
42 pub index_table: Arc<TableCatalog>,
43
44 pub primary_table: Arc<TableCatalog>,
45
46 pub primary_to_secondary_mapping: BTreeMap<usize, usize>,
47
48 pub secondary_to_primary_mapping: BTreeMap<usize, usize>,
49
50 #[educe(PartialEq(ignore))]
56 #[educe(Hash(ignore))]
57 pub function_mapping: HashMap<FunctionCall, usize>,
58
59 pub index_columns_len: u32,
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, Educe)]
63#[educe(Hash)]
64pub struct VectorIndex {
65 pub index_table: Arc<TableCatalog>,
66 pub vector_expr: ExprImpl,
67 #[educe(Hash(ignore))]
68 pub primary_to_included_info_column_mapping: HashMap<usize, usize>,
69 pub primary_key_idx_in_info_columns: Vec<usize>,
70 pub included_info_columns: Vec<usize>,
71 pub vector_index_info: PbVectorIndexInfo,
72}
73
74impl VectorIndex {
75 pub fn info_column_desc(&self) -> Vec<ColumnDesc> {
76 self.index_table.columns[1..=self.included_info_columns.len()]
77 .iter()
78 .map(|col| col.column_desc.clone())
79 .collect()
80 }
81
82 pub fn resolve_hnsw_ef_search(&self, config: &SessionConfig) -> Option<usize> {
83 match self.vector_index_info.config.as_ref().unwrap() {
84 vector_index_info::Config::Flat(_) => None,
85 vector_index_info::Config::HnswFlat(_) => Some(config.batch_hnsw_ef_search()),
86 }
87 }
88}
89
90#[derive(Clone, Debug, Hash, PartialEq, Eq)]
91pub enum IndexType {
92 Table(Arc<TableIndex>),
93 Vector(Arc<VectorIndex>),
94}
95
96#[derive(Clone, Debug, Hash, PartialEq, Eq)]
97pub struct IndexCatalog {
98 pub id: IndexId,
99
100 pub name: String,
101
102 pub index_item: Vec<ExprImpl>,
107
108 pub index_type: IndexType,
109
110 pub primary_table: Arc<TableCatalog>,
111
112 pub created_at_epoch: Option<Epoch>,
113
114 pub initialized_at_epoch: Option<Epoch>,
115
116 pub created_at_cluster_version: Option<String>,
117
118 pub initialized_at_cluster_version: Option<String>,
119}
120
121impl IndexCatalog {
122 pub fn build_from(
123 index_prost: &PbIndex,
124 index_table: &Arc<TableCatalog>,
125 primary_table: &Arc<TableCatalog>,
126 ) -> Self {
127 let index_item: Vec<ExprImpl> = index_prost
128 .index_item
129 .iter()
130 .map(|expr| ExprImpl::from_expr_proto(expr).unwrap())
131 .map(|expr| item_rewriter::CompositeCastEliminator.rewrite_expr(expr))
132 .collect();
133
134 let index_type = match index_table.table_type {
135 TableType::Index => {
136 let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
137 .iter()
138 .enumerate()
139 .filter_map(|(i, expr)| match expr {
140 ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
141 ExprImpl::FunctionCall(_) => None,
142 _ => unreachable!(),
143 })
144 .collect();
145
146 let secondary_to_primary_mapping = BTreeMap::from_iter(
147 primary_to_secondary_mapping
148 .clone()
149 .into_iter()
150 .map(|(x, y)| (y, x)),
151 );
152
153 let function_mapping: HashMap<FunctionCall, usize> = index_item
154 .iter()
155 .enumerate()
156 .filter_map(|(i, expr)| match expr {
157 ExprImpl::InputRef(_) => None,
158 ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
159 _ => unreachable!(),
160 })
161 .collect();
162 IndexType::Table(Arc::new(TableIndex {
163 name: index_prost.name.clone(),
164 index_column_properties: index_prost.index_column_properties.clone(),
165 index_columns_len: index_prost.index_columns_len,
166 index_table: index_table.clone(),
167 primary_table: primary_table.clone(),
168 primary_to_secondary_mapping,
169 secondary_to_primary_mapping,
170 function_mapping,
171 }))
172 }
173 TableType::VectorIndex => {
174 assert_eq!(index_prost.index_columns_len, 1);
175 let included_info_columns = index_item[1..].iter().map(|item| {
176 let ExprImpl::InputRef(input) = item else {
177 panic!("vector index included columns must be from direct input column, but got: {:?}", item);
178 };
179 input.index
180 }).collect_vec();
181 let primary_to_included_info_column_mapping: HashMap<_, _> = included_info_columns
182 .iter()
183 .enumerate()
184 .map(|(included_info_column_idx, primary_column_idx)| {
185 (*primary_column_idx, included_info_column_idx)
186 })
187 .collect();
188 let primary_key_idx_in_info_columns = primary_table
189 .pk()
190 .iter()
191 .map(|order| primary_to_included_info_column_mapping[&order.column_index])
192 .collect();
193 IndexType::Vector(Arc::new(VectorIndex {
194 index_table: index_table.clone(),
195 vector_expr: index_item[0].clone(),
196 primary_to_included_info_column_mapping,
197 primary_key_idx_in_info_columns,
198 included_info_columns,
199 vector_index_info: index_table
200 .vector_index_info
201 .expect("should exist for vector index"),
202 }))
203 }
204 TableType::Table | TableType::MaterializedView | TableType::Internal => {
205 unreachable!()
206 }
207 };
208
209 IndexCatalog {
210 id: index_prost.id,
211 name: index_prost.name.clone(),
212 index_item,
213 index_type,
214 primary_table: primary_table.clone(),
215 created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
216 initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
217 created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
218 initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
219 }
220 }
221}
222
223impl TableIndex {
224 pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
225 let mapping = self.primary_to_secondary_mapping();
226
227 self.primary_table
228 .pk
229 .iter()
230 .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
231 .collect_vec()
232 }
233
234 pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
235 let mapping = self.primary_to_secondary_mapping();
236
237 self.primary_table
238 .distribution_key
239 .iter()
240 .map(|x| *mapping.get(x).unwrap())
241 .collect_vec()
242 }
243
244 pub fn full_covering(&self) -> bool {
245 self.index_table.columns.len() == self.primary_table.columns.len()
246 }
247
248 pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
251 &self.secondary_to_primary_mapping
252 }
253
254 pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
257 &self.primary_to_secondary_mapping
258 }
259
260 pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
261 &self.function_mapping
262 }
263}
264
265impl IndexCatalog {
266 pub fn index_table(&self) -> &Arc<TableCatalog> {
267 match &self.index_type {
268 IndexType::Table(index) => &index.index_table,
269 IndexType::Vector(index) => &index.index_table,
270 }
271 }
272
273 pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
275 match &self.index_type {
276 IndexType::Table(index) => index.index_column_properties.get(column_idx).cloned(),
277 IndexType::Vector { .. } => {
278 if column_idx == 0 {
279 Some(PbIndexColumnProperties {
281 is_desc: false,
282 nulls_first: false,
283 })
284 } else {
285 None
286 }
287 }
288 }
289 }
290
291 pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
292 if let Some(col) = self.index_table().columns.get(column_idx) {
293 if col.is_hidden {
294 return None;
295 }
296 } else {
297 return None;
298 }
299 let expr_display = ExprDisplay {
300 expr: &self.index_item[column_idx],
301 input_schema: &Schema::new(
302 self.primary_table
303 .columns
304 .iter()
305 .map(|col| Field::from(&col.column_desc))
306 .collect_vec(),
307 ),
308 };
309
310 Some(expr_display.to_string())
313 }
314
315 pub fn display(&self) -> IndexDisplay {
316 let index_table = self.index_table().clone();
317 let index_columns_with_ordering = index_table
318 .pk
319 .iter()
320 .filter(|x| !index_table.columns[x.column_index].is_hidden)
321 .map(|x| {
322 let index_column_name = index_table.columns[x.column_index].name().to_owned();
323 format!("{} {}", index_column_name, x.order_type)
324 })
325 .collect_vec();
326
327 let pk_column_index_set = index_table
328 .pk
329 .iter()
330 .map(|x| x.column_index)
331 .collect::<HashSet<_>>();
332
333 let include_columns = index_table
334 .columns
335 .iter()
336 .enumerate()
337 .filter(|(i, _)| !pk_column_index_set.contains(i))
338 .filter(|(_, x)| !x.is_hidden)
339 .map(|(_, x)| x.name().to_owned())
340 .collect_vec();
341
342 let distributed_by_columns = index_table
343 .distribution_key
344 .iter()
345 .map(|&x| index_table.columns[x].name().to_owned())
346 .collect_vec();
347
348 IndexDisplay {
349 index_columns_with_ordering,
350 include_columns,
351 distributed_by_columns,
352 }
353 }
354
355 pub fn is_created(&self) -> bool {
356 self.index_table().is_created()
357 }
358}
359
360pub struct IndexDisplay {
361 pub index_columns_with_ordering: Vec<String>,
362 pub include_columns: Vec<String>,
363 pub distributed_by_columns: Vec<String>,
364}
365
366impl OwnedByUserCatalog for IndexCatalog {
367 fn owner(&self) -> UserId {
368 self.index_table().owner
369 }
370}
371
372mod item_rewriter {
373 use risingwave_pb::expr::expr_node;
374
375 use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall};
376
377 pub struct CompositeCastEliminator;
398
399 impl ExprRewriter for CompositeCastEliminator {
400 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
401 let (func_type, inputs, ret) = func_call.decompose();
402
403 if func_type == expr_node::Type::CompositeCast {
405 let child = inputs[0].clone();
406
407 if let Some(child) = child.as_function_call()
408 && child.func_type() == expr_node::Type::CompositeCast
409 {
410 let new_child = child.inputs()[0].clone();
411
412 if new_child.return_type() == ret {
415 return self.rewrite_expr(new_child);
416 } else {
417 let new_composite_cast =
418 FunctionCall::new_unchecked(func_type, vec![new_child], ret);
419 return self.rewrite_function_call(new_composite_cast);
420 }
421 }
422 }
423 else if func_type == expr_node::Type::Field {
426 let child = inputs[0].clone();
427
428 if let Some(child) = child.as_function_call()
429 && child.func_type() == expr_node::Type::CompositeCast
430 {
431 let index = (inputs[1].clone().into_literal().unwrap())
432 .get_data()
433 .clone()
434 .unwrap()
435 .into_int32();
436
437 let struct_type = child.return_type().into_struct();
438 let field_id = struct_type
439 .id_at(index as usize)
440 .expect("ids should be set");
441
442 let new_child = child.inputs()[0].clone();
444 let new_struct_type = new_child.return_type().into_struct();
445
446 let Some(new_index) = new_struct_type
447 .ids()
448 .expect("ids should be set")
449 .position(|x| x == field_id)
450 else {
451 return FunctionCall::new_unchecked(func_type, inputs, ret).into();
456 };
457 let new_index = ExprImpl::literal_int(new_index as i32);
458
459 let new_inputs = vec![new_child, new_index];
460 let new_field_call = FunctionCall::new_unchecked(func_type, new_inputs, ret);
461
462 return self.rewrite_function_call(new_field_call);
464 }
465 }
466
467 let inputs = inputs
468 .into_iter()
469 .map(|expr| self.rewrite_expr(expr))
470 .collect();
471 FunctionCall::new_unchecked(func_type, inputs, ret).into()
472 }
473 }
474}