risingwave_frontend/catalog/
index_catalog.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{Field, IndexId, Schema};
22use risingwave_common::util::epoch::Epoch;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus};
25
26use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, TableCatalog};
27use crate::expr::{Expr, ExprDisplay, ExprImpl, FunctionCall};
28use crate::user::UserId;
29
30#[derive(Clone, Debug, Educe)]
31#[educe(PartialEq, Eq, Hash)]
32pub struct IndexCatalog {
33 pub id: IndexId,
34
35 pub name: String,
36
37 pub index_item: Vec<ExprImpl>,
42
43 pub index_column_properties: Vec<PbIndexColumnProperties>,
46
47 pub index_table: Arc<TableCatalog>,
48
49 pub primary_table: Arc<TableCatalog>,
50
51 pub primary_to_secondary_mapping: BTreeMap<usize, usize>,
52
53 pub secondary_to_primary_mapping: BTreeMap<usize, usize>,
54
55 #[educe(PartialEq(ignore))]
61 #[educe(Hash(ignore))]
62 pub function_mapping: HashMap<FunctionCall, usize>,
63
64 pub index_columns_len: u32,
65
66 pub created_at_epoch: Option<Epoch>,
67
68 pub initialized_at_epoch: Option<Epoch>,
69
70 pub created_at_cluster_version: Option<String>,
71
72 pub initialized_at_cluster_version: Option<String>,
73}
74
75impl IndexCatalog {
76 pub fn build_from(
77 index_prost: &PbIndex,
78 index_table: &TableCatalog,
79 primary_table: &TableCatalog,
80 ) -> Self {
81 let index_item: Vec<ExprImpl> = index_prost
82 .index_item
83 .iter()
84 .map(ExprImpl::from_expr_proto)
85 .try_collect()
86 .unwrap();
87
88 let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
89 .iter()
90 .enumerate()
91 .filter_map(|(i, expr)| match expr {
92 ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
93 ExprImpl::FunctionCall(_) => None,
94 _ => unreachable!(),
95 })
96 .collect();
97
98 let secondary_to_primary_mapping = BTreeMap::from_iter(
99 primary_to_secondary_mapping
100 .clone()
101 .into_iter()
102 .map(|(x, y)| (y, x)),
103 );
104
105 let function_mapping: HashMap<FunctionCall, usize> = index_item
106 .iter()
107 .enumerate()
108 .filter_map(|(i, expr)| match expr {
109 ExprImpl::InputRef(_) => None,
110 ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
111 _ => unreachable!(),
112 })
113 .collect();
114
115 IndexCatalog {
116 id: index_prost.id.into(),
117 name: index_prost.name.clone(),
118 index_item,
119 index_column_properties: index_prost.index_column_properties.clone(),
120 index_table: Arc::new(index_table.clone()),
121 primary_table: Arc::new(primary_table.clone()),
122 primary_to_secondary_mapping,
123 secondary_to_primary_mapping,
124 function_mapping,
125 index_columns_len: index_prost.index_columns_len,
126 created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
127 initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
128 created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
129 initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
130 }
131 }
132
133 pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
134 let mapping = self.primary_to_secondary_mapping();
135
136 self.primary_table
137 .pk
138 .iter()
139 .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
140 .collect_vec()
141 }
142
143 pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
144 let mapping = self.primary_to_secondary_mapping();
145
146 self.primary_table
147 .distribution_key
148 .iter()
149 .map(|x| *mapping.get(x).unwrap())
150 .collect_vec()
151 }
152
153 pub fn full_covering(&self) -> bool {
154 self.index_table.columns.len() == self.primary_table.columns.len()
155 }
156
157 pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
160 &self.secondary_to_primary_mapping
161 }
162
163 pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
166 &self.primary_to_secondary_mapping
167 }
168
169 pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
170 &self.function_mapping
171 }
172
173 pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbIndex {
174 PbIndex {
175 id: self.id.index_id,
176 schema_id,
177 database_id,
178 name: self.name.clone(),
179 owner: self.index_table.owner,
180 index_table_id: self.index_table.id.table_id,
181 primary_table_id: self.primary_table.id.table_id,
182 index_item: self
183 .index_item
184 .iter()
185 .map(|expr| expr.to_expr_proto())
186 .collect_vec(),
187 index_column_properties: self.index_column_properties.clone(),
188 index_columns_len: self.index_columns_len,
189 initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
190 created_at_epoch: self.created_at_epoch.map(|e| e.0),
191 stream_job_status: PbStreamJobStatus::Creating.into(),
192 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
193 created_at_cluster_version: self.created_at_cluster_version.clone(),
194 }
195 }
196
197 pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
199 self.index_column_properties.get(column_idx).cloned()
200 }
201
202 pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
203 if let Some(col) = self.index_table.columns.get(column_idx) {
204 if col.is_hidden {
205 return None;
206 }
207 } else {
208 return None;
209 }
210 let expr_display = ExprDisplay {
211 expr: &self.index_item[column_idx],
212 input_schema: &Schema::new(
213 self.primary_table
214 .columns
215 .iter()
216 .map(|col| Field::from(&col.column_desc))
217 .collect_vec(),
218 ),
219 };
220
221 Some(expr_display.to_string())
224 }
225
226 pub fn display(&self) -> IndexDisplay {
227 let index_table = self.index_table.clone();
228 let index_columns_with_ordering = index_table
229 .pk
230 .iter()
231 .filter(|x| !index_table.columns[x.column_index].is_hidden)
232 .map(|x| {
233 let index_column_name = index_table.columns[x.column_index].name().to_owned();
234 format!("{} {}", index_column_name, x.order_type)
235 })
236 .collect_vec();
237
238 let pk_column_index_set = index_table
239 .pk
240 .iter()
241 .map(|x| x.column_index)
242 .collect::<HashSet<_>>();
243
244 let include_columns = index_table
245 .columns
246 .iter()
247 .enumerate()
248 .filter(|(i, _)| !pk_column_index_set.contains(i))
249 .filter(|(_, x)| !x.is_hidden)
250 .map(|(_, x)| x.name().to_owned())
251 .collect_vec();
252
253 let distributed_by_columns = index_table
254 .distribution_key
255 .iter()
256 .map(|&x| index_table.columns[x].name().to_owned())
257 .collect_vec();
258
259 IndexDisplay {
260 index_columns_with_ordering,
261 include_columns,
262 distributed_by_columns,
263 }
264 }
265}
266
267pub struct IndexDisplay {
268 pub index_columns_with_ordering: Vec<String>,
269 pub include_columns: Vec<String>,
270 pub distributed_by_columns: Vec<String>,
271}
272
273impl OwnedByUserCatalog for IndexCatalog {
274 fn owner(&self) -> UserId {
275 self.index_table.owner
276 }
277}