1use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::sync::Arc;
18
19use either::Either;
20use fixedbitset::FixedBitSet;
21use itertools::Itertools;
22use pgwire::pg_response::{PgResponse, StatementType};
23use risingwave_common::catalog::{IndexId, TableId};
24use risingwave_common::types::DataType;
25use risingwave_common::util::recursive::{Recurse, tracker};
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::{
28 CreateType, PbFlatIndexConfig, PbHnswFlatIndexConfig, PbIndex, PbIndexColumnProperties,
29 PbStreamJobStatus, PbVectorIndexInfo,
30};
31use risingwave_pb::common::PbDistanceType;
32use risingwave_sqlparser::ast;
33use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
34use thiserror_ext::AsReport;
35
36use super::RwPgResponse;
37use crate::TableCatalog;
38use crate::binder::Binder;
39use crate::catalog::root_catalog::SchemaPath;
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::{ErrorCode, Result};
42use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef, is_impure_func_call};
43use crate::handler::HandlerArgs;
44use crate::handler::util::reject_internal_table_dependency;
45use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
46use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
47use crate::optimizer::plan_node::{
48 Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
49};
50use crate::optimizer::property::{Distribution, Order, RequiredDist};
51use crate::optimizer::{LogicalPlanRoot, OptimizerContext, OptimizerContextRef, PlanRoot};
52use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
53use crate::session::SessionImpl;
54use crate::session::current::notice_to_user;
55use crate::stream_fragmenter::{GraphJobType, build_graph};
56
57pub(crate) fn resolve_index_schema(
58 session: &SessionImpl,
59 index_name: ObjectName,
60 table_name: ObjectName,
61) -> Result<(String, Arc<TableCatalog>, String)> {
62 let db_name = &session.database();
63 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
64 let search_path = session.config().search_path();
65 let user_name = &session.user_name();
66 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
67
68 let index_table_name = Binder::resolve_index_name(index_name)?;
69
70 let catalog_reader = session.env().catalog_reader();
71 let read_guard = catalog_reader.read_guard();
72 let (table, schema_name) =
73 read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
74 reject_internal_table_dependency(table.as_ref(), "CREATE INDEX")?;
75 Ok((schema_name.to_owned(), table.clone(), index_table_name))
76}
77
/// Validator for expressions used as index columns.
///
/// Constructed and driven by [`IndexColumnExprValidator::validate`]; it walks the
/// expression tree and records the first unsupported construct in `result`.
pub(crate) struct IndexColumnExprValidator {
    // Whether impure function calls are tolerated (set to true for vector indexes).
    allow_impure: bool,
    // First validation error encountered while visiting, or `Ok(())` if valid so far.
    result: Result<()>,
}
82
83impl IndexColumnExprValidator {
84 fn unsupported_expr_err(expr: &ExprImpl) -> ErrorCode {
85 ErrorCode::NotSupported(
86 format!("unsupported index column expression type: {:?}", expr),
87 "use columns or expressions instead".into(),
88 )
89 }
90
91 pub(crate) fn validate(expr: &ExprImpl, allow_impure: bool) -> Result<()> {
92 match expr {
93 ExprImpl::InputRef(_) | ExprImpl::FunctionCall(_) => {}
94 other_expr => {
95 return Err(Self::unsupported_expr_err(other_expr).into());
96 }
97 }
98 let mut visitor = Self {
99 allow_impure,
100 result: Ok(()),
101 };
102 visitor.visit_expr(expr);
103 visitor.result
104 }
105}
106
impl ExprVisitor for IndexColumnExprValidator {
    fn visit_expr(&mut self, expr: &ExprImpl) {
        // Short-circuit: once an error has been recorded, stop descending.
        if self.result.is_err() {
            return;
        }
        tracker!().recurse(|t| {
            // Deep expressions are still accepted, but the user gets a notice.
            if t.depth_reaches(crate::expr::EXPR_DEPTH_THRESHOLD) {
                notice_to_user(crate::expr::EXPR_TOO_DEEP_NOTICE);
            }

            match expr {
                // Column references and literals are always allowed inside the tree.
                ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {}
                ExprImpl::FunctionCall(inner) => {
                    // Impure calls are rejected unless explicitly allowed
                    // (vector indexes tolerate them).
                    if !self.allow_impure && is_impure_func_call(inner) {
                        self.result = Err(ErrorCode::NotSupported(
                            "this expression is impure".into(),
                            "use a pure expression instead".into(),
                        )
                        .into());
                        return;
                    }
                    // Recurse into the call's arguments.
                    self.visit_function_call(inner)
                }
                // Any other expression kind (subquery, aggregate, ...) is unsupported.
                other_expr => {
                    self.result = Err(Self::unsupported_expr_err(other_expr).into());
                }
            }
        })
    }
}
137
138pub(crate) fn gen_create_index_plan(
139 session: &SessionImpl,
140 context: OptimizerContextRef,
141 schema_name: String,
142 table: Arc<TableCatalog>,
143 index_table_name: String,
144 method: Option<Ident>,
145 columns: Vec<OrderByExpr>,
146 include: Vec<Ident>,
147 distributed_by: Vec<ast::Expr>,
148) -> Result<(PlanRef, TableCatalog, PbIndex)> {
149 if table.is_index() {
150 return Err(
151 ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
152 );
153 }
154
155 if !session.is_super_user() && session.user_id() != table.owner {
156 return Err(ErrorCode::PermissionDenied(format!(
157 "must be owner of table \"{}\"",
158 table.name
159 ))
160 .into());
161 }
162
163 let vector_index_config = if let Some(method) = method {
164 match method.real_value().to_ascii_lowercase().as_str() {
165 "default" => None,
166 "flat" => Some(risingwave_pb::catalog::vector_index_info::PbConfig::Flat(
167 PbFlatIndexConfig {},
168 )),
169 "hnsw" => {
170 let with_options = context.with_options();
171 let parse_non_zero_u32 = |key: &str, default| {
172 with_options
173 .get(key)
174 .map(|v| {
175 let v = v.parse::<u32>().map_err(|e| {
176 ErrorCode::InvalidInputSyntax(format!(
177 "invalid {} value {}: failed to parse {}",
178 key,
179 v,
180 e.as_report()
181 ))
182 })?;
183 if v == 0 {
184 Err(ErrorCode::InvalidInputSyntax(format!(
185 "invalid {} value {}: should not be zero",
186 key, v
187 )))
188 } else {
189 Ok(v)
190 }
191 })
192 .transpose()
193 .map(|v| v.unwrap_or(default))
194 };
195 Some(
196 risingwave_pb::catalog::vector_index_info::PbConfig::HnswFlat(
197 PbHnswFlatIndexConfig {
198 m: parse_non_zero_u32("m", 16)?, ef_construction: parse_non_zero_u32("ef_construction", 64)?, max_level: parse_non_zero_u32("max_level", 10)?,
201 },
202 ),
203 )
204 }
205 _ => {
206 return Err(ErrorCode::InvalidInputSyntax(format!(
207 "invalid index method {}",
208 method
209 ))
210 .into());
211 }
212 }
213 } else {
214 None
215 };
216
217 let is_vector_index = vector_index_config.is_some();
218
219 if is_vector_index && !table.append_only {
220 return Err(ErrorCode::InvalidInputSyntax(format!(
221 "\"{}\" is not append-only but vector index must be append-only",
222 table.name
223 ))
224 .into());
225 }
226
227 let mut binder = Binder::new_for_stream(session);
228 binder.bind_table(Some(&schema_name), &table.name)?;
229
230 let mut index_columns_ordered_expr = vec![];
231 let mut include_columns_expr = vec![];
232 let mut distributed_columns_expr = vec![];
233 if is_vector_index && columns.len() != 1 {
234 return Err(ErrorCode::InvalidInputSyntax(format!(
235 "vector index must be defined on exactly 1 column, but got {}",
236 columns.len()
237 ))
238 .into());
239 }
240 let mut dimension = None;
241 for column in columns {
242 if is_vector_index && (column.asc.is_some() || column.nulls_first.is_some()) {
243 return Err(ErrorCode::InvalidInputSyntax(
244 "vector index cannot specify order".to_owned(),
245 )
246 .into());
247 }
248 let order_type = OrderType::from_bools(column.asc, column.nulls_first);
249 let expr_impl = binder.bind_expr(&column.expr)?;
250 let mut const_eval = ConstEvalRewriter { error: None };
252 let expr_impl = const_eval.rewrite_expr(expr_impl);
253 let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
254 let allow_impure = is_vector_index;
255 IndexColumnExprValidator::validate(&expr_impl, allow_impure)?;
256 if is_vector_index {
257 match expr_impl.return_type() {
258 DataType::Vector(d) => {
259 dimension = Some(d);
260 }
261 other => {
262 return Err(ErrorCode::InvalidInputSyntax(format!(
263 "vector index must be defined on column of Vector type, but got {:?}",
264 other
265 ))
266 .into());
267 }
268 }
269 }
270 index_columns_ordered_expr.push((expr_impl, order_type));
271 }
272
273 if include.is_empty() {
274 include_columns_expr = table
276 .columns()
277 .iter()
278 .enumerate()
279 .filter(|(_, column)| !column.is_hidden)
280 .map(|(x, column)| {
281 ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
282 })
283 .collect_vec();
284 } else {
285 for column in include {
286 let expr_impl =
287 binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
288 include_columns_expr.push(expr_impl);
289 }
290 };
291
292 if is_vector_index && !distributed_by.is_empty() {
293 return Err(ErrorCode::InvalidInputSyntax(
294 "vector index cannot define DISTRIBUTED BY".to_owned(),
295 )
296 .into());
297 }
298
299 for column in distributed_by {
300 let expr_impl = binder.bind_expr(&column)?;
301 distributed_columns_expr.push(expr_impl);
302 }
303
304 let mut set = HashSet::new();
306 index_columns_ordered_expr = index_columns_ordered_expr
307 .into_iter()
308 .filter(|(expr, _)| match expr {
309 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
310 ExprImpl::FunctionCall(_) => true,
311 _ => unreachable!(),
312 })
313 .collect_vec();
314
315 include_columns_expr = include_columns_expr
317 .into_iter()
318 .filter(|expr| match expr {
319 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
320 _ => unreachable!(),
321 })
322 .collect_vec();
323
324 let mut set = HashSet::new();
326 let distributed_columns_expr = distributed_columns_expr
327 .into_iter()
328 .filter(|expr| match expr {
329 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
330 ExprImpl::FunctionCall(_) => true,
331 _ => unreachable!(),
332 })
333 .collect_vec();
334 if !index_columns_ordered_expr
336 .iter()
337 .map(|(expr, _)| expr.clone())
338 .collect_vec()
339 .starts_with(&distributed_columns_expr)
340 {
341 return Err(ErrorCode::InvalidInputSyntax(
342 "Distributed by columns should be a prefix of index columns".to_owned(),
343 )
344 .into());
345 }
346
347 let (index_database_id, index_schema_id) =
348 session.get_database_and_schema_id_for_create(Some(schema_name))?;
349
350 let (plan, mut index_table) = if let Some(vector_index_config) = vector_index_config {
351 assert!(distributed_columns_expr.is_empty());
352 assert_eq!(1, index_columns_ordered_expr.len());
353 let input = assemble_input(
354 table.clone(),
355 context.clone(),
356 &index_columns_ordered_expr,
357 &include_columns_expr,
358 RequiredDist::PhysicalDist(Distribution::Single),
359 )?;
360 let definition = context.normalized_sql().to_owned();
361 let retention_seconds = table.retention_seconds.and_then(NonZeroU32::new);
362
363 let distance_type = match context
364 .with_options()
365 .get("distance_type")
366 .ok_or_else(|| {
367 ErrorCode::InvalidInputSyntax(
368 "vector index missing `distance_type` in with options".to_owned(),
369 )
370 })?
371 .to_ascii_lowercase()
372 .as_str()
373 {
374 "l1" => PbDistanceType::L1,
375 "l2" => PbDistanceType::L2Sqr,
376 "inner_product" => PbDistanceType::InnerProduct,
377 "cosine" => PbDistanceType::Cosine,
378 other => {
379 return Err(ErrorCode::InvalidInputSyntax(format!(
380 "unsupported vector index distance type: {}",
381 other
382 ))
383 .into());
384 }
385 };
386
387 let vector_index_write = input.gen_vector_index_plan(
388 index_table_name.clone(),
389 index_database_id,
390 index_schema_id,
391 definition,
392 retention_seconds,
393 PbVectorIndexInfo {
394 dimension: dimension.expect("should be set for vector index") as _,
395 config: Some(vector_index_config),
396 distance_type: distance_type as _,
397 },
398 )?;
399 let index_table = vector_index_write.table().clone();
400 let plan: PlanRef = vector_index_write.into();
401 (plan, index_table)
402 } else {
403 let materialize = assemble_materialize(
405 index_database_id,
406 index_schema_id,
407 table.clone(),
408 context.clone(),
409 index_table_name.clone(),
410 &index_columns_ordered_expr,
411 &include_columns_expr,
412 if distributed_columns_expr.is_empty() {
415 1
416 } else {
417 distributed_columns_expr.len()
418 },
419 )?;
420 let index_table = materialize.table().clone();
421 let plan: PlanRef = materialize.into();
422 (plan, index_table)
423 };
424
425 {
426 index_table.retention_seconds = table.retention_seconds;
428 }
429
430 index_table.owner = table.owner;
431
432 let index_columns_len = index_columns_ordered_expr.len() as u32;
433 let index_column_properties = index_columns_ordered_expr
434 .iter()
435 .map(|(_, order)| PbIndexColumnProperties {
436 is_desc: order.is_descending(),
437 nulls_first: order.nulls_are_first(),
438 })
439 .collect();
440 let index_item = build_index_item(
441 &index_table,
442 table.name(),
443 &table,
444 index_columns_ordered_expr,
445 );
446
447 let create_type =
448 if context.session_ctx().config().background_ddl() && plan_can_use_background_ddl(&plan) {
449 CreateType::Background
450 } else {
451 CreateType::Foreground
452 };
453
454 let index_prost = PbIndex {
455 id: IndexId::placeholder(),
456 schema_id: index_schema_id,
457 database_id: index_database_id,
458 name: index_table_name,
459 owner: index_table.owner,
460 index_table_id: TableId::placeholder(),
461 primary_table_id: table.id,
462 index_item,
463 index_column_properties,
464 index_columns_len,
465 initialized_at_epoch: None,
466 created_at_epoch: None,
467 stream_job_status: PbStreamJobStatus::Creating.into(),
468 initialized_at_cluster_version: None,
469 created_at_cluster_version: None,
470 create_type: create_type.into(),
471 };
472
473 let ctx = plan.ctx();
474 let explain_trace = ctx.is_explain_trace();
475 if explain_trace {
476 ctx.trace("Create Index:");
477 ctx.trace(plan.explain_to_string());
478 }
479
480 Ok((plan, index_table, index_prost))
481}
482
483fn build_index_item(
484 index_table: &TableCatalog,
485 primary_table_name: &str,
486 primary_table: &TableCatalog,
487 index_columns: Vec<(ExprImpl, OrderType)>,
488) -> Vec<risingwave_pb::expr::ExprNode> {
489 let primary_table_desc_map = primary_table
490 .columns
491 .iter()
492 .enumerate()
493 .map(|(x, y)| (y.name.clone(), x))
494 .collect::<HashMap<_, _>>();
495
496 let primary_table_name_prefix = format!("{}.", primary_table_name);
497
498 let index_columns_len = index_columns.len();
499 index_columns
500 .into_iter()
501 .map(|(expr, _)| expr.to_expr_proto())
502 .chain(
503 index_table
504 .columns
505 .iter()
506 .map(|c| &c.column_desc)
507 .skip(index_columns_len)
508 .map(|x| {
509 let name = if x.name.starts_with(&primary_table_name_prefix) {
510 x.name[primary_table_name_prefix.len()..].to_string()
511 } else {
512 x.name.clone()
513 };
514
515 let column_index = *primary_table_desc_map.get(&name).unwrap();
516 InputRef {
517 index: column_index,
518 data_type: primary_table
519 .columns
520 .get(column_index)
521 .unwrap()
522 .data_type
523 .clone(),
524 }
525 .to_expr_proto()
526 }),
527 )
528 .collect_vec()
529}
530
531fn assemble_input(
532 table_catalog: Arc<TableCatalog>,
533 context: OptimizerContextRef,
534 index_columns: &[(ExprImpl, OrderType)],
535 include_columns: &[ExprImpl],
536 required_dist: RequiredDist,
537) -> Result<LogicalPlanRoot> {
538 let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);
543
544 let exprs = index_columns
545 .iter()
546 .map(|(expr, _)| expr.clone())
547 .chain(include_columns.iter().cloned())
548 .collect_vec();
549
550 let logical_project = LogicalProject::create(logical_scan.into(), exprs);
551 let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
552 project_required_cols.toggle_range(0..logical_project.schema().len());
553
554 let mut col_names = HashSet::new();
555 let mut count = 0;
556
557 let out_names: Vec<String> = index_columns
558 .iter()
559 .map(|(expr, _)| match expr {
560 ExprImpl::InputRef(input_ref) => table_catalog
561 .columns()
562 .get(input_ref.index)
563 .unwrap()
564 .name()
565 .to_owned(),
566 ExprImpl::FunctionCall(func) => {
567 let func_name = func.func_type().as_str_name().to_owned();
568 let mut name = func_name.clone();
569 while !col_names.insert(name.clone()) {
570 count += 1;
571 name = format!("{}{}", func_name, count);
572 }
573 name
574 }
575 _ => unreachable!(),
576 })
577 .chain(include_columns.iter().map(|expr| {
578 match expr {
579 ExprImpl::InputRef(input_ref) => table_catalog
580 .columns()
581 .get(input_ref.index)
582 .unwrap()
583 .name()
584 .to_owned(),
585 _ => unreachable!(),
586 }
587 }))
588 .collect_vec();
589
590 Ok(PlanRoot::new_with_logical_plan(
591 logical_project,
592 required_dist,
593 Order::new(
594 index_columns
595 .iter()
596 .enumerate()
597 .map(|(i, (_, order))| ColumnOrder::new(i, *order))
598 .collect(),
599 ),
600 project_required_cols,
601 out_names,
602 ))
603}
604
605fn assemble_materialize(
608 database_id: DatabaseId,
609 schema_id: SchemaId,
610 table_catalog: Arc<TableCatalog>,
611 context: OptimizerContextRef,
612 index_name: String,
613 index_columns: &[(ExprImpl, OrderType)],
614 include_columns: &[ExprImpl],
615 distributed_by_columns_len: usize,
616) -> Result<StreamMaterialize> {
617 let input = assemble_input(
618 table_catalog.clone(),
619 context.clone(),
620 index_columns,
621 include_columns,
622 RequiredDist::PhysicalDist(Distribution::HashShard(
625 (0..distributed_by_columns_len).collect(),
626 )),
627 )?;
628
629 let definition = context.normalized_sql().to_owned();
630 let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
631
632 input.gen_index_plan(
633 index_name,
634 database_id,
635 schema_id,
636 definition,
637 retention_seconds,
638 )
639}
640
/// Handle a `CREATE INDEX` statement end-to-end: resolve names, generate the
/// plan, build the stream graph, and submit the catalog/DDL request.
///
/// Returns an empty `CREATE_INDEX` response, or an early response if the index
/// name already exists and `if_not_exists` is set.
pub async fn handle_create_index(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    index_name: ObjectName,
    table_name: ObjectName,
    method: Option<Ident>,
    columns: Vec<OrderByExpr>,
    include: Vec<Ident>,
    distributed_by: Vec<ast::Expr>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    // Planning happens inside a block so that planner state (optimizer context,
    // catalog read guard) is dropped before awaiting the catalog writer.
    let (graph, index_table, index) = {
        let (schema_name, table, index_table_name) =
            resolve_index_schema(&session, index_name, table_name)?;
        let qualified_index_name = ObjectName(vec![
            Ident::from_real_value(&schema_name),
            Ident::from_real_value(&index_table_name),
        ]);
        // Bail out early (possibly with a notice) if the name is already taken.
        if let Either::Right(resp) = session.check_relation_name_duplicated(
            qualified_index_name,
            StatementType::CREATE_INDEX,
            if_not_exists,
        )? {
            return Ok(resp);
        }

        let context = OptimizerContext::from_handler_args(handler_args);
        let (plan, index_table, index) = gen_create_index_plan(
            &session,
            context.into(),
            schema_name,
            table,
            index_table_name,
            method,
            columns,
            include,
            distributed_by,
        )?;
        // Fragment the streaming plan into the graph submitted to the meta node.
        let graph = build_graph(plan, Some(GraphJobType::Index))?;

        (graph, index_table, index)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        index.name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    // Track the in-flight streaming job for the lifetime of the DDL call.
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                index.database_id,
                index.schema_id,
                index.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_index(index, index_table.to_prost(), graph, if_not_exists)
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
}