1use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::sync::Arc;
18
19use either::Either;
20use fixedbitset::FixedBitSet;
21use itertools::Itertools;
22use pgwire::pg_response::{PgResponse, StatementType};
23use risingwave_common::catalog::{IndexId, TableId};
24use risingwave_common::types::DataType;
25use risingwave_common::util::recursive::{Recurse, tracker};
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::{
28 CreateType, PbFlatIndexConfig, PbHnswFlatIndexConfig, PbIndex, PbIndexColumnProperties,
29 PbStreamJobStatus, PbVectorIndexInfo,
30};
31use risingwave_pb::common::PbDistanceType;
32use risingwave_sqlparser::ast;
33use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
34use thiserror_ext::AsReport;
35
36use super::RwPgResponse;
37use crate::TableCatalog;
38use crate::binder::Binder;
39use crate::catalog::root_catalog::SchemaPath;
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::{ErrorCode, Result};
42use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef, is_impure_func_call};
43use crate::handler::HandlerArgs;
44use crate::handler::create_mv::resolve_streaming_job_resource_type;
45use crate::handler::util::{
46 LongRunningNotificationAction, execute_with_long_running_notification,
47 reject_internal_table_dependency,
48};
49use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
50use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
51use crate::optimizer::plan_node::{
52 Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
53 ensure_sync_log_store_fragment_root,
54};
55use crate::optimizer::property::{Distribution, Order, RequiredDist};
56use crate::optimizer::{LogicalPlanRoot, OptimizerContext, OptimizerContextRef, PlanRoot};
57use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
58use crate::session::SessionImpl;
59use crate::session::current::notice_to_user;
60use crate::stream_fragmenter::{GraphJobType, build_graph};
61
62pub(crate) fn resolve_index_schema(
63 session: &SessionImpl,
64 index_name: ObjectName,
65 table_name: ObjectName,
66) -> Result<(String, Arc<TableCatalog>, String)> {
67 let db_name = &session.database();
68 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
69 let search_path = session.config().search_path();
70 let user_name = &session.user_name();
71 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
72
73 let index_table_name = Binder::resolve_index_name(index_name)?;
74
75 let catalog_reader = session.env().catalog_reader();
76 let read_guard = catalog_reader.read_guard();
77 let (table, schema_name) =
78 read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
79 reject_internal_table_dependency(table.as_ref(), "CREATE INDEX")?;
80 Ok((schema_name.to_owned(), table.clone(), index_table_name))
81}
82
/// Expression visitor that checks whether an expression may be used as an index column.
///
/// Accepted nodes are column references, literals and function calls; every other expression
/// kind is reported as unsupported. Impure function calls are additionally rejected unless
/// `allow_impure` is set.
pub(crate) struct IndexColumnExprValidator {
    /// When `false`, any impure function call found while visiting is reported as an error.
    allow_impure: bool,
    /// Outcome of the validation; holds the first error found, after which visiting stops.
    result: Result<()>,
}
87
88impl IndexColumnExprValidator {
89 fn unsupported_expr_err(expr: &ExprImpl) -> ErrorCode {
90 ErrorCode::NotSupported(
91 format!("unsupported index column expression type: {:?}", expr),
92 "use columns or expressions instead".into(),
93 )
94 }
95
96 pub(crate) fn validate(expr: &ExprImpl, allow_impure: bool) -> Result<()> {
97 match expr {
98 ExprImpl::InputRef(_) | ExprImpl::FunctionCall(_) => {}
99 other_expr => {
100 return Err(Self::unsupported_expr_err(other_expr).into());
101 }
102 }
103 let mut visitor = Self {
104 allow_impure,
105 result: Ok(()),
106 };
107 visitor.visit_expr(expr);
108 visitor.result
109 }
110}
111
112impl ExprVisitor for IndexColumnExprValidator {
113 fn visit_expr(&mut self, expr: &ExprImpl) {
114 if self.result.is_err() {
115 return;
116 }
117 tracker!().recurse(|t| {
118 if t.depth_reaches(crate::expr::EXPR_DEPTH_THRESHOLD) {
119 notice_to_user(crate::expr::EXPR_TOO_DEEP_NOTICE);
120 }
121
122 match expr {
123 ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {}
124 ExprImpl::FunctionCall(inner) => {
125 if !self.allow_impure && is_impure_func_call(inner) {
126 self.result = Err(ErrorCode::NotSupported(
127 "this expression is impure".into(),
128 "use a pure expression instead".into(),
129 )
130 .into());
131 return;
132 }
133 self.visit_function_call(inner)
134 }
135 other_expr => {
136 self.result = Err(Self::unsupported_expr_err(other_expr).into());
137 }
138 }
139 })
140 }
141}
142
143pub(crate) fn gen_create_index_plan(
144 session: &SessionImpl,
145 context: OptimizerContextRef,
146 schema_name: String,
147 table: Arc<TableCatalog>,
148 index_table_name: String,
149 method: Option<Ident>,
150 columns: Vec<OrderByExpr>,
151 include: Vec<Ident>,
152 distributed_by: Vec<ast::Expr>,
153) -> Result<(PlanRef, TableCatalog, PbIndex)> {
154 if table.is_index() {
155 return Err(
156 ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
157 );
158 }
159
160 if !session.is_super_user() && session.user_id() != table.owner {
161 return Err(ErrorCode::PermissionDenied(format!(
162 "must be owner of table \"{}\"",
163 table.name
164 ))
165 .into());
166 }
167
168 let vector_index_config = if let Some(method) = method {
169 match method.real_value().to_ascii_lowercase().as_str() {
170 "default" => None,
171 "flat" => Some(risingwave_pb::catalog::vector_index_info::PbConfig::Flat(
172 PbFlatIndexConfig {},
173 )),
174 "hnsw" => {
175 let with_options = context.with_options();
176 let parse_non_zero_u32 = |key: &str, default| {
177 with_options
178 .get(key)
179 .map(|v| {
180 let v = v.parse::<u32>().map_err(|e| {
181 ErrorCode::InvalidInputSyntax(format!(
182 "invalid {} value {}: failed to parse {}",
183 key,
184 v,
185 e.as_report()
186 ))
187 })?;
188 if v == 0 {
189 Err(ErrorCode::InvalidInputSyntax(format!(
190 "invalid {} value {}: should not be zero",
191 key, v
192 )))
193 } else {
194 Ok(v)
195 }
196 })
197 .transpose()
198 .map(|v| v.unwrap_or(default))
199 };
200 Some(
201 risingwave_pb::catalog::vector_index_info::PbConfig::HnswFlat(
202 PbHnswFlatIndexConfig {
203 m: parse_non_zero_u32("m", 16)?, ef_construction: parse_non_zero_u32("ef_construction", 64)?, max_level: parse_non_zero_u32("max_level", 10)?,
206 },
207 ),
208 )
209 }
210 _ => {
211 return Err(ErrorCode::InvalidInputSyntax(format!(
212 "invalid index method {}",
213 method
214 ))
215 .into());
216 }
217 }
218 } else {
219 None
220 };
221
222 let is_vector_index = vector_index_config.is_some();
223
224 if is_vector_index && !table.append_only {
225 return Err(ErrorCode::InvalidInputSyntax(format!(
226 "\"{}\" is not append-only but vector index must be append-only",
227 table.name
228 ))
229 .into());
230 }
231
232 let mut binder = Binder::new_for_stream(session);
233 binder.bind_table(Some(&schema_name), &table.name)?;
234
235 let mut index_columns_ordered_expr = vec![];
236 let mut include_columns_expr = vec![];
237 let mut distributed_columns_expr = vec![];
238 if is_vector_index && columns.len() != 1 {
239 return Err(ErrorCode::InvalidInputSyntax(format!(
240 "vector index must be defined on exactly 1 column, but got {}",
241 columns.len()
242 ))
243 .into());
244 }
245 let mut dimension = None;
246 for column in columns {
247 if is_vector_index && (column.asc.is_some() || column.nulls_first.is_some()) {
248 return Err(ErrorCode::InvalidInputSyntax(
249 "vector index cannot specify order".to_owned(),
250 )
251 .into());
252 }
253 let order_type = OrderType::from_bools(column.asc, column.nulls_first);
254 let expr_impl = binder.bind_expr(&column.expr)?;
255 let mut const_eval = ConstEvalRewriter { error: None };
257 let expr_impl = const_eval.rewrite_expr(expr_impl);
258 let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
259 let allow_impure = is_vector_index;
260 IndexColumnExprValidator::validate(&expr_impl, allow_impure)?;
261 if is_vector_index {
262 match expr_impl.return_type() {
263 DataType::Vector(d) => {
264 dimension = Some(d);
265 }
266 other => {
267 return Err(ErrorCode::InvalidInputSyntax(format!(
268 "vector index must be defined on column of Vector type, but got {:?}",
269 other
270 ))
271 .into());
272 }
273 }
274 }
275 index_columns_ordered_expr.push((expr_impl, order_type));
276 }
277
278 if include.is_empty() {
279 include_columns_expr = table
281 .columns()
282 .iter()
283 .enumerate()
284 .filter(|(_, column)| !column.is_hidden)
285 .map(|(x, column)| {
286 ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
287 })
288 .collect_vec();
289 } else {
290 for column in include {
291 let expr_impl =
292 binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
293 include_columns_expr.push(expr_impl);
294 }
295 };
296
297 if is_vector_index && !distributed_by.is_empty() {
298 return Err(ErrorCode::InvalidInputSyntax(
299 "vector index cannot define DISTRIBUTED BY".to_owned(),
300 )
301 .into());
302 }
303
304 for column in distributed_by {
305 let expr_impl = binder.bind_expr(&column)?;
306 distributed_columns_expr.push(expr_impl);
307 }
308
309 let mut set = HashSet::new();
311 index_columns_ordered_expr = index_columns_ordered_expr
312 .into_iter()
313 .filter(|(expr, _)| match expr {
314 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
315 ExprImpl::FunctionCall(_) => true,
316 _ => unreachable!(),
317 })
318 .collect_vec();
319
320 include_columns_expr = include_columns_expr
322 .into_iter()
323 .filter(|expr| match expr {
324 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
325 _ => unreachable!(),
326 })
327 .collect_vec();
328
329 let mut set = HashSet::new();
331 let distributed_columns_expr = distributed_columns_expr
332 .into_iter()
333 .filter(|expr| match expr {
334 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
335 ExprImpl::FunctionCall(_) => true,
336 _ => unreachable!(),
337 })
338 .collect_vec();
339 if !index_columns_ordered_expr
341 .iter()
342 .map(|(expr, _)| expr.clone())
343 .collect_vec()
344 .starts_with(&distributed_columns_expr)
345 {
346 return Err(ErrorCode::InvalidInputSyntax(
347 "Distributed by columns should be a prefix of index columns".to_owned(),
348 )
349 .into());
350 }
351
352 let (index_database_id, index_schema_id) =
353 session.get_database_and_schema_id_for_create(Some(schema_name))?;
354
355 let (plan, mut index_table) = if let Some(vector_index_config) = vector_index_config {
356 assert!(distributed_columns_expr.is_empty());
357 assert_eq!(1, index_columns_ordered_expr.len());
358 let input = assemble_input(
359 table.clone(),
360 context.clone(),
361 &index_columns_ordered_expr,
362 &include_columns_expr,
363 RequiredDist::PhysicalDist(Distribution::Single),
364 )?;
365 let definition = context.normalized_sql().to_owned();
366 let retention_seconds = table.retention_seconds.and_then(NonZeroU32::new);
367
368 let distance_type = match context
369 .with_options()
370 .get("distance_type")
371 .ok_or_else(|| {
372 ErrorCode::InvalidInputSyntax(
373 "vector index missing `distance_type` in with options".to_owned(),
374 )
375 })?
376 .to_ascii_lowercase()
377 .as_str()
378 {
379 "l1" => PbDistanceType::L1,
380 "l2" => PbDistanceType::L2Sqr,
381 "inner_product" => PbDistanceType::InnerProduct,
382 "cosine" => PbDistanceType::Cosine,
383 other => {
384 return Err(ErrorCode::InvalidInputSyntax(format!(
385 "unsupported vector index distance type: {}",
386 other
387 ))
388 .into());
389 }
390 };
391
392 let vector_index_write = input.gen_vector_index_plan(
393 index_table_name.clone(),
394 index_database_id,
395 index_schema_id,
396 definition,
397 retention_seconds,
398 PbVectorIndexInfo {
399 dimension: dimension.expect("should be set for vector index") as _,
400 config: Some(vector_index_config),
401 distance_type: distance_type as _,
402 },
403 )?;
404 let index_table = vector_index_write.table().clone();
405 let plan: PlanRef = ensure_sync_log_store_fragment_root(vector_index_write.into());
406 (plan, index_table)
407 } else {
408 let materialize = assemble_materialize(
410 index_database_id,
411 index_schema_id,
412 table.clone(),
413 context.clone(),
414 index_table_name.clone(),
415 &index_columns_ordered_expr,
416 &include_columns_expr,
417 if distributed_columns_expr.is_empty() {
420 1
421 } else {
422 distributed_columns_expr.len()
423 },
424 )?;
425 let index_table = materialize.table().clone();
426 let plan: PlanRef = ensure_sync_log_store_fragment_root(materialize.into());
427 (plan, index_table)
428 };
429
430 {
431 index_table.retention_seconds = table.retention_seconds;
433 }
434
435 index_table.owner = table.owner;
436
437 let index_columns_len = index_columns_ordered_expr.len() as u32;
438 let index_column_properties = index_columns_ordered_expr
439 .iter()
440 .map(|(_, order)| PbIndexColumnProperties {
441 is_desc: order.is_descending(),
442 nulls_first: order.nulls_are_first(),
443 })
444 .collect();
445 let index_item = build_index_item(
446 &index_table,
447 table.name(),
448 &table,
449 index_columns_ordered_expr,
450 );
451
452 let create_type =
453 if context.session_ctx().config().background_ddl() && plan_can_use_background_ddl(&plan) {
454 CreateType::Background
455 } else {
456 CreateType::Foreground
457 };
458
459 let index_prost = PbIndex {
460 id: IndexId::placeholder(),
461 schema_id: index_schema_id,
462 database_id: index_database_id,
463 name: index_table_name,
464 owner: index_table.owner,
465 index_table_id: TableId::placeholder(),
466 primary_table_id: table.id,
467 index_item,
468 index_column_properties,
469 index_columns_len,
470 initialized_at_epoch: None,
471 created_at_epoch: None,
472 stream_job_status: PbStreamJobStatus::Creating.into(),
473 initialized_at_cluster_version: None,
474 created_at_cluster_version: None,
475 create_type: create_type.into(),
476 };
477
478 let ctx = plan.ctx();
479 let explain_trace = ctx.is_explain_trace();
480 if explain_trace {
481 ctx.trace("Create Index:");
482 ctx.trace(plan.explain_to_string());
483 }
484
485 Ok((plan, index_table, index_prost))
486}
487
488fn build_index_item(
489 index_table: &TableCatalog,
490 primary_table_name: &str,
491 primary_table: &TableCatalog,
492 index_columns: Vec<(ExprImpl, OrderType)>,
493) -> Vec<risingwave_pb::expr::ExprNode> {
494 let primary_table_desc_map = primary_table
495 .columns
496 .iter()
497 .enumerate()
498 .map(|(x, y)| (y.name.clone(), x))
499 .collect::<HashMap<_, _>>();
500
501 let primary_table_name_prefix = format!("{}.", primary_table_name);
502
503 let index_columns_len = index_columns.len();
504 index_columns
505 .into_iter()
506 .map(|(expr, _)| expr.to_expr_proto())
507 .chain(
508 index_table
509 .columns
510 .iter()
511 .map(|c| &c.column_desc)
512 .skip(index_columns_len)
513 .map(|x| {
514 let name = if x.name.starts_with(&primary_table_name_prefix) {
515 x.name[primary_table_name_prefix.len()..].to_string()
516 } else {
517 x.name.clone()
518 };
519
520 let column_index = *primary_table_desc_map.get(&name).unwrap();
521 InputRef {
522 index: column_index,
523 data_type: primary_table
524 .columns
525 .get(column_index)
526 .unwrap()
527 .data_type
528 .clone(),
529 }
530 .to_expr_proto()
531 }),
532 )
533 .collect_vec()
534}
535
536fn assemble_input(
537 table_catalog: Arc<TableCatalog>,
538 context: OptimizerContextRef,
539 index_columns: &[(ExprImpl, OrderType)],
540 include_columns: &[ExprImpl],
541 required_dist: RequiredDist,
542) -> Result<LogicalPlanRoot> {
543 let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);
548
549 let exprs = index_columns
550 .iter()
551 .map(|(expr, _)| expr.clone())
552 .chain(include_columns.iter().cloned())
553 .collect_vec();
554
555 let logical_project = LogicalProject::create(logical_scan.into(), exprs);
556 let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
557 project_required_cols.toggle_range(0..logical_project.schema().len());
558
559 let mut col_names = HashSet::new();
560 let mut count = 0;
561
562 let out_names: Vec<String> = index_columns
563 .iter()
564 .map(|(expr, _)| match expr {
565 ExprImpl::InputRef(input_ref) => table_catalog
566 .columns()
567 .get(input_ref.index)
568 .unwrap()
569 .name()
570 .to_owned(),
571 ExprImpl::FunctionCall(func) => {
572 let func_name = func.func_type().as_str_name().to_owned();
573 let mut name = func_name.clone();
574 while !col_names.insert(name.clone()) {
575 count += 1;
576 name = format!("{}{}", func_name, count);
577 }
578 name
579 }
580 _ => unreachable!(),
581 })
582 .chain(include_columns.iter().map(|expr| {
583 match expr {
584 ExprImpl::InputRef(input_ref) => table_catalog
585 .columns()
586 .get(input_ref.index)
587 .unwrap()
588 .name()
589 .to_owned(),
590 _ => unreachable!(),
591 }
592 }))
593 .collect_vec();
594
595 Ok(PlanRoot::new_with_logical_plan(
596 logical_project,
597 required_dist,
598 Order::new(
599 index_columns
600 .iter()
601 .enumerate()
602 .map(|(i, (_, order))| ColumnOrder::new(i, *order))
603 .collect(),
604 ),
605 project_required_cols,
606 out_names,
607 ))
608}
609
610fn assemble_materialize(
613 database_id: DatabaseId,
614 schema_id: SchemaId,
615 table_catalog: Arc<TableCatalog>,
616 context: OptimizerContextRef,
617 index_name: String,
618 index_columns: &[(ExprImpl, OrderType)],
619 include_columns: &[ExprImpl],
620 distributed_by_columns_len: usize,
621) -> Result<StreamMaterialize> {
622 let input = assemble_input(
623 table_catalog.clone(),
624 context.clone(),
625 index_columns,
626 include_columns,
627 RequiredDist::PhysicalDist(Distribution::HashShard(
630 (0..distributed_by_columns_len).collect(),
631 )),
632 )?;
633
634 let definition = context.normalized_sql().to_owned();
635 let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
636
637 input.gen_index_plan(
638 index_name,
639 database_id,
640 schema_id,
641 definition,
642 retention_seconds,
643 )
644}
645
646pub async fn handle_create_index(
647 mut handler_args: HandlerArgs,
648 if_not_exists: bool,
649 index_name: ObjectName,
650 table_name: ObjectName,
651 method: Option<Ident>,
652 columns: Vec<OrderByExpr>,
653 include: Vec<Ident>,
654 distributed_by: Vec<ast::Expr>,
655) -> Result<RwPgResponse> {
656 let session = handler_args.session.clone();
657 let resource_type =
658 resolve_streaming_job_resource_type(session.as_ref(), &mut handler_args.with_options)
659 .await?;
660
661 let (graph, index_table, index) = {
662 let (schema_name, table, index_table_name) =
663 resolve_index_schema(&session, index_name, table_name)?;
664 let qualified_index_name = ObjectName(vec![
665 Ident::from_real_value(&schema_name),
666 Ident::from_real_value(&index_table_name),
667 ]);
668 if let Either::Right(resp) = session.check_relation_name_duplicated(
669 qualified_index_name,
670 StatementType::CREATE_INDEX,
671 if_not_exists,
672 )? {
673 return Ok(resp);
674 }
675
676 let context = OptimizerContext::from_handler_args(handler_args);
677 let (plan, index_table, index) = gen_create_index_plan(
678 &session,
679 context.into(),
680 schema_name,
681 table,
682 index_table_name,
683 method,
684 columns,
685 include,
686 distributed_by,
687 )?;
688 let graph = build_graph(plan, Some(GraphJobType::Index))?;
689
690 (graph, index_table, index)
691 };
692
693 tracing::trace!(
694 "name={}, graph=\n{}",
695 index.name,
696 serde_json::to_string_pretty(&graph).unwrap()
697 );
698
699 let _job_guard =
700 session
701 .env()
702 .creating_streaming_job_tracker()
703 .guard(CreatingStreamingJobInfo::new(
704 session.session_id(),
705 index.database_id,
706 index.schema_id,
707 index.name.clone(),
708 ));
709
710 let catalog_writer = session.catalog_writer()?;
711 execute_with_long_running_notification(
712 catalog_writer.create_index(
713 index,
714 index_table.to_prost(),
715 graph,
716 resource_type,
717 if_not_exists,
718 ),
719 &session,
720 "CREATE INDEX",
721 LongRunningNotificationAction::MonitorBackfillJob,
722 )
723 .await?;
724
725 Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
726}