risingwave_frontend/handler/
create_index.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::sync::Arc;
18
19use either::Either;
20use fixedbitset::FixedBitSet;
21use itertools::Itertools;
22use pgwire::pg_response::{PgResponse, StatementType};
23use risingwave_common::catalog::{IndexId, TableId};
24use risingwave_common::types::DataType;
25use risingwave_common::util::recursive::{Recurse, tracker};
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::{
28    CreateType, PbFlatIndexConfig, PbHnswFlatIndexConfig, PbIndex, PbIndexColumnProperties,
29    PbStreamJobStatus, PbVectorIndexInfo,
30};
31use risingwave_pb::common::PbDistanceType;
32use risingwave_sqlparser::ast;
33use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
34use thiserror_ext::AsReport;
35
36use super::RwPgResponse;
37use crate::TableCatalog;
38use crate::binder::Binder;
39use crate::catalog::root_catalog::SchemaPath;
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::{ErrorCode, Result};
42use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef, is_impure_func_call};
43use crate::handler::HandlerArgs;
44use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
45use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
46use crate::optimizer::plan_node::{
47    Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
48};
49use crate::optimizer::property::{Distribution, Order, RequiredDist};
50use crate::optimizer::{LogicalPlanRoot, OptimizerContext, OptimizerContextRef, PlanRoot};
51use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
52use crate::session::SessionImpl;
53use crate::session::current::notice_to_user;
54use crate::stream_fragmenter::{GraphJobType, build_graph};
55
56pub(crate) fn resolve_index_schema(
57    session: &SessionImpl,
58    index_name: ObjectName,
59    table_name: ObjectName,
60) -> Result<(String, Arc<TableCatalog>, String)> {
61    let db_name = &session.database();
62    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
63    let search_path = session.config().search_path();
64    let user_name = &session.user_name();
65    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
66
67    let index_table_name = Binder::resolve_index_name(index_name)?;
68
69    let catalog_reader = session.env().catalog_reader();
70    let read_guard = catalog_reader.read_guard();
71    let (table, schema_name) =
72        read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
73    Ok((schema_name.to_owned(), table.clone(), index_table_name))
74}
75
76struct IndexColumnExprValidator {
77    allow_impure: bool,
78    result: Result<()>,
79}
80
81impl IndexColumnExprValidator {
82    fn unsupported_expr_err(expr: &ExprImpl) -> ErrorCode {
83        ErrorCode::NotSupported(
84            format!("unsupported index column expression type: {:?}", expr),
85            "use columns or expressions instead".into(),
86        )
87    }
88
89    fn validate(expr: &ExprImpl, allow_impure: bool) -> Result<()> {
90        match expr {
91            ExprImpl::InputRef(_) | ExprImpl::FunctionCall(_) => {}
92            other_expr => {
93                return Err(Self::unsupported_expr_err(other_expr).into());
94            }
95        }
96        let mut visitor = Self {
97            allow_impure,
98            result: Ok(()),
99        };
100        visitor.visit_expr(expr);
101        visitor.result
102    }
103}
104
105impl ExprVisitor for IndexColumnExprValidator {
106    fn visit_expr(&mut self, expr: &ExprImpl) {
107        if self.result.is_err() {
108            return;
109        }
110        tracker!().recurse(|t| {
111            if t.depth_reaches(crate::expr::EXPR_DEPTH_THRESHOLD) {
112                notice_to_user(crate::expr::EXPR_TOO_DEEP_NOTICE);
113            }
114
115            match expr {
116                ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {}
117                ExprImpl::FunctionCall(inner) => {
118                    if !self.allow_impure && is_impure_func_call(inner) {
119                        self.result = Err(ErrorCode::NotSupported(
120                            "this expression is impure".into(),
121                            "use a pure expression instead".into(),
122                        )
123                        .into());
124                        return;
125                    }
126                    self.visit_function_call(inner)
127                }
128                other_expr => {
129                    self.result = Err(Self::unsupported_expr_err(other_expr).into());
130                }
131            }
132        })
133    }
134}
135
136pub(crate) fn gen_create_index_plan(
137    session: &SessionImpl,
138    context: OptimizerContextRef,
139    schema_name: String,
140    table: Arc<TableCatalog>,
141    index_table_name: String,
142    method: Option<Ident>,
143    columns: Vec<OrderByExpr>,
144    include: Vec<Ident>,
145    distributed_by: Vec<ast::Expr>,
146) -> Result<(PlanRef, TableCatalog, PbIndex)> {
147    if table.is_index() {
148        return Err(
149            ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
150        );
151    }
152
153    if !session.is_super_user() && session.user_id() != table.owner {
154        return Err(ErrorCode::PermissionDenied(format!(
155            "must be owner of table \"{}\"",
156            table.name
157        ))
158        .into());
159    }
160
161    let vector_index_config = if let Some(method) = method {
162        match method.real_value().to_ascii_lowercase().as_str() {
163            "default" => None,
164            "flat" => Some(risingwave_pb::catalog::vector_index_info::PbConfig::Flat(
165                PbFlatIndexConfig {},
166            )),
167            "hnsw" => {
168                let with_options = context.with_options();
169                let parse_non_zero_u32 = |key: &str, default| {
170                    with_options
171                        .get(key)
172                        .map(|v| {
173                            let v = v.parse::<u32>().map_err(|e| {
174                                ErrorCode::InvalidInputSyntax(format!(
175                                    "invalid {} value {}: failed to parse {}",
176                                    key,
177                                    v,
178                                    e.as_report()
179                                ))
180                            })?;
181                            if v == 0 {
182                                Err(ErrorCode::InvalidInputSyntax(format!(
183                                    "invalid {} value {}: should not be zero",
184                                    key, v
185                                )))
186                            } else {
187                                Ok(v)
188                            }
189                        })
190                        .transpose()
191                        .map(|v| v.unwrap_or(default))
192                };
193                Some(
194                    risingwave_pb::catalog::vector_index_info::PbConfig::HnswFlat(
195                        PbHnswFlatIndexConfig {
196                            m: parse_non_zero_u32("m", 16)?, /* default value borrowed from pg_vector */
197                            ef_construction: parse_non_zero_u32("ef_construction", 64)?, /* default value borrowed from pg_vector */
198                            max_level: parse_non_zero_u32("max_level", 10)?,
199                        },
200                    ),
201                )
202            }
203            _ => {
204                return Err(ErrorCode::InvalidInputSyntax(format!(
205                    "invalid index method {}",
206                    method
207                ))
208                .into());
209            }
210        }
211    } else {
212        None
213    };
214
215    let is_vector_index = vector_index_config.is_some();
216
217    if is_vector_index && !table.append_only {
218        return Err(ErrorCode::InvalidInputSyntax(format!(
219            "\"{}\" is not append-only but vector index must be append-only",
220            table.name
221        ))
222        .into());
223    }
224
225    let mut binder = Binder::new_for_stream(session);
226    binder.bind_table(Some(&schema_name), &table.name)?;
227
228    let mut index_columns_ordered_expr = vec![];
229    let mut include_columns_expr = vec![];
230    let mut distributed_columns_expr = vec![];
231    if is_vector_index && columns.len() != 1 {
232        return Err(ErrorCode::InvalidInputSyntax(format!(
233            "vector index must be defined on exactly 1 column, but got {}",
234            columns.len()
235        ))
236        .into());
237    }
238    let mut dimension = None;
239    for column in columns {
240        if is_vector_index && (column.asc.is_some() || column.nulls_first.is_some()) {
241            return Err(ErrorCode::InvalidInputSyntax(
242                "vector index cannot specify order".to_owned(),
243            )
244            .into());
245        }
246        let order_type = OrderType::from_bools(column.asc, column.nulls_first);
247        let expr_impl = binder.bind_expr(&column.expr)?;
248        // Do constant folding and timezone transportation on expressions so that batch queries can match it in the same form.
249        let mut const_eval = ConstEvalRewriter { error: None };
250        let expr_impl = const_eval.rewrite_expr(expr_impl);
251        let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
252        let allow_impure = is_vector_index;
253        IndexColumnExprValidator::validate(&expr_impl, allow_impure)?;
254        if is_vector_index {
255            match expr_impl.return_type() {
256                DataType::Vector(d) => {
257                    dimension = Some(d);
258                }
259                other => {
260                    return Err(ErrorCode::InvalidInputSyntax(format!(
261                        "vector index must be defined on column of Vector type, but got {:?}",
262                        other
263                    ))
264                    .into());
265                }
266            }
267        }
268        index_columns_ordered_expr.push((expr_impl, order_type));
269    }
270
271    if include.is_empty() {
272        // Create index to include all (non-hidden) columns by default.
273        include_columns_expr = table
274            .columns()
275            .iter()
276            .enumerate()
277            .filter(|(_, column)| !column.is_hidden)
278            .map(|(x, column)| {
279                ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
280            })
281            .collect_vec();
282    } else {
283        for column in include {
284            let expr_impl =
285                binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
286            include_columns_expr.push(expr_impl);
287        }
288    };
289
290    if is_vector_index && !distributed_by.is_empty() {
291        return Err(ErrorCode::InvalidInputSyntax(
292            "vector index cannot define DISTRIBUTED BY".to_owned(),
293        )
294        .into());
295    }
296
297    for column in distributed_by {
298        let expr_impl = binder.bind_expr(&column)?;
299        distributed_columns_expr.push(expr_impl);
300    }
301
302    // Remove duplicate column of index columns
303    let mut set = HashSet::new();
304    index_columns_ordered_expr = index_columns_ordered_expr
305        .into_iter()
306        .filter(|(expr, _)| match expr {
307            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
308            ExprImpl::FunctionCall(_) => true,
309            _ => unreachable!(),
310        })
311        .collect_vec();
312
313    // Remove include columns are already in index columns
314    include_columns_expr = include_columns_expr
315        .into_iter()
316        .filter(|expr| match expr {
317            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
318            _ => unreachable!(),
319        })
320        .collect_vec();
321
322    // Remove duplicate columns of distributed by columns
323    let mut set = HashSet::new();
324    let distributed_columns_expr = distributed_columns_expr
325        .into_iter()
326        .filter(|expr| match expr {
327            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
328            ExprImpl::FunctionCall(_) => true,
329            _ => unreachable!(),
330        })
331        .collect_vec();
332    // Distributed by columns should be a prefix of index columns
333    if !index_columns_ordered_expr
334        .iter()
335        .map(|(expr, _)| expr.clone())
336        .collect_vec()
337        .starts_with(&distributed_columns_expr)
338    {
339        return Err(ErrorCode::InvalidInputSyntax(
340            "Distributed by columns should be a prefix of index columns".to_owned(),
341        )
342        .into());
343    }
344
345    let (index_database_id, index_schema_id) =
346        session.get_database_and_schema_id_for_create(Some(schema_name))?;
347
348    let (plan, mut index_table) = if let Some(vector_index_config) = vector_index_config {
349        assert!(distributed_columns_expr.is_empty());
350        assert_eq!(1, index_columns_ordered_expr.len());
351        let input = assemble_input(
352            table.clone(),
353            context.clone(),
354            &index_columns_ordered_expr,
355            &include_columns_expr,
356            RequiredDist::PhysicalDist(Distribution::Single),
357        )?;
358        let definition = context.normalized_sql().to_owned();
359        let retention_seconds = table.retention_seconds.and_then(NonZeroU32::new);
360
361        let distance_type = match context
362            .with_options()
363            .get("distance_type")
364            .ok_or_else(|| {
365                ErrorCode::InvalidInputSyntax(
366                    "vector index missing `distance_type` in with options".to_owned(),
367                )
368            })?
369            .to_ascii_lowercase()
370            .as_str()
371        {
372            "l1" => PbDistanceType::L1,
373            "l2" => PbDistanceType::L2Sqr,
374            "inner_product" => PbDistanceType::InnerProduct,
375            "cosine" => PbDistanceType::Cosine,
376            other => {
377                return Err(ErrorCode::InvalidInputSyntax(format!(
378                    "unsupported vector index distance type: {}",
379                    other
380                ))
381                .into());
382            }
383        };
384
385        let vector_index_write = input.gen_vector_index_plan(
386            index_table_name.clone(),
387            index_database_id,
388            index_schema_id,
389            definition,
390            retention_seconds,
391            PbVectorIndexInfo {
392                dimension: dimension.expect("should be set for vector index") as _,
393                config: Some(vector_index_config),
394                distance_type: distance_type as _,
395            },
396        )?;
397        let index_table = vector_index_write.table().clone();
398        let plan: PlanRef = vector_index_write.into();
399        (plan, index_table)
400    } else {
401        // Manually assemble the materialization plan for the index MV.
402        let materialize = assemble_materialize(
403            index_database_id,
404            index_schema_id,
405            table.clone(),
406            context.clone(),
407            index_table_name.clone(),
408            &index_columns_ordered_expr,
409            &include_columns_expr,
410            // We use the first index column as distributed key by default if users
411            // haven't specified the distributed by columns.
412            if distributed_columns_expr.is_empty() {
413                1
414            } else {
415                distributed_columns_expr.len()
416            },
417        )?;
418        let index_table = materialize.table().clone();
419        let plan: PlanRef = materialize.into();
420        (plan, index_table)
421    };
422
423    {
424        // Inherit table properties
425        index_table.retention_seconds = table.retention_seconds;
426    }
427
428    index_table.owner = table.owner;
429
430    let index_columns_len = index_columns_ordered_expr.len() as u32;
431    let index_column_properties = index_columns_ordered_expr
432        .iter()
433        .map(|(_, order)| PbIndexColumnProperties {
434            is_desc: order.is_descending(),
435            nulls_first: order.nulls_are_first(),
436        })
437        .collect();
438    let index_item = build_index_item(
439        &index_table,
440        table.name(),
441        &table,
442        index_columns_ordered_expr,
443    );
444
445    let create_type =
446        if context.session_ctx().config().background_ddl() && plan_can_use_background_ddl(&plan) {
447            CreateType::Background
448        } else {
449            CreateType::Foreground
450        };
451
452    let index_prost = PbIndex {
453        id: IndexId::placeholder().index_id,
454        schema_id: index_schema_id,
455        database_id: index_database_id,
456        name: index_table_name,
457        owner: index_table.owner,
458        index_table_id: TableId::placeholder().table_id,
459        primary_table_id: table.id.table_id,
460        index_item,
461        index_column_properties,
462        index_columns_len,
463        initialized_at_epoch: None,
464        created_at_epoch: None,
465        stream_job_status: PbStreamJobStatus::Creating.into(),
466        initialized_at_cluster_version: None,
467        created_at_cluster_version: None,
468        create_type: create_type.into(),
469    };
470
471    let ctx = plan.ctx();
472    let explain_trace = ctx.is_explain_trace();
473    if explain_trace {
474        ctx.trace("Create Index:");
475        ctx.trace(plan.explain_to_string());
476    }
477
478    Ok((plan, index_table, index_prost))
479}
480
481fn build_index_item(
482    index_table: &TableCatalog,
483    primary_table_name: &str,
484    primary_table: &TableCatalog,
485    index_columns: Vec<(ExprImpl, OrderType)>,
486) -> Vec<risingwave_pb::expr::ExprNode> {
487    let primary_table_desc_map = primary_table
488        .columns
489        .iter()
490        .enumerate()
491        .map(|(x, y)| (y.name.clone(), x))
492        .collect::<HashMap<_, _>>();
493
494    let primary_table_name_prefix = format!("{}.", primary_table_name);
495
496    let index_columns_len = index_columns.len();
497    index_columns
498        .into_iter()
499        .map(|(expr, _)| expr.to_expr_proto())
500        .chain(
501            index_table
502                .columns
503                .iter()
504                .map(|c| &c.column_desc)
505                .skip(index_columns_len)
506                .map(|x| {
507                    let name = if x.name.starts_with(&primary_table_name_prefix) {
508                        x.name[primary_table_name_prefix.len()..].to_string()
509                    } else {
510                        x.name.clone()
511                    };
512
513                    let column_index = *primary_table_desc_map.get(&name).unwrap();
514                    InputRef {
515                        index: column_index,
516                        data_type: primary_table
517                            .columns
518                            .get(column_index)
519                            .unwrap()
520                            .data_type
521                            .clone(),
522                    }
523                    .to_expr_proto()
524                }),
525        )
526        .collect_vec()
527}
528
529fn assemble_input(
530    table_catalog: Arc<TableCatalog>,
531    context: OptimizerContextRef,
532    index_columns: &[(ExprImpl, OrderType)],
533    include_columns: &[ExprImpl],
534    required_dist: RequiredDist,
535) -> Result<LogicalPlanRoot> {
536    // Build logical plan and then call gen_create_index_plan
537    // LogicalProject(index_columns, include_columns)
538    //   LogicalScan(table_desc)
539
540    let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);
541
542    let exprs = index_columns
543        .iter()
544        .map(|(expr, _)| expr.clone())
545        .chain(include_columns.iter().cloned())
546        .collect_vec();
547
548    let logical_project = LogicalProject::create(logical_scan.into(), exprs);
549    let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
550    project_required_cols.toggle_range(0..logical_project.schema().len());
551
552    let mut col_names = HashSet::new();
553    let mut count = 0;
554
555    let out_names: Vec<String> = index_columns
556        .iter()
557        .map(|(expr, _)| match expr {
558            ExprImpl::InputRef(input_ref) => table_catalog
559                .columns()
560                .get(input_ref.index)
561                .unwrap()
562                .name()
563                .to_owned(),
564            ExprImpl::FunctionCall(func) => {
565                let func_name = func.func_type().as_str_name().to_owned();
566                let mut name = func_name.clone();
567                while !col_names.insert(name.clone()) {
568                    count += 1;
569                    name = format!("{}{}", func_name, count);
570                }
571                name
572            }
573            _ => unreachable!(),
574        })
575        .chain(include_columns.iter().map(|expr| {
576            match expr {
577                ExprImpl::InputRef(input_ref) => table_catalog
578                    .columns()
579                    .get(input_ref.index)
580                    .unwrap()
581                    .name()
582                    .to_owned(),
583                _ => unreachable!(),
584            }
585        }))
586        .collect_vec();
587
588    Ok(PlanRoot::new_with_logical_plan(
589        logical_project,
590        required_dist,
591        Order::new(
592            index_columns
593                .iter()
594                .enumerate()
595                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
596                .collect(),
597        ),
598        project_required_cols,
599        out_names,
600    ))
601}
602
603/// Note: distributed by columns must be a prefix of index columns, so we just use
604/// `distributed_by_columns_len` to represent distributed by columns
605fn assemble_materialize(
606    database_id: DatabaseId,
607    schema_id: SchemaId,
608    table_catalog: Arc<TableCatalog>,
609    context: OptimizerContextRef,
610    index_name: String,
611    index_columns: &[(ExprImpl, OrderType)],
612    include_columns: &[ExprImpl],
613    distributed_by_columns_len: usize,
614) -> Result<StreamMaterialize> {
615    let input = assemble_input(
616        table_catalog.clone(),
617        context.clone(),
618        index_columns,
619        include_columns,
620        // schema of logical_project is such that index columns come first.
621        // so we can use distributed_by_columns_len to represent distributed by columns indices.
622        RequiredDist::PhysicalDist(Distribution::HashShard(
623            (0..distributed_by_columns_len).collect(),
624        )),
625    )?;
626
627    let definition = context.normalized_sql().to_owned();
628    let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
629
630    input.gen_index_plan(
631        index_name,
632        database_id,
633        schema_id,
634        definition,
635        retention_seconds,
636    )
637}
638
639pub async fn handle_create_index(
640    handler_args: HandlerArgs,
641    if_not_exists: bool,
642    index_name: ObjectName,
643    table_name: ObjectName,
644    method: Option<Ident>,
645    columns: Vec<OrderByExpr>,
646    include: Vec<Ident>,
647    distributed_by: Vec<ast::Expr>,
648) -> Result<RwPgResponse> {
649    let session = handler_args.session.clone();
650
651    let (graph, index_table, index) = {
652        let (schema_name, table, index_table_name) =
653            resolve_index_schema(&session, index_name, table_name)?;
654        let qualified_index_name = ObjectName(vec![
655            Ident::from_real_value(&schema_name),
656            Ident::from_real_value(&index_table_name),
657        ]);
658        if let Either::Right(resp) = session.check_relation_name_duplicated(
659            qualified_index_name,
660            StatementType::CREATE_INDEX,
661            if_not_exists,
662        )? {
663            return Ok(resp);
664        }
665
666        let context = OptimizerContext::from_handler_args(handler_args);
667        let (plan, index_table, index) = gen_create_index_plan(
668            &session,
669            context.into(),
670            schema_name,
671            table,
672            index_table_name,
673            method,
674            columns,
675            include,
676            distributed_by,
677        )?;
678        let graph = build_graph(plan, Some(GraphJobType::Index))?;
679
680        (graph, index_table, index)
681    };
682
683    tracing::trace!(
684        "name={}, graph=\n{}",
685        index.name,
686        serde_json::to_string_pretty(&graph).unwrap()
687    );
688
689    let _job_guard =
690        session
691            .env()
692            .creating_streaming_job_tracker()
693            .guard(CreatingStreamingJobInfo::new(
694                session.session_id(),
695                index.database_id,
696                index.schema_id,
697                index.name.clone(),
698            ));
699
700    let catalog_writer = session.catalog_writer()?;
701    catalog_writer
702        .create_index(index, index_table.to_prost(), graph, if_not_exists)
703        .await?;
704
705    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
706}