risingwave_frontend/catalog/
index_catalog.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{ColumnDesc, CreateType, Field, IndexId, Schema};
22use risingwave_common::session_config::SessionConfig;
23use risingwave_common::util::epoch::Epoch;
24use risingwave_common::util::sort_util::ColumnOrder;
25use risingwave_pb::catalog::{
26 PbCreateType, PbIndex, PbIndexColumnProperties, PbVectorIndexInfo, vector_index_info,
27};
28
29use crate::catalog::table_catalog::TableType;
30use crate::catalog::{OwnedByUserCatalog, TableCatalog};
31use crate::expr::{ExprDisplay, ExprImpl, ExprRewriter as _, FunctionCall};
32use crate::user::UserId;
33
/// Index backed by a regular index table (built by `IndexCatalog::build_from` when the
/// index table's `table_type` is `TableType::Index`).
///
/// Equality and hashing are derived through `educe`; `function_mapping` is excluded from both.
34#[derive(Clone, Debug, Educe)]
35#[educe(PartialEq, Eq, Hash)]
36pub struct TableIndex {
    /// Index name, copied from `PbIndex::name`.
37 pub name: String,
    /// Per-column properties copied from `PbIndex::index_column_properties`;
    /// `IndexCatalog::get_column_properties` looks entries up by column index.
38 pub index_column_properties: Vec<PbIndexColumnProperties>,
41
    /// Catalog of the table that stores the index data.
42 pub index_table: Arc<TableCatalog>,
43
    /// Catalog of the table the index is defined on.
44 pub primary_table: Arc<TableCatalog>,
45
    /// For every index item that is a plain `InputRef`: the referenced primary-table column
    /// index mapped to the item's position in `index_item`.
46 pub primary_to_secondary_mapping: BTreeMap<usize, usize>,
47
    /// Inverse of `primary_to_secondary_mapping` (position in `index_item` -> primary column).
48 pub secondary_to_primary_mapping: BTreeMap<usize, usize>,
49
    /// For every index item that is a `FunctionCall`: the call mapped to the item's position
    /// in `index_item`. Ignored by the derived `PartialEq` and `Hash`.
50 #[educe(PartialEq(ignore))]
56 #[educe(Hash(ignore))]
57 pub function_mapping: HashMap<FunctionCall, usize>,
58
    /// Copied from `PbIndex::index_columns_len`.
    // NOTE(review): presumably the number of leading index items that are index keys rather
    // than included columns — confirm against the protobuf definition.
59 pub index_columns_len: u32,
60}
61
/// Index backed by a vector index table (built by `IndexCatalog::build_from` when the
/// index table's `table_type` is `TableType::VectorIndex`).
///
/// `PartialEq`/`Eq` are the standard derives; `Hash` is derived through `educe` and skips
/// `primary_to_included_info_column_mapping`.
62#[derive(Clone, Debug, PartialEq, Eq, Educe)]
63#[educe(Hash)]
64pub struct VectorIndex {
    /// Catalog of the table that stores the index data.
65 pub index_table: Arc<TableCatalog>,
    /// The first index item (`index_item[0]`), i.e. the indexed vector expression.
66 pub vector_expr: ExprImpl,
    /// Primary-table column index -> position of that column in `included_info_columns`.
    /// Ignored by the derived `Hash`.
67 #[educe(Hash(ignore))]
68 pub primary_to_included_info_column_mapping: HashMap<usize, usize>,
    /// For each primary-key column of the primary table (in key order), its position in
    /// `included_info_columns`.
69 pub primary_key_idx_in_info_columns: Vec<usize>,
    /// Primary-table column indices referenced by the index items after the first one
    /// (`index_item[1..]`), in item order.
70 pub included_info_columns: Vec<usize>,
    /// Vector index settings taken from the index table's `vector_index_info`.
71 pub vector_index_info: PbVectorIndexInfo,
72}
73
74impl VectorIndex {
75 pub fn info_column_desc(&self) -> Vec<ColumnDesc> {
76 self.index_table.columns[1..=self.included_info_columns.len()]
77 .iter()
78 .map(|col| col.column_desc.clone())
79 .collect()
80 }
81
82 pub fn resolve_hnsw_ef_search(&self, config: &SessionConfig) -> Option<usize> {
83 match self.vector_index_info.config.as_ref().unwrap() {
84 vector_index_info::Config::Flat(_) => None,
85 vector_index_info::Config::HnswFlat(_) => Some(config.batch_hnsw_ef_search()),
86 }
87 }
88}
89
/// The concrete kind of an index, selected in `IndexCatalog::build_from` from the index
/// table's `table_type`.
90#[derive(Clone, Debug, Hash, PartialEq, Eq)]
91pub enum IndexType {
    /// Index stored in a table of type `TableType::Index`.
92 Table(Arc<TableIndex>),
    /// Index stored in a table of type `TableType::VectorIndex`.
93 Vector(Arc<VectorIndex>),
94}
95
/// Frontend catalog entry describing one index; constructed by `IndexCatalog::build_from`
/// from a `PbIndex` plus the catalogs of the index table and the primary table.
96#[derive(Clone, Debug, Hash, PartialEq, Eq)]
97pub struct IndexCatalog {
    /// Index id, copied from `PbIndex::id`.
98 pub id: IndexId,
99
    /// Index name, copied from `PbIndex::name`.
100 pub name: String,
101
    /// Index expressions decoded from `PbIndex::index_item`, each passed through
    /// `item_rewriter::CompositeCastEliminator`. They are displayed against the primary
    /// table's columns (see `get_column_def`).
102 pub index_item: Vec<ExprImpl>,
107
    /// Kind-specific data (table index or vector index).
108 pub index_type: IndexType,
109
    /// Catalog of the table the index is defined on.
110 pub primary_table: Arc<TableCatalog>,
111
    /// Creation mode; `build_from` falls back to `PbCreateType::Foreground` when the
    /// protobuf value cannot be read.
112 pub create_type: CreateType,
114
    /// Converted from `PbIndex::created_at_epoch`; `None` when unset.
115 pub created_at_epoch: Option<Epoch>,
116
    /// Converted from `PbIndex::initialized_at_epoch`; `None` when unset.
117 pub initialized_at_epoch: Option<Epoch>,
118
    /// Copied from `PbIndex::created_at_cluster_version`.
119 pub created_at_cluster_version: Option<String>,
120
    /// Copied from `PbIndex::initialized_at_cluster_version`.
121 pub initialized_at_cluster_version: Option<String>,
122}
123
124impl IndexCatalog {
125 pub fn build_from(
126 index_prost: &PbIndex,
127 index_table: &Arc<TableCatalog>,
128 primary_table: &Arc<TableCatalog>,
129 ) -> Self {
130 let index_item: Vec<ExprImpl> = index_prost
131 .index_item
132 .iter()
133 .map(|expr| ExprImpl::from_expr_proto(expr).unwrap())
134 .map(|expr| item_rewriter::CompositeCastEliminator.rewrite_expr(expr))
135 .collect();
136
137 let create_type = index_prost
138 .get_create_type()
139 .unwrap_or(PbCreateType::Foreground);
140
141 let index_type = match index_table.table_type {
142 TableType::Index => {
143 let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
144 .iter()
145 .enumerate()
146 .filter_map(|(i, expr)| match expr {
147 ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
148 ExprImpl::FunctionCall(_) => None,
149 _ => unreachable!(),
150 })
151 .collect();
152
153 let secondary_to_primary_mapping = BTreeMap::from_iter(
154 primary_to_secondary_mapping
155 .clone()
156 .into_iter()
157 .map(|(x, y)| (y, x)),
158 );
159
160 let function_mapping: HashMap<FunctionCall, usize> = index_item
161 .iter()
162 .enumerate()
163 .filter_map(|(i, expr)| match expr {
164 ExprImpl::InputRef(_) => None,
165 ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
166 _ => unreachable!(),
167 })
168 .collect();
169 IndexType::Table(Arc::new(TableIndex {
170 name: index_prost.name.clone(),
171 index_column_properties: index_prost.index_column_properties.clone(),
172 index_columns_len: index_prost.index_columns_len,
173 index_table: index_table.clone(),
174 primary_table: primary_table.clone(),
175 primary_to_secondary_mapping,
176 secondary_to_primary_mapping,
177 function_mapping,
178 }))
179 }
180 TableType::VectorIndex => {
181 assert_eq!(index_prost.index_columns_len, 1);
182 let included_info_columns = index_item[1..].iter().map(|item| {
183 let ExprImpl::InputRef(input) = item else {
184 panic!("vector index included columns must be from direct input column, but got: {:?}", item);
185 };
186 input.index
187 }).collect_vec();
188 let primary_to_included_info_column_mapping: HashMap<_, _> = included_info_columns
189 .iter()
190 .enumerate()
191 .map(|(included_info_column_idx, primary_column_idx)| {
192 (*primary_column_idx, included_info_column_idx)
193 })
194 .collect();
195 let primary_key_idx_in_info_columns = primary_table
196 .pk()
197 .iter()
198 .map(|order| primary_to_included_info_column_mapping[&order.column_index])
199 .collect();
200 IndexType::Vector(Arc::new(VectorIndex {
201 index_table: index_table.clone(),
202 vector_expr: index_item[0].clone(),
203 primary_to_included_info_column_mapping,
204 primary_key_idx_in_info_columns,
205 included_info_columns,
206 vector_index_info: index_table
207 .vector_index_info
208 .expect("should exist for vector index"),
209 }))
210 }
211 TableType::Table | TableType::MaterializedView | TableType::Internal => {
212 unreachable!()
213 }
214 };
215
216 IndexCatalog {
217 id: index_prost.id,
218 name: index_prost.name.clone(),
219 index_item,
220 index_type,
221 primary_table: primary_table.clone(),
222 created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
223 initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
224 created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
225 initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
226 create_type: CreateType::from_proto(create_type),
227 }
228 }
229}
230
231impl TableIndex {
232 pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
233 let mapping = self.primary_to_secondary_mapping();
234
235 self.primary_table
236 .pk
237 .iter()
238 .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
239 .collect_vec()
240 }
241
242 pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
243 let mapping = self.primary_to_secondary_mapping();
244
245 self.primary_table
246 .distribution_key
247 .iter()
248 .map(|x| *mapping.get(x).unwrap())
249 .collect_vec()
250 }
251
252 pub fn full_covering(&self) -> bool {
253 self.index_table.columns.len() == self.primary_table.columns.len()
254 }
255
256 pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
259 &self.secondary_to_primary_mapping
260 }
261
262 pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
265 &self.primary_to_secondary_mapping
266 }
267
268 pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
269 &self.function_mapping
270 }
271}
272
273impl IndexCatalog {
274 pub fn index_table(&self) -> &Arc<TableCatalog> {
275 match &self.index_type {
276 IndexType::Table(index) => &index.index_table,
277 IndexType::Vector(index) => &index.index_table,
278 }
279 }
280
281 pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
283 match &self.index_type {
284 IndexType::Table(index) => index.index_column_properties.get(column_idx).cloned(),
285 IndexType::Vector { .. } => {
286 if column_idx == 0 {
287 Some(PbIndexColumnProperties {
289 is_desc: false,
290 nulls_first: false,
291 })
292 } else {
293 None
294 }
295 }
296 }
297 }
298
299 pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
300 if let Some(col) = self.index_table().columns.get(column_idx) {
301 if col.is_hidden {
302 return None;
303 }
304 } else {
305 return None;
306 }
307 let expr_display = ExprDisplay {
308 expr: &self.index_item[column_idx],
309 input_schema: &Schema::new(
310 self.primary_table
311 .columns
312 .iter()
313 .map(|col| Field::from(&col.column_desc))
314 .collect_vec(),
315 ),
316 };
317
318 Some(expr_display.to_string())
321 }
322
323 pub fn display(&self) -> IndexDisplay {
324 let index_table = self.index_table().clone();
325 let index_columns_with_ordering = index_table
326 .pk
327 .iter()
328 .filter(|x| !index_table.columns[x.column_index].is_hidden)
329 .map(|x| {
330 let index_column_name = index_table.columns[x.column_index].name().to_owned();
331 format!("{} {}", index_column_name, x.order_type)
332 })
333 .collect_vec();
334
335 let pk_column_index_set = index_table
336 .pk
337 .iter()
338 .map(|x| x.column_index)
339 .collect::<HashSet<_>>();
340
341 let include_columns = index_table
342 .columns
343 .iter()
344 .enumerate()
345 .filter(|(i, _)| !pk_column_index_set.contains(i))
346 .filter(|(_, x)| !x.is_hidden)
347 .map(|(_, x)| x.name().to_owned())
348 .collect_vec();
349
350 let distributed_by_columns = index_table
351 .distribution_key
352 .iter()
353 .map(|&x| index_table.columns[x].name().to_owned())
354 .collect_vec();
355
356 IndexDisplay {
357 index_columns_with_ordering,
358 include_columns,
359 distributed_by_columns,
360 }
361 }
362
363 pub fn is_created(&self) -> bool {
364 self.index_table().is_created()
365 }
366}
367
/// Human-readable pieces of an index definition, produced by `IndexCatalog::display`.
368pub struct IndexDisplay {
    /// One `"<column name> <order type>"` entry per visible key column of the index table.
369 pub index_columns_with_ordering: Vec<String>,
    /// Names of the visible index-table columns that are not part of its key.
370 pub include_columns: Vec<String>,
    /// Names of the index table's distribution-key columns.
371 pub distributed_by_columns: Vec<String>,
372}
373
374impl OwnedByUserCatalog for IndexCatalog {
375 fn owner(&self) -> UserId {
376 self.index_table().owner
377 }
378}
379
380mod item_rewriter {
381 use risingwave_pb::expr::expr_node;
382
383 use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall};
384
385 pub struct CompositeCastEliminator;
406
407 impl ExprRewriter for CompositeCastEliminator {
408 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
409 let (func_type, inputs, ret) = func_call.decompose();
410
411 if func_type == expr_node::Type::CompositeCast {
413 let child = inputs[0].clone();
414
415 if let Some(child) = child.as_function_call()
416 && child.func_type() == expr_node::Type::CompositeCast
417 {
418 let new_child = child.inputs()[0].clone();
419
420 if new_child.return_type() == ret {
423 return self.rewrite_expr(new_child);
424 } else {
425 let new_composite_cast =
426 FunctionCall::new_unchecked(func_type, vec![new_child], ret);
427 return self.rewrite_function_call(new_composite_cast);
428 }
429 }
430 }
431 else if func_type == expr_node::Type::Field {
434 let child = inputs[0].clone();
435
436 if let Some(child) = child.as_function_call()
437 && child.func_type() == expr_node::Type::CompositeCast
438 {
439 let index = (inputs[1].clone().into_literal().unwrap())
440 .get_data()
441 .clone()
442 .unwrap()
443 .into_int32();
444
445 let struct_type = child.return_type().into_struct();
446 let field_id = struct_type
447 .id_at(index as usize)
448 .expect("ids should be set");
449
450 let new_child = child.inputs()[0].clone();
452 let new_struct_type = new_child.return_type().into_struct();
453
454 let Some(new_index) = new_struct_type
455 .ids()
456 .expect("ids should be set")
457 .position(|x| x == field_id)
458 else {
459 return FunctionCall::new_unchecked(func_type, inputs, ret).into();
464 };
465 let new_index = ExprImpl::literal_int(new_index as i32);
466
467 let new_inputs = vec![new_child, new_index];
468 let new_field_call = FunctionCall::new_unchecked(func_type, new_inputs, ret);
469
470 return self.rewrite_function_call(new_field_call);
472 }
473 }
474
475 let inputs = inputs
476 .into_iter()
477 .map(|expr| self.rewrite_expr(expr))
478 .collect();
479 FunctionCall::new_unchecked(func_type, inputs, ret).into()
480 }
481 }
482}