use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::sync::Arc;

use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{IndexId, TableId};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus};
use risingwave_sqlparser::ast;
use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};

use super::RwPgResponse;
use crate::TableCatalog;
use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
use crate::optimizer::plan_node::{
    Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::stream_fragmenter::{GraphJobType, build_graph};

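/// Resolve the table to be indexed, returning its schema name, its catalog, and
/// the resolved name of the index table.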
pub(crate) fn resolve_index_schema(
    session: &SessionImpl,
    index_name: ObjectName,
    table_name: ObjectName,
) -> Result<(String, Arc<TableCatalog>, String)> {
    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();
    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

    let index_table_name = Binder::resolve_index_name(index_name)?;

    let catalog_reader = session.env().catalog_reader();
    let read_guard = catalog_reader.read_guard();
    let (table, schema_name) =
        read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
    Ok((schema_name.to_owned(), table.clone(), index_table_name))
}

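/// Generate the streaming plan for `CREATE INDEX`, along with the catalog of the
/// index table and the `PbIndex` metadata describing the index.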
pub(crate) fn gen_create_index_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    schema_name: String,
    table: Arc<TableCatalog>,
    index_table_name: String,
    columns: Vec<OrderByExpr>,
    include: Vec<Ident>,
    distributed_by: Vec<ast::Expr>,
) -> Result<(PlanRef, TableCatalog, PbIndex)> {
    if table.is_index() {
        return Err(
            ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
        );
    }

    if !session.is_super_user() && session.user_id() != table.owner {
        return Err(ErrorCode::PermissionDenied(format!(
            "must be owner of table \"{}\"",
            table.name
        ))
        .into());
    }

    let mut binder = Binder::new_for_stream(session);
    binder.bind_table(Some(&schema_name), &table.name)?;

    let mut index_columns_ordered_expr = vec![];
    let mut include_columns_expr = vec![];
    let mut distributed_columns_expr = vec![];
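    // Bind each index key. A key must be a plain column reference or a pure
    // expression over the table's columns; impure expressions are rejected.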
    for column in columns {
        let order_type = OrderType::from_bools(column.asc, column.nulls_first);
        let expr_impl = binder.bind_expr(&column.expr)?;
        // Constant-fold the expression and rewrite timezone-dependent parts
        // against the session timezone.
        let mut const_eval = ConstEvalRewriter { error: None };
        let expr_impl = const_eval.rewrite_expr(expr_impl);
        let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
        match expr_impl {
            ExprImpl::InputRef(_) => {}
            ExprImpl::FunctionCall(_) => {
                if expr_impl.is_impure() {
                    return Err(ErrorCode::NotSupported(
                        "this expression is impure".into(),
                        "use a pure expression instead".into(),
                    )
                    .into());
                }
            }
            _ => {
                return Err(ErrorCode::NotSupported(
                    "index columns should be columns or expressions".into(),
                    "use columns or expressions instead".into(),
                )
                .into());
            }
        }
        index_columns_ordered_expr.push((expr_impl, order_type));
    }

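    // Resolve the INCLUDE list; when it is omitted, include every visible
    // (non-hidden) column of the table.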
    if include.is_empty() {
        include_columns_expr = table
            .columns()
            .iter()
            .enumerate()
            .filter(|(_, column)| !column.is_hidden)
            .map(|(x, column)| {
                ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
            })
            .collect_vec();
    } else {
        for column in include {
            let expr_impl =
                binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
            include_columns_expr.push(expr_impl);
        }
    }

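    // Bind the DISTRIBUTED BY expressions, if any were given.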
    for column in distributed_by {
        let expr_impl = binder.bind_expr(&column)?;
        distributed_columns_expr.push(expr_impl);
    }

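    // De-duplicate the index keys, and drop INCLUDE columns that duplicate one
    // another or are already covered by an index key (both filters share `set`).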
    let mut set = HashSet::new();
    index_columns_ordered_expr = index_columns_ordered_expr
        .into_iter()
        .filter(|(expr, _)| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            ExprImpl::FunctionCall(_) => true,
            _ => unreachable!(),
        })
        .collect_vec();

    include_columns_expr = include_columns_expr
        .into_iter()
        .filter(|expr| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            _ => unreachable!(),
        })
        .collect_vec();

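    // De-duplicate the DISTRIBUTED BY columns; they must form a prefix of the
    // index key columns.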
    let mut set = HashSet::new();
    let distributed_columns_expr = distributed_columns_expr
        .into_iter()
        .filter(|expr| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            ExprImpl::FunctionCall(_) => true,
            _ => unreachable!(),
        })
        .collect_vec();
    if !index_columns_ordered_expr
        .iter()
        .map(|(expr, _)| expr.clone())
        .collect_vec()
        .starts_with(&distributed_columns_expr)
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "Distributed by columns should be a prefix of index columns".to_owned(),
        )
        .into());
    }

    let (index_database_id, index_schema_id) =
        session.get_database_and_schema_id_for_create(Some(schema_name))?;

    let materialize = assemble_materialize(
        index_database_id,
        index_schema_id,
        table.clone(),
        context,
        index_table_name.clone(),
        &index_columns_ordered_expr,
        &include_columns_expr,
        // With no DISTRIBUTED BY clause, distribute on the first index column only.
        if distributed_columns_expr.is_empty() {
            1
        } else {
            distributed_columns_expr.len()
        },
    )?;

    let mut index_table = materialize.table().clone();
    {
        // Inherit table properties from the primary table.
        index_table.retention_seconds = table.retention_seconds;
    }

    index_table.owner = table.owner;

    let index_columns_len = index_columns_ordered_expr.len() as u32;
    let index_column_properties = index_columns_ordered_expr
        .iter()
        .map(|(_, order)| PbIndexColumnProperties {
            is_desc: order.is_descending(),
            nulls_first: order.nulls_are_first(),
        })
        .collect();
    let index_item = build_index_item(
        &index_table,
        table.name(),
        &table,
        index_columns_ordered_expr,
    );

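    // Assemble the index metadata for the catalog. The index id and index table
    // id are placeholders that are assigned once the job is actually created.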
    let index_prost = PbIndex {
        id: IndexId::placeholder().index_id,
        schema_id: index_schema_id,
        database_id: index_database_id,
        name: index_table_name,
        owner: index_table.owner,
        index_table_id: TableId::placeholder().table_id,
        primary_table_id: table.id.table_id,
        index_item,
        index_column_properties,
        index_columns_len,
        initialized_at_epoch: None,
        created_at_epoch: None,
        stream_job_status: PbStreamJobStatus::Creating.into(),
        initialized_at_cluster_version: None,
        created_at_cluster_version: None,
    };

    let plan: PlanRef = materialize.into();
    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Index:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, index_table, index_prost))
}

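/// Build the `index_item` expressions of the index: the index key expressions
/// first, followed by `InputRef`s mapping the remaining columns of the index
/// table back to the corresponding columns of the primary table.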
fn build_index_item(
    index_table: &TableCatalog,
    primary_table_name: &str,
    primary_table: &TableCatalog,
    index_columns: Vec<(ExprImpl, OrderType)>,
) -> Vec<risingwave_pb::expr::ExprNode> {
    let primary_table_desc_map = primary_table
        .columns
        .iter()
        .enumerate()
        .map(|(x, y)| (y.name().to_owned(), x))
        .collect::<HashMap<_, _>>();

    let primary_table_name_prefix = format!("{}.", primary_table_name);

    let index_columns_len = index_columns.len();
    index_columns
        .into_iter()
        .map(|(expr, _)| expr.to_expr_proto())
        .chain(
            index_table
                .columns
                .iter()
                .map(|c| &c.column_desc)
                .skip(index_columns_len)
                .map(|x| {
                    // Index-table columns derived from the primary table may be
                    // prefixed with "<primary_table_name>."; strip the prefix
                    // before looking up the original column.
                    let name = if x.name.starts_with(&primary_table_name_prefix) {
                        x.name[primary_table_name_prefix.len()..].to_string()
                    } else {
                        x.name.clone()
                    };

                    let column_index = *primary_table_desc_map.get(&name).unwrap();
                    InputRef {
                        index: column_index,
                        data_type: primary_table
                            .columns
                            .get(column_index)
                            .unwrap()
                            .data_type()
                            .clone(),
                    }
                    .to_expr_proto()
                }),
        )
        .collect_vec()
}

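/// Assemble the `StreamMaterialize` plan for the index table: scan the primary
/// table, project the index keys plus the INCLUDE columns, and materialize the
/// result with the requested distribution and order.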
fn assemble_materialize(
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_catalog: Arc<TableCatalog>,
    context: OptimizerContextRef,
    index_name: String,
    index_columns: &[(ExprImpl, OrderType)],
    include_columns: &[ExprImpl],
    distributed_by_columns_len: usize,
) -> Result<StreamMaterialize> {
    let definition = context.normalized_sql().to_owned();
    let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);

    let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);

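    // Project the index key expressions first, then the INCLUDE columns; the
    // leading keys also serve as the distribution and sort columns below.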
    let exprs = index_columns
        .iter()
        .map(|(expr, _)| expr.clone())
        .chain(include_columns.iter().cloned())
        .collect_vec();

    let logical_project = LogicalProject::create(logical_scan.into(), exprs);
    // Mark every projected column as required in the plan output.
    let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
    project_required_cols.toggle_range(0..logical_project.schema().len());

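    // Derive output column names: plain column keys reuse the table's column
    // name, while expression keys are named after their function, de-duplicated
    // with a numeric suffix.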
    let mut col_names = HashSet::new();
    let mut count = 0;

    let out_names: Vec<String> = index_columns
        .iter()
        .map(|(expr, _)| match expr {
            ExprImpl::InputRef(input_ref) => table_catalog
                .columns()
                .get(input_ref.index)
                .unwrap()
                .name()
                .to_owned(),
            ExprImpl::FunctionCall(func) => {
                let func_name = func.func_type().as_str_name().to_owned();
                let mut name = func_name.clone();
                while !col_names.insert(name.clone()) {
                    count += 1;
                    name = format!("{}{}", func_name, count);
                }
                name
            }
            _ => unreachable!(),
        })
        .chain(include_columns.iter().map(|expr| {
            match expr {
                ExprImpl::InputRef(input_ref) => table_catalog
                    .columns()
                    .get(input_ref.index)
                    .unwrap()
                    .name()
                    .to_owned(),
                _ => unreachable!(),
            }
        }))
        .collect_vec();

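    // Hash-distribute on the leading `distributed_by_columns_len` output columns
    // and order the materialized rows by all index keys.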
    PlanRoot::new_with_logical_plan(
        logical_project,
        RequiredDist::PhysicalDist(Distribution::HashShard(
            (0..distributed_by_columns_len).collect(),
        )),
        Order::new(
            index_columns
                .iter()
                .enumerate()
                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
                .collect(),
        ),
        project_required_cols,
        out_names,
    )
    .gen_index_plan(
        index_name,
        database_id,
        schema_id,
        definition,
        retention_seconds,
    )
}

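/// Handle `CREATE INDEX [IF NOT EXISTS] name ON table (columns) [INCLUDE (...)]
/// [DISTRIBUTED BY (...)]`: plan the index, build its stream fragment graph, and
/// submit the job through the catalog writer.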
pub async fn handle_create_index(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    index_name: ObjectName,
    table_name: ObjectName,
    columns: Vec<OrderByExpr>,
    include: Vec<Ident>,
    distributed_by: Vec<ast::Expr>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    let (graph, index_table, index) = {
        let (schema_name, table, index_table_name) =
            resolve_index_schema(&session, index_name, table_name)?;
        let qualified_index_name = ObjectName(vec![
            Ident::from_real_value(&schema_name),
            Ident::from_real_value(&index_table_name),
        ]);
        if let Either::Right(resp) = session.check_relation_name_duplicated(
            qualified_index_name,
            StatementType::CREATE_INDEX,
            if_not_exists,
        )? {
            return Ok(resp);
        }

        let context = OptimizerContext::from_handler_args(handler_args);
        let (plan, index_table, index) = gen_create_index_plan(
            &session,
            context.into(),
            schema_name,
            table,
            index_table_name,
            columns,
            include,
            distributed_by,
        )?;
        let graph = build_graph(plan, Some(GraphJobType::Index))?;

        (graph, index_table, index)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        index.name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

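    // Register the job with the creating-streaming-job tracker; the guard keeps
    // the registration alive until this handler returns.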
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                index.database_id,
                index.schema_id,
                index.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_index(index, index_table.to_prost(), graph, if_not_exists)
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
}