risingwave_frontend/handler/
create_index.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::rc::Rc;
18use std::sync::Arc;
19
20use either::Either;
21use fixedbitset::FixedBitSet;
22use itertools::Itertools;
23use pgwire::pg_response::{PgResponse, StatementType};
24use risingwave_common::catalog::{IndexId, TableDesc, TableId};
25use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
26use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus};
27use risingwave_sqlparser::ast;
28use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
29
30use super::RwPgResponse;
31use crate::TableCatalog;
32use crate::binder::Binder;
33use crate::catalog::root_catalog::SchemaPath;
34use crate::catalog::{DatabaseId, SchemaId};
35use crate::error::{ErrorCode, Result};
36use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};
37use crate::handler::HandlerArgs;
38use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
39use crate::optimizer::plan_node::{Explain, LogicalProject, LogicalScan, StreamMaterialize};
40use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
41use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
42use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
43use crate::session::SessionImpl;
44use crate::stream_fragmenter::{GraphJobType, build_graph};
45
46pub(crate) fn resolve_index_schema(
47    session: &SessionImpl,
48    index_name: ObjectName,
49    table_name: ObjectName,
50) -> Result<(String, Arc<TableCatalog>, String)> {
51    let db_name = &session.database();
52    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
53    let search_path = session.config().search_path();
54    let user_name = &session.user_name();
55    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
56
57    let index_table_name = Binder::resolve_index_name(index_name)?;
58
59    let catalog_reader = session.env().catalog_reader();
60    let read_guard = catalog_reader.read_guard();
61    let (table, schema_name) =
62        read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
63    Ok((schema_name.to_owned(), table.clone(), index_table_name))
64}
65
66pub(crate) fn gen_create_index_plan(
67    session: &SessionImpl,
68    context: OptimizerContextRef,
69    schema_name: String,
70    table: Arc<TableCatalog>,
71    index_table_name: String,
72    columns: Vec<OrderByExpr>,
73    include: Vec<Ident>,
74    distributed_by: Vec<ast::Expr>,
75) -> Result<(PlanRef, TableCatalog, PbIndex)> {
76    let table_name = table.name.clone();
77
78    if table.is_index() {
79        return Err(
80            ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
81        );
82    }
83
84    if !session.is_super_user() && session.user_id() != table.owner {
85        return Err(ErrorCode::PermissionDenied(format!(
86            "must be owner of table \"{}\"",
87            table.name
88        ))
89        .into());
90    }
91
92    let mut binder = Binder::new_for_stream(session);
93    binder.bind_table(Some(&schema_name), &table_name)?;
94
95    let mut index_columns_ordered_expr = vec![];
96    let mut include_columns_expr = vec![];
97    let mut distributed_columns_expr = vec![];
98    for column in columns {
99        let order_type = OrderType::from_bools(column.asc, column.nulls_first);
100        let expr_impl = binder.bind_expr(column.expr)?;
101        // Do constant folding and timezone transportation on expressions so that batch queries can match it in the same form.
102        let mut const_eval = ConstEvalRewriter { error: None };
103        let expr_impl = const_eval.rewrite_expr(expr_impl);
104        let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
105        match expr_impl {
106            ExprImpl::InputRef(_) => {}
107            ExprImpl::FunctionCall(_) => {
108                if expr_impl.is_impure() {
109                    return Err(ErrorCode::NotSupported(
110                        "this expression is impure".into(),
111                        "use a pure expression instead".into(),
112                    )
113                    .into());
114                }
115            }
116            _ => {
117                return Err(ErrorCode::NotSupported(
118                    "index columns should be columns or expressions".into(),
119                    "use columns or expressions instead".into(),
120                )
121                .into());
122            }
123        }
124        index_columns_ordered_expr.push((expr_impl, order_type));
125    }
126
127    if include.is_empty() {
128        // Create index to include all (non-hidden) columns by default.
129        include_columns_expr = table
130            .columns()
131            .iter()
132            .enumerate()
133            .filter(|(_, column)| !column.is_hidden)
134            .map(|(x, column)| {
135                ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
136            })
137            .collect_vec();
138    } else {
139        for column in include {
140            let expr_impl =
141                binder.bind_expr(risingwave_sqlparser::ast::Expr::Identifier(column))?;
142            include_columns_expr.push(expr_impl);
143        }
144    };
145
146    for column in distributed_by {
147        let expr_impl = binder.bind_expr(column)?;
148        distributed_columns_expr.push(expr_impl);
149    }
150
151    let table_desc = Rc::new(table.table_desc());
152
153    // Remove duplicate column of index columns
154    let mut set = HashSet::new();
155    index_columns_ordered_expr = index_columns_ordered_expr
156        .into_iter()
157        .filter(|(expr, _)| match expr {
158            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
159            ExprImpl::FunctionCall(_) => true,
160            _ => unreachable!(),
161        })
162        .collect_vec();
163
164    // Remove include columns are already in index columns
165    include_columns_expr = include_columns_expr
166        .into_iter()
167        .filter(|expr| match expr {
168            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
169            _ => unreachable!(),
170        })
171        .collect_vec();
172
173    // Remove duplicate columns of distributed by columns
174    let mut set = HashSet::new();
175    let distributed_columns_expr = distributed_columns_expr
176        .into_iter()
177        .filter(|expr| match expr {
178            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
179            ExprImpl::FunctionCall(_) => true,
180            _ => unreachable!(),
181        })
182        .collect_vec();
183    // Distributed by columns should be a prefix of index columns
184    if !index_columns_ordered_expr
185        .iter()
186        .map(|(expr, _)| expr.clone())
187        .collect_vec()
188        .starts_with(&distributed_columns_expr)
189    {
190        return Err(ErrorCode::InvalidInputSyntax(
191            "Distributed by columns should be a prefix of index columns".to_owned(),
192        )
193        .into());
194    }
195
196    let (index_database_id, index_schema_id) =
197        session.get_database_and_schema_id_for_create(Some(schema_name))?;
198
199    // Manually assemble the materialization plan for the index MV.
200    let materialize = assemble_materialize(
201        table_name,
202        index_database_id,
203        index_schema_id,
204        table.clone(),
205        context,
206        index_table_name.clone(),
207        &index_columns_ordered_expr,
208        &include_columns_expr,
209        // We use the first index column as distributed key by default if users
210        // haven't specified the distributed by columns.
211        if distributed_columns_expr.is_empty() {
212            1
213        } else {
214            distributed_columns_expr.len()
215        },
216        table.cardinality,
217    )?;
218
219    let mut index_table = materialize.table().clone();
220    {
221        // Inherit table properties
222        index_table.retention_seconds = table.retention_seconds;
223    }
224
225    index_table.owner = table.owner;
226
227    let index_columns_len = index_columns_ordered_expr.len() as u32;
228    let index_column_properties = index_columns_ordered_expr
229        .iter()
230        .map(|(_, order)| PbIndexColumnProperties {
231            is_desc: order.is_descending(),
232            nulls_first: order.nulls_are_first(),
233        })
234        .collect();
235    let index_item = build_index_item(
236        &index_table,
237        table.name(),
238        table_desc,
239        index_columns_ordered_expr,
240    );
241
242    let index_prost = PbIndex {
243        id: IndexId::placeholder().index_id,
244        schema_id: index_schema_id,
245        database_id: index_database_id,
246        name: index_table_name,
247        owner: index_table.owner,
248        index_table_id: TableId::placeholder().table_id,
249        primary_table_id: table.id.table_id,
250        index_item,
251        index_column_properties,
252        index_columns_len,
253        initialized_at_epoch: None,
254        created_at_epoch: None,
255        stream_job_status: PbStreamJobStatus::Creating.into(),
256        initialized_at_cluster_version: None,
257        created_at_cluster_version: None,
258    };
259
260    let plan: PlanRef = materialize.into();
261    let ctx = plan.ctx();
262    let explain_trace = ctx.is_explain_trace();
263    if explain_trace {
264        ctx.trace("Create Index:");
265        ctx.trace(plan.explain_to_string());
266    }
267
268    Ok((plan, index_table, index_prost))
269}
270
271fn build_index_item(
272    index_table: &TableCatalog,
273    primary_table_name: &str,
274    primary_table_desc: Rc<TableDesc>,
275    index_columns: Vec<(ExprImpl, OrderType)>,
276) -> Vec<risingwave_pb::expr::ExprNode> {
277    let primary_table_desc_map = primary_table_desc
278        .columns
279        .iter()
280        .enumerate()
281        .map(|(x, y)| (y.name.clone(), x))
282        .collect::<HashMap<_, _>>();
283
284    let primary_table_name_prefix = format!("{}.", primary_table_name);
285
286    let index_columns_len = index_columns.len();
287    index_columns
288        .into_iter()
289        .map(|(expr, _)| expr.to_expr_proto())
290        .chain(
291            index_table
292                .columns
293                .iter()
294                .map(|c| &c.column_desc)
295                .skip(index_columns_len)
296                .map(|x| {
297                    let name = if x.name.starts_with(&primary_table_name_prefix) {
298                        x.name[primary_table_name_prefix.len()..].to_string()
299                    } else {
300                        x.name.clone()
301                    };
302
303                    let column_index = *primary_table_desc_map.get(&name).unwrap();
304                    InputRef {
305                        index: column_index,
306                        data_type: primary_table_desc
307                            .columns
308                            .get(column_index)
309                            .unwrap()
310                            .data_type
311                            .clone(),
312                    }
313                    .to_expr_proto()
314                }),
315        )
316        .collect_vec()
317}
318
319/// Note: distributed by columns must be a prefix of index columns, so we just use
320/// `distributed_by_columns_len` to represent distributed by columns
321fn assemble_materialize(
322    table_name: String,
323    database_id: DatabaseId,
324    schema_id: SchemaId,
325    table_catalog: Arc<TableCatalog>,
326    context: OptimizerContextRef,
327    index_name: String,
328    index_columns: &[(ExprImpl, OrderType)],
329    include_columns: &[ExprImpl],
330    distributed_by_columns_len: usize,
331    cardinality: Cardinality,
332) -> Result<StreamMaterialize> {
333    // Build logical plan and then call gen_create_index_plan
334    // LogicalProject(index_columns, include_columns)
335    //   LogicalScan(table_desc)
336
337    let definition = context.normalized_sql().to_owned();
338    let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
339
340    let logical_scan = LogicalScan::create(
341        table_name,
342        table_catalog.clone(),
343        // Index table has no indexes.
344        vec![],
345        context,
346        None,
347        cardinality,
348    );
349
350    let exprs = index_columns
351        .iter()
352        .map(|(expr, _)| expr.clone())
353        .chain(include_columns.iter().cloned())
354        .collect_vec();
355
356    let logical_project = LogicalProject::create(logical_scan.into(), exprs);
357    let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
358    project_required_cols.toggle_range(0..logical_project.schema().len());
359
360    let mut col_names = HashSet::new();
361    let mut count = 0;
362
363    let out_names: Vec<String> = index_columns
364        .iter()
365        .map(|(expr, _)| match expr {
366            ExprImpl::InputRef(input_ref) => table_catalog
367                .columns()
368                .get(input_ref.index)
369                .unwrap()
370                .name()
371                .to_owned(),
372            ExprImpl::FunctionCall(func) => {
373                let func_name = func.func_type().as_str_name().to_owned();
374                let mut name = func_name.clone();
375                while !col_names.insert(name.clone()) {
376                    count += 1;
377                    name = format!("{}{}", func_name, count);
378                }
379                name
380            }
381            _ => unreachable!(),
382        })
383        .chain(include_columns.iter().map(|expr| {
384            match expr {
385                ExprImpl::InputRef(input_ref) => table_catalog
386                    .columns()
387                    .get(input_ref.index)
388                    .unwrap()
389                    .name()
390                    .to_owned(),
391                _ => unreachable!(),
392            }
393        }))
394        .collect_vec();
395
396    PlanRoot::new_with_logical_plan(
397        logical_project,
398        // schema of logical_project is such that index columns come first.
399        // so we can use distributed_by_columns_len to represent distributed by columns indices.
400        RequiredDist::PhysicalDist(Distribution::HashShard(
401            (0..distributed_by_columns_len).collect(),
402        )),
403        Order::new(
404            index_columns
405                .iter()
406                .enumerate()
407                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
408                .collect(),
409        ),
410        project_required_cols,
411        out_names,
412    )
413    .gen_index_plan(
414        index_name,
415        database_id,
416        schema_id,
417        definition,
418        retention_seconds,
419    )
420}
421
422pub async fn handle_create_index(
423    handler_args: HandlerArgs,
424    if_not_exists: bool,
425    index_name: ObjectName,
426    table_name: ObjectName,
427    columns: Vec<OrderByExpr>,
428    include: Vec<Ident>,
429    distributed_by: Vec<ast::Expr>,
430) -> Result<RwPgResponse> {
431    let session = handler_args.session.clone();
432
433    let (graph, index_table, index) = {
434        let (schema_name, table, index_table_name) =
435            resolve_index_schema(&session, index_name, table_name)?;
436        let qualified_index_name = ObjectName(vec![
437            Ident::from_real_value(&schema_name),
438            Ident::from_real_value(&index_table_name),
439        ]);
440        if let Either::Right(resp) = session.check_relation_name_duplicated(
441            qualified_index_name,
442            StatementType::CREATE_INDEX,
443            if_not_exists,
444        )? {
445            return Ok(resp);
446        }
447
448        let context = OptimizerContext::from_handler_args(handler_args);
449        let (plan, index_table, index) = gen_create_index_plan(
450            &session,
451            context.into(),
452            schema_name,
453            table,
454            index_table_name,
455            columns,
456            include,
457            distributed_by,
458        )?;
459        let graph = build_graph(plan, Some(GraphJobType::Index))?;
460
461        (graph, index_table, index)
462    };
463
464    tracing::trace!(
465        "name={}, graph=\n{}",
466        index.name,
467        serde_json::to_string_pretty(&graph).unwrap()
468    );
469
470    let _job_guard =
471        session
472            .env()
473            .creating_streaming_job_tracker()
474            .guard(CreatingStreamingJobInfo::new(
475                session.session_id(),
476                index.database_id,
477                index.schema_id,
478                index.name.clone(),
479            ));
480
481    let catalog_writer = session.catalog_writer()?;
482    catalog_writer
483        .create_index(index, index_table.to_prost(), graph, if_not_exists)
484        .await?;
485
486    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
487}