Skip to main content

risingwave_frontend/handler/
create_index.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::sync::Arc;
18
19use either::Either;
20use fixedbitset::FixedBitSet;
21use itertools::Itertools;
22use pgwire::pg_response::{PgResponse, StatementType};
23use risingwave_common::catalog::{IndexId, TableId};
24use risingwave_common::types::DataType;
25use risingwave_common::util::recursive::{Recurse, tracker};
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::{
28    CreateType, PbFlatIndexConfig, PbHnswFlatIndexConfig, PbIndex, PbIndexColumnProperties,
29    PbStreamJobStatus, PbVectorIndexInfo,
30};
31use risingwave_pb::common::PbDistanceType;
32use risingwave_sqlparser::ast;
33use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
34use thiserror_ext::AsReport;
35
36use super::RwPgResponse;
37use crate::TableCatalog;
38use crate::binder::Binder;
39use crate::catalog::root_catalog::SchemaPath;
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::{ErrorCode, Result};
42use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef, is_impure_func_call};
43use crate::handler::HandlerArgs;
44use crate::handler::create_mv::resolve_streaming_job_resource_type;
45use crate::handler::util::{
46    LongRunningNotificationAction, execute_with_long_running_notification,
47    reject_internal_table_dependency,
48};
49use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
50use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
51use crate::optimizer::plan_node::{
52    Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
53    ensure_sync_log_store_fragment_root,
54};
55use crate::optimizer::property::{Distribution, Order, RequiredDist};
56use crate::optimizer::{LogicalPlanRoot, OptimizerContext, OptimizerContextRef, PlanRoot};
57use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
58use crate::session::SessionImpl;
59use crate::session::current::notice_to_user;
60use crate::stream_fragmenter::{GraphJobType, build_graph};
61
62pub(crate) fn resolve_index_schema(
63    session: &SessionImpl,
64    index_name: ObjectName,
65    table_name: ObjectName,
66) -> Result<(String, Arc<TableCatalog>, String)> {
67    let db_name = &session.database();
68    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
69    let search_path = session.config().search_path();
70    let user_name = &session.user_name();
71    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
72
73    let index_table_name = Binder::resolve_index_name(index_name)?;
74
75    let catalog_reader = session.env().catalog_reader();
76    let read_guard = catalog_reader.read_guard();
77    let (table, schema_name) =
78        read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
79    reject_internal_table_dependency(table.as_ref(), "CREATE INDEX")?;
80    Ok((schema_name.to_owned(), table.clone(), index_table_name))
81}
82
83pub(crate) struct IndexColumnExprValidator {
84    allow_impure: bool,
85    result: Result<()>,
86}
87
88impl IndexColumnExprValidator {
89    fn unsupported_expr_err(expr: &ExprImpl) -> ErrorCode {
90        ErrorCode::NotSupported(
91            format!("unsupported index column expression type: {:?}", expr),
92            "use columns or expressions instead".into(),
93        )
94    }
95
96    pub(crate) fn validate(expr: &ExprImpl, allow_impure: bool) -> Result<()> {
97        match expr {
98            ExprImpl::InputRef(_) | ExprImpl::FunctionCall(_) => {}
99            other_expr => {
100                return Err(Self::unsupported_expr_err(other_expr).into());
101            }
102        }
103        let mut visitor = Self {
104            allow_impure,
105            result: Ok(()),
106        };
107        visitor.visit_expr(expr);
108        visitor.result
109    }
110}
111
112impl ExprVisitor for IndexColumnExprValidator {
113    fn visit_expr(&mut self, expr: &ExprImpl) {
114        if self.result.is_err() {
115            return;
116        }
117        tracker!().recurse(|t| {
118            if t.depth_reaches(crate::expr::EXPR_DEPTH_THRESHOLD) {
119                notice_to_user(crate::expr::EXPR_TOO_DEEP_NOTICE);
120            }
121
122            match expr {
123                ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {}
124                ExprImpl::FunctionCall(inner) => {
125                    if !self.allow_impure && is_impure_func_call(inner) {
126                        self.result = Err(ErrorCode::NotSupported(
127                            "this expression is impure".into(),
128                            "use a pure expression instead".into(),
129                        )
130                        .into());
131                        return;
132                    }
133                    self.visit_function_call(inner)
134                }
135                other_expr => {
136                    self.result = Err(Self::unsupported_expr_err(other_expr).into());
137                }
138            }
139        })
140    }
141}
142
143pub(crate) fn gen_create_index_plan(
144    session: &SessionImpl,
145    context: OptimizerContextRef,
146    schema_name: String,
147    table: Arc<TableCatalog>,
148    index_table_name: String,
149    method: Option<Ident>,
150    columns: Vec<OrderByExpr>,
151    include: Vec<Ident>,
152    distributed_by: Vec<ast::Expr>,
153) -> Result<(PlanRef, TableCatalog, PbIndex)> {
154    if table.is_index() {
155        return Err(
156            ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
157        );
158    }
159
160    if !session.is_super_user() && session.user_id() != table.owner {
161        return Err(ErrorCode::PermissionDenied(format!(
162            "must be owner of table \"{}\"",
163            table.name
164        ))
165        .into());
166    }
167
168    let vector_index_config = if let Some(method) = method {
169        match method.real_value().to_ascii_lowercase().as_str() {
170            "default" => None,
171            "flat" => Some(risingwave_pb::catalog::vector_index_info::PbConfig::Flat(
172                PbFlatIndexConfig {},
173            )),
174            "hnsw" => {
175                let with_options = context.with_options();
176                let parse_non_zero_u32 = |key: &str, default| {
177                    with_options
178                        .get(key)
179                        .map(|v| {
180                            let v = v.parse::<u32>().map_err(|e| {
181                                ErrorCode::InvalidInputSyntax(format!(
182                                    "invalid {} value {}: failed to parse {}",
183                                    key,
184                                    v,
185                                    e.as_report()
186                                ))
187                            })?;
188                            if v == 0 {
189                                Err(ErrorCode::InvalidInputSyntax(format!(
190                                    "invalid {} value {}: should not be zero",
191                                    key, v
192                                )))
193                            } else {
194                                Ok(v)
195                            }
196                        })
197                        .transpose()
198                        .map(|v| v.unwrap_or(default))
199                };
200                Some(
201                    risingwave_pb::catalog::vector_index_info::PbConfig::HnswFlat(
202                        PbHnswFlatIndexConfig {
203                            m: parse_non_zero_u32("m", 16)?, /* default value borrowed from pg_vector */
204                            ef_construction: parse_non_zero_u32("ef_construction", 64)?, /* default value borrowed from pg_vector */
205                            max_level: parse_non_zero_u32("max_level", 10)?,
206                        },
207                    ),
208                )
209            }
210            _ => {
211                return Err(ErrorCode::InvalidInputSyntax(format!(
212                    "invalid index method {}",
213                    method
214                ))
215                .into());
216            }
217        }
218    } else {
219        None
220    };
221
222    let is_vector_index = vector_index_config.is_some();
223
224    if is_vector_index && !table.append_only {
225        return Err(ErrorCode::InvalidInputSyntax(format!(
226            "\"{}\" is not append-only but vector index must be append-only",
227            table.name
228        ))
229        .into());
230    }
231
232    let mut binder = Binder::new_for_stream(session);
233    binder.bind_table(Some(&schema_name), &table.name)?;
234
235    let mut index_columns_ordered_expr = vec![];
236    let mut include_columns_expr = vec![];
237    let mut distributed_columns_expr = vec![];
238    if is_vector_index && columns.len() != 1 {
239        return Err(ErrorCode::InvalidInputSyntax(format!(
240            "vector index must be defined on exactly 1 column, but got {}",
241            columns.len()
242        ))
243        .into());
244    }
245    let mut dimension = None;
246    for column in columns {
247        if is_vector_index && (column.asc.is_some() || column.nulls_first.is_some()) {
248            return Err(ErrorCode::InvalidInputSyntax(
249                "vector index cannot specify order".to_owned(),
250            )
251            .into());
252        }
253        let order_type = OrderType::from_bools(column.asc, column.nulls_first);
254        let expr_impl = binder.bind_expr(&column.expr)?;
255        // Do constant folding and timezone transportation on expressions so that batch queries can match it in the same form.
256        let mut const_eval = ConstEvalRewriter { error: None };
257        let expr_impl = const_eval.rewrite_expr(expr_impl);
258        let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
259        let allow_impure = is_vector_index;
260        IndexColumnExprValidator::validate(&expr_impl, allow_impure)?;
261        if is_vector_index {
262            match expr_impl.return_type() {
263                DataType::Vector(d) => {
264                    dimension = Some(d);
265                }
266                other => {
267                    return Err(ErrorCode::InvalidInputSyntax(format!(
268                        "vector index must be defined on column of Vector type, but got {:?}",
269                        other
270                    ))
271                    .into());
272                }
273            }
274        }
275        index_columns_ordered_expr.push((expr_impl, order_type));
276    }
277
278    if include.is_empty() {
279        // Create index to include all (non-hidden) columns by default.
280        include_columns_expr = table
281            .columns()
282            .iter()
283            .enumerate()
284            .filter(|(_, column)| !column.is_hidden)
285            .map(|(x, column)| {
286                ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
287            })
288            .collect_vec();
289    } else {
290        for column in include {
291            let expr_impl =
292                binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
293            include_columns_expr.push(expr_impl);
294        }
295    };
296
297    if is_vector_index && !distributed_by.is_empty() {
298        return Err(ErrorCode::InvalidInputSyntax(
299            "vector index cannot define DISTRIBUTED BY".to_owned(),
300        )
301        .into());
302    }
303
304    for column in distributed_by {
305        let expr_impl = binder.bind_expr(&column)?;
306        distributed_columns_expr.push(expr_impl);
307    }
308
309    // Remove duplicate column of index columns
310    let mut set = HashSet::new();
311    index_columns_ordered_expr = index_columns_ordered_expr
312        .into_iter()
313        .filter(|(expr, _)| match expr {
314            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
315            ExprImpl::FunctionCall(_) => true,
316            _ => unreachable!(),
317        })
318        .collect_vec();
319
320    // Remove include columns are already in index columns
321    include_columns_expr = include_columns_expr
322        .into_iter()
323        .filter(|expr| match expr {
324            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
325            _ => unreachable!(),
326        })
327        .collect_vec();
328
329    // Remove duplicate columns of distributed by columns
330    let mut set = HashSet::new();
331    let distributed_columns_expr = distributed_columns_expr
332        .into_iter()
333        .filter(|expr| match expr {
334            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
335            ExprImpl::FunctionCall(_) => true,
336            _ => unreachable!(),
337        })
338        .collect_vec();
339    // Distributed by columns should be a prefix of index columns
340    if !index_columns_ordered_expr
341        .iter()
342        .map(|(expr, _)| expr.clone())
343        .collect_vec()
344        .starts_with(&distributed_columns_expr)
345    {
346        return Err(ErrorCode::InvalidInputSyntax(
347            "Distributed by columns should be a prefix of index columns".to_owned(),
348        )
349        .into());
350    }
351
352    let (index_database_id, index_schema_id) =
353        session.get_database_and_schema_id_for_create(Some(schema_name))?;
354
355    let (plan, mut index_table) = if let Some(vector_index_config) = vector_index_config {
356        assert!(distributed_columns_expr.is_empty());
357        assert_eq!(1, index_columns_ordered_expr.len());
358        let input = assemble_input(
359            table.clone(),
360            context.clone(),
361            &index_columns_ordered_expr,
362            &include_columns_expr,
363            RequiredDist::PhysicalDist(Distribution::Single),
364        )?;
365        let definition = context.normalized_sql().to_owned();
366        let retention_seconds = table.retention_seconds.and_then(NonZeroU32::new);
367
368        let distance_type = match context
369            .with_options()
370            .get("distance_type")
371            .ok_or_else(|| {
372                ErrorCode::InvalidInputSyntax(
373                    "vector index missing `distance_type` in with options".to_owned(),
374                )
375            })?
376            .to_ascii_lowercase()
377            .as_str()
378        {
379            "l1" => PbDistanceType::L1,
380            "l2" => PbDistanceType::L2Sqr,
381            "inner_product" => PbDistanceType::InnerProduct,
382            "cosine" => PbDistanceType::Cosine,
383            other => {
384                return Err(ErrorCode::InvalidInputSyntax(format!(
385                    "unsupported vector index distance type: {}",
386                    other
387                ))
388                .into());
389            }
390        };
391
392        let vector_index_write = input.gen_vector_index_plan(
393            index_table_name.clone(),
394            index_database_id,
395            index_schema_id,
396            definition,
397            retention_seconds,
398            PbVectorIndexInfo {
399                dimension: dimension.expect("should be set for vector index") as _,
400                config: Some(vector_index_config),
401                distance_type: distance_type as _,
402            },
403        )?;
404        let index_table = vector_index_write.table().clone();
405        let plan: PlanRef = ensure_sync_log_store_fragment_root(vector_index_write.into());
406        (plan, index_table)
407    } else {
408        // Manually assemble the materialization plan for the index MV.
409        let materialize = assemble_materialize(
410            index_database_id,
411            index_schema_id,
412            table.clone(),
413            context.clone(),
414            index_table_name.clone(),
415            &index_columns_ordered_expr,
416            &include_columns_expr,
417            // We use the first index column as distributed key by default if users
418            // haven't specified the distributed by columns.
419            if distributed_columns_expr.is_empty() {
420                1
421            } else {
422                distributed_columns_expr.len()
423            },
424        )?;
425        let index_table = materialize.table().clone();
426        let plan: PlanRef = ensure_sync_log_store_fragment_root(materialize.into());
427        (plan, index_table)
428    };
429
430    {
431        // Inherit table properties
432        index_table.retention_seconds = table.retention_seconds;
433    }
434
435    index_table.owner = table.owner;
436
437    let index_columns_len = index_columns_ordered_expr.len() as u32;
438    let index_column_properties = index_columns_ordered_expr
439        .iter()
440        .map(|(_, order)| PbIndexColumnProperties {
441            is_desc: order.is_descending(),
442            nulls_first: order.nulls_are_first(),
443        })
444        .collect();
445    let index_item = build_index_item(
446        &index_table,
447        table.name(),
448        &table,
449        index_columns_ordered_expr,
450    );
451
452    let create_type =
453        if context.session_ctx().config().background_ddl() && plan_can_use_background_ddl(&plan) {
454            CreateType::Background
455        } else {
456            CreateType::Foreground
457        };
458
459    let index_prost = PbIndex {
460        id: IndexId::placeholder(),
461        schema_id: index_schema_id,
462        database_id: index_database_id,
463        name: index_table_name,
464        owner: index_table.owner,
465        index_table_id: TableId::placeholder(),
466        primary_table_id: table.id,
467        index_item,
468        index_column_properties,
469        index_columns_len,
470        initialized_at_epoch: None,
471        created_at_epoch: None,
472        stream_job_status: PbStreamJobStatus::Creating.into(),
473        initialized_at_cluster_version: None,
474        created_at_cluster_version: None,
475        create_type: create_type.into(),
476    };
477
478    let ctx = plan.ctx();
479    let explain_trace = ctx.is_explain_trace();
480    if explain_trace {
481        ctx.trace("Create Index:");
482        ctx.trace(plan.explain_to_string());
483    }
484
485    Ok((plan, index_table, index_prost))
486}
487
488fn build_index_item(
489    index_table: &TableCatalog,
490    primary_table_name: &str,
491    primary_table: &TableCatalog,
492    index_columns: Vec<(ExprImpl, OrderType)>,
493) -> Vec<risingwave_pb::expr::ExprNode> {
494    let primary_table_desc_map = primary_table
495        .columns
496        .iter()
497        .enumerate()
498        .map(|(x, y)| (y.name.clone(), x))
499        .collect::<HashMap<_, _>>();
500
501    let primary_table_name_prefix = format!("{}.", primary_table_name);
502
503    let index_columns_len = index_columns.len();
504    index_columns
505        .into_iter()
506        .map(|(expr, _)| expr.to_expr_proto())
507        .chain(
508            index_table
509                .columns
510                .iter()
511                .map(|c| &c.column_desc)
512                .skip(index_columns_len)
513                .map(|x| {
514                    let name = if x.name.starts_with(&primary_table_name_prefix) {
515                        x.name[primary_table_name_prefix.len()..].to_string()
516                    } else {
517                        x.name.clone()
518                    };
519
520                    let column_index = *primary_table_desc_map.get(&name).unwrap();
521                    InputRef {
522                        index: column_index,
523                        data_type: primary_table
524                            .columns
525                            .get(column_index)
526                            .unwrap()
527                            .data_type
528                            .clone(),
529                    }
530                    .to_expr_proto()
531                }),
532        )
533        .collect_vec()
534}
535
536fn assemble_input(
537    table_catalog: Arc<TableCatalog>,
538    context: OptimizerContextRef,
539    index_columns: &[(ExprImpl, OrderType)],
540    include_columns: &[ExprImpl],
541    required_dist: RequiredDist,
542) -> Result<LogicalPlanRoot> {
543    // Build logical plan and then call gen_create_index_plan
544    // LogicalProject(index_columns, include_columns)
545    //   LogicalScan(table_desc)
546
547    let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);
548
549    let exprs = index_columns
550        .iter()
551        .map(|(expr, _)| expr.clone())
552        .chain(include_columns.iter().cloned())
553        .collect_vec();
554
555    let logical_project = LogicalProject::create(logical_scan.into(), exprs);
556    let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
557    project_required_cols.toggle_range(0..logical_project.schema().len());
558
559    let mut col_names = HashSet::new();
560    let mut count = 0;
561
562    let out_names: Vec<String> = index_columns
563        .iter()
564        .map(|(expr, _)| match expr {
565            ExprImpl::InputRef(input_ref) => table_catalog
566                .columns()
567                .get(input_ref.index)
568                .unwrap()
569                .name()
570                .to_owned(),
571            ExprImpl::FunctionCall(func) => {
572                let func_name = func.func_type().as_str_name().to_owned();
573                let mut name = func_name.clone();
574                while !col_names.insert(name.clone()) {
575                    count += 1;
576                    name = format!("{}{}", func_name, count);
577                }
578                name
579            }
580            _ => unreachable!(),
581        })
582        .chain(include_columns.iter().map(|expr| {
583            match expr {
584                ExprImpl::InputRef(input_ref) => table_catalog
585                    .columns()
586                    .get(input_ref.index)
587                    .unwrap()
588                    .name()
589                    .to_owned(),
590                _ => unreachable!(),
591            }
592        }))
593        .collect_vec();
594
595    Ok(PlanRoot::new_with_logical_plan(
596        logical_project,
597        required_dist,
598        Order::new(
599            index_columns
600                .iter()
601                .enumerate()
602                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
603                .collect(),
604        ),
605        project_required_cols,
606        out_names,
607    ))
608}
609
610/// Note: distributed by columns must be a prefix of index columns, so we just use
611/// `distributed_by_columns_len` to represent distributed by columns
612fn assemble_materialize(
613    database_id: DatabaseId,
614    schema_id: SchemaId,
615    table_catalog: Arc<TableCatalog>,
616    context: OptimizerContextRef,
617    index_name: String,
618    index_columns: &[(ExprImpl, OrderType)],
619    include_columns: &[ExprImpl],
620    distributed_by_columns_len: usize,
621) -> Result<StreamMaterialize> {
622    let input = assemble_input(
623        table_catalog.clone(),
624        context.clone(),
625        index_columns,
626        include_columns,
627        // schema of logical_project is such that index columns come first.
628        // so we can use distributed_by_columns_len to represent distributed by columns indices.
629        RequiredDist::PhysicalDist(Distribution::HashShard(
630            (0..distributed_by_columns_len).collect(),
631        )),
632    )?;
633
634    let definition = context.normalized_sql().to_owned();
635    let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
636
637    input.gen_index_plan(
638        index_name,
639        database_id,
640        schema_id,
641        definition,
642        retention_seconds,
643    )
644}
645
646pub async fn handle_create_index(
647    mut handler_args: HandlerArgs,
648    if_not_exists: bool,
649    index_name: ObjectName,
650    table_name: ObjectName,
651    method: Option<Ident>,
652    columns: Vec<OrderByExpr>,
653    include: Vec<Ident>,
654    distributed_by: Vec<ast::Expr>,
655) -> Result<RwPgResponse> {
656    let session = handler_args.session.clone();
657    let resource_type =
658        resolve_streaming_job_resource_type(session.as_ref(), &mut handler_args.with_options)
659            .await?;
660
661    let (graph, index_table, index) = {
662        let (schema_name, table, index_table_name) =
663            resolve_index_schema(&session, index_name, table_name)?;
664        let qualified_index_name = ObjectName(vec![
665            Ident::from_real_value(&schema_name),
666            Ident::from_real_value(&index_table_name),
667        ]);
668        if let Either::Right(resp) = session.check_relation_name_duplicated(
669            qualified_index_name,
670            StatementType::CREATE_INDEX,
671            if_not_exists,
672        )? {
673            return Ok(resp);
674        }
675
676        let context = OptimizerContext::from_handler_args(handler_args);
677        let (plan, index_table, index) = gen_create_index_plan(
678            &session,
679            context.into(),
680            schema_name,
681            table,
682            index_table_name,
683            method,
684            columns,
685            include,
686            distributed_by,
687        )?;
688        let graph = build_graph(plan, Some(GraphJobType::Index))?;
689
690        (graph, index_table, index)
691    };
692
693    tracing::trace!(
694        "name={}, graph=\n{}",
695        index.name,
696        serde_json::to_string_pretty(&graph).unwrap()
697    );
698
699    let _job_guard =
700        session
701            .env()
702            .creating_streaming_job_tracker()
703            .guard(CreatingStreamingJobInfo::new(
704                session.session_id(),
705                index.database_id,
706                index.schema_id,
707                index.name.clone(),
708            ));
709
710    let catalog_writer = session.catalog_writer()?;
711    execute_with_long_running_notification(
712        catalog_writer.create_index(
713            index,
714            index_table.to_prost(),
715            graph,
716            resource_type,
717            if_not_exists,
718        ),
719        &session,
720        "CREATE INDEX",
721        LongRunningNotificationAction::MonitorBackfillJob,
722    )
723    .await?;
724
725    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
726}