risingwave_frontend/handler/create_index.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::sync::Arc;

use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{IndexId, TableId};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus};
use risingwave_sqlparser::ast;
use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};

use super::RwPgResponse;
use crate::TableCatalog;
use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
use crate::optimizer::plan_node::{
    Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::stream_fragmenter::{GraphJobType, build_graph};

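/// Resolve a `CREATE INDEX` statement against the catalog: look up the primary
/// table by name (honoring the search path) and bind the index name.
///
/// Returns the resolved schema name, the primary table's catalog, and the name
/// of the internal table that will back the index.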
pub(crate) fn resolve_index_schema(
    session: &SessionImpl,
    index_name: ObjectName,
    table_name: ObjectName,
) -> Result<(String, Arc<TableCatalog>, String)> {
    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();
    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

    let index_table_name = Binder::resolve_index_name(index_name)?;

    let catalog_reader = session.env().catalog_reader();
    let read_guard = catalog_reader.read_guard();
    let (table, schema_name) =
        read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
    Ok((schema_name.to_owned(), table.clone(), index_table_name))
}

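/// Generate the streaming plan and catalog entries for an index.
///
/// As a rough sketch of the mapping (the exact options accepted by the parser
/// live in `risingwave_sqlparser`), a statement like
///
/// ```sql
/// CREATE INDEX idx ON t (a, lower(b)) INCLUDE (c) DISTRIBUTED BY (a);
/// ```
///
/// becomes a materialized view over `t` whose columns are the index keys
/// `(a, lower(b))` followed by the included column `c`, hash-distributed by `a`.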
pub(crate) fn gen_create_index_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    schema_name: String,
    table: Arc<TableCatalog>,
    index_table_name: String,
    columns: Vec<OrderByExpr>,
    include: Vec<Ident>,
    distributed_by: Vec<ast::Expr>,
) -> Result<(PlanRef, TableCatalog, PbIndex)> {
    if table.is_index() {
        return Err(
            ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
        );
    }

    if !session.is_super_user() && session.user_id() != table.owner {
        return Err(ErrorCode::PermissionDenied(format!(
            "must be owner of table \"{}\"",
            table.name
        ))
        .into());
    }

    let mut binder = Binder::new_for_stream(session);
    binder.bind_table(Some(&schema_name), &table.name)?;

    let mut index_columns_ordered_expr = vec![];
    let mut include_columns_expr = vec![];
    let mut distributed_columns_expr = vec![];
    for column in columns {
        let order_type = OrderType::from_bools(column.asc, column.nulls_first);
        let expr_impl = binder.bind_expr(&column.expr)?;
        // Do constant folding and timezone rewriting on the expression so that
        // batch queries can match it in the same form.
        let mut const_eval = ConstEvalRewriter { error: None };
        let expr_impl = const_eval.rewrite_expr(expr_impl);
        let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
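        // Only plain column references and pure function calls may serve as
        // index keys; an impure (non-deterministic) call is rejected by the
        // check below.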
        match expr_impl {
            ExprImpl::InputRef(_) => {}
            ExprImpl::FunctionCall(_) => {
                if expr_impl.is_impure() {
                    return Err(ErrorCode::NotSupported(
                        "this expression is impure".into(),
                        "use a pure expression instead".into(),
                    )
                    .into());
                }
            }
            _ => {
                return Err(ErrorCode::NotSupported(
                    "index columns should be columns or expressions".into(),
                    "use columns or expressions instead".into(),
                )
                .into());
            }
        }
        index_columns_ordered_expr.push((expr_impl, order_type));
    }

    if include.is_empty() {
        // By default, the index includes all (non-hidden) columns of the table.
        include_columns_expr = table
            .columns()
            .iter()
            .enumerate()
            .filter(|(_, column)| !column.is_hidden)
            .map(|(x, column)| {
                ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
            })
            .collect_vec();
    } else {
        for column in include {
            let expr_impl =
                binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
            include_columns_expr.push(expr_impl);
        }
    }

    for column in distributed_by {
        let expr_impl = binder.bind_expr(&column)?;
        distributed_columns_expr.push(expr_impl);
    }

    // Remove duplicate columns from the index columns.
    let mut set = HashSet::new();
    index_columns_ordered_expr = index_columns_ordered_expr
        .into_iter()
        .filter(|(expr, _)| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            ExprImpl::FunctionCall(_) => true,
            _ => unreachable!(),
        })
        .collect_vec();

    // Remove include columns that are already in the index columns.
    include_columns_expr = include_columns_expr
        .into_iter()
        .filter(|expr| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            _ => unreachable!(),
        })
        .collect_vec();

    // Remove duplicate columns from the distributed-by columns.
    let mut set = HashSet::new();
    let distributed_columns_expr = distributed_columns_expr
        .into_iter()
        .filter(|expr| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            ExprImpl::FunctionCall(_) => true,
            _ => unreachable!(),
        })
        .collect_vec();
    // The distributed-by columns must be a prefix of the index columns.
    if !index_columns_ordered_expr
        .iter()
        .map(|(expr, _)| expr.clone())
        .collect_vec()
        .starts_with(&distributed_columns_expr)
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "Distributed by columns should be a prefix of index columns".to_owned(),
        )
        .into());
    }
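    // For example, `CREATE INDEX idx ON t (a, b) DISTRIBUTED BY (a)` passes this
    // check, while `DISTRIBUTED BY (b)` is rejected because `(b)` is not a
    // prefix of the index columns `(a, b)`.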

    let (index_database_id, index_schema_id) =
        session.get_database_and_schema_id_for_create(Some(schema_name))?;

    // Manually assemble the materialization plan for the index MV.
    let materialize = assemble_materialize(
        index_database_id,
        index_schema_id,
        table.clone(),
        context,
        index_table_name.clone(),
        &index_columns_ordered_expr,
        &include_columns_expr,
        // If the user hasn't specified distributed-by columns, distribute by the
        // first index column by default.
        if distributed_columns_expr.is_empty() {
            1
        } else {
            distributed_columns_expr.len()
        },
    )?;

    let mut index_table = materialize.table().clone();
    {
        // Inherit table properties
        index_table.retention_seconds = table.retention_seconds;
    }

    index_table.owner = table.owner;

    let index_columns_len = index_columns_ordered_expr.len() as u32;
    let index_column_properties = index_columns_ordered_expr
        .iter()
        .map(|(_, order)| PbIndexColumnProperties {
            is_desc: order.is_descending(),
            nulls_first: order.nulls_are_first(),
        })
        .collect();
    let index_item = build_index_item(
        &index_table,
        table.name(),
        &table,
        index_columns_ordered_expr,
    );

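    // Assemble the index catalog entry. The index and index-table IDs are
    // placeholders here; the real IDs are assigned later when the catalog
    // writer creates the index through the meta service.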
    let index_prost = PbIndex {
        id: IndexId::placeholder().index_id,
        schema_id: index_schema_id,
        database_id: index_database_id,
        name: index_table_name,
        owner: index_table.owner,
        index_table_id: TableId::placeholder().table_id,
        primary_table_id: table.id.table_id,
        index_item,
        index_column_properties,
        index_columns_len,
        initialized_at_epoch: None,
        created_at_epoch: None,
        stream_job_status: PbStreamJobStatus::Creating.into(),
        initialized_at_cluster_version: None,
        created_at_cluster_version: None,
    };

    let plan: PlanRef = materialize.into();
    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Index:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, index_table, index_prost))
}
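/// Build the `index_item` expressions stored in the index catalog: first the
/// index-key expressions themselves, then an `InputRef` into the primary table
/// for every remaining (included) column of the index table, so the optimizer
/// can match primary-table expressions against the index.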
fn build_index_item(
    index_table: &TableCatalog,
    primary_table_name: &str,
    primary_table: &TableCatalog,
    index_columns: Vec<(ExprImpl, OrderType)>,
) -> Vec<risingwave_pb::expr::ExprNode> {
    let primary_table_desc_map = primary_table
        .columns
        .iter()
        .enumerate()
        .map(|(x, y)| (y.name.clone(), x))
        .collect::<HashMap<_, _>>();

    let primary_table_name_prefix = format!("{}.", primary_table_name);

    let index_columns_len = index_columns.len();
    index_columns
        .into_iter()
        .map(|(expr, _)| expr.to_expr_proto())
        .chain(
            index_table
                .columns
                .iter()
                .map(|c| &c.column_desc)
                .skip(index_columns_len)
                .map(|x| {
                    // Strip the `<primary_table>.` prefix that qualified column
                    // names carry in the index table.
                    let name = if x.name.starts_with(&primary_table_name_prefix) {
                        x.name[primary_table_name_prefix.len()..].to_string()
                    } else {
                        x.name.clone()
                    };

                    let column_index = *primary_table_desc_map.get(&name).unwrap();
                    InputRef {
                        index: column_index,
                        data_type: primary_table
                            .columns
                            .get(column_index)
                            .unwrap()
                            .data_type
                            .clone(),
                    }
                    .to_expr_proto()
                }),
        )
        .collect_vec()
}

/// Note: the distributed-by columns must be a prefix of the index columns, so
/// `distributed_by_columns_len` alone is enough to identify them.
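///
/// For example (illustrative), `CREATE INDEX idx ON t (a, b) INCLUDE (c)`
/// materializes a view with columns `[a, b, c]`, ordered by `(a, b)` and
/// hash-distributed by `a` (the default prefix length of 1).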
fn assemble_materialize(
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_catalog: Arc<TableCatalog>,
    context: OptimizerContextRef,
    index_name: String,
    index_columns: &[(ExprImpl, OrderType)],
    include_columns: &[ExprImpl],
    distributed_by_columns_len: usize,
) -> Result<StreamMaterialize> {
    // Build the logical plan for the index's materialized view:
    //   LogicalProject(index_columns, include_columns)
    //     LogicalScan(table_desc)

    let definition = context.normalized_sql().to_owned();
    let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);

    let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);

    let exprs = index_columns
        .iter()
        .map(|(expr, _)| expr.clone())
        .chain(include_columns.iter().cloned())
        .collect_vec();

    let logical_project = LogicalProject::create(logical_scan.into(), exprs);
    let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
    project_required_cols.toggle_range(0..logical_project.schema().len());

    let mut col_names = HashSet::new();
    let mut count = 0;

    // Derive output column names: a plain column keeps its name, while an
    // expression column is named after its function, deduplicated with a
    // numeric suffix when the name is already taken.
    let out_names: Vec<String> = index_columns
        .iter()
        .map(|(expr, _)| match expr {
            ExprImpl::InputRef(input_ref) => table_catalog
                .columns()
                .get(input_ref.index)
                .unwrap()
                .name()
                .to_owned(),
            ExprImpl::FunctionCall(func) => {
                let func_name = func.func_type().as_str_name().to_owned();
                let mut name = func_name.clone();
                while !col_names.insert(name.clone()) {
                    count += 1;
                    name = format!("{}{}", func_name, count);
                }
                name
            }
            _ => unreachable!(),
        })
        .chain(include_columns.iter().map(|expr| {
            match expr {
                ExprImpl::InputRef(input_ref) => table_catalog
                    .columns()
                    .get(input_ref.index)
                    .unwrap()
                    .name()
                    .to_owned(),
                _ => unreachable!(),
            }
        }))
        .collect_vec();

    PlanRoot::new_with_logical_plan(
        logical_project,
        // The schema of `logical_project` puts the index columns first, so the
        // first `distributed_by_columns_len` column indices are exactly the
        // distributed-by columns.
        RequiredDist::PhysicalDist(Distribution::HashShard(
            (0..distributed_by_columns_len).collect(),
        )),
        Order::new(
            index_columns
                .iter()
                .enumerate()
                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
                .collect(),
        ),
        project_required_cols,
        out_names,
    )
    .gen_index_plan(
        index_name,
        database_id,
        schema_id,
        definition,
        retention_seconds,
    )
}
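/// Handler for `CREATE INDEX`: resolves and plans the index, then submits the
/// resulting stream job to the meta service via the catalog writer.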
pub async fn handle_create_index(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    index_name: ObjectName,
    table_name: ObjectName,
    columns: Vec<OrderByExpr>,
    include: Vec<Ident>,
    distributed_by: Vec<ast::Expr>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    let (graph, index_table, index) = {
        let (schema_name, table, index_table_name) =
            resolve_index_schema(&session, index_name, table_name)?;
        let qualified_index_name = ObjectName(vec![
            Ident::from_real_value(&schema_name),
            Ident::from_real_value(&index_table_name),
        ]);
        if let Either::Right(resp) = session.check_relation_name_duplicated(
            qualified_index_name,
            StatementType::CREATE_INDEX,
            if_not_exists,
        )? {
            return Ok(resp);
        }

        let context = OptimizerContext::from_handler_args(handler_args);
        let (plan, index_table, index) = gen_create_index_plan(
            &session,
            context.into(),
            schema_name,
            table,
            index_table_name,
            columns,
            include,
            distributed_by,
        )?;
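        // Fragment the stream plan into the graph that will be submitted to
        // the meta service.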
        let graph = build_graph(plan, Some(GraphJobType::Index))?;

        (graph, index_table, index)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        index.name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

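    // Register the job with the session's creating-streaming-job tracker; the
    // guard deregisters it when dropped, e.g. if the statement is cancelled.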
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                index.database_id,
                index.schema_id,
                index.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_index(index, index_table.to_prost(), graph, if_not_exists)
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
}