// risingwave_frontend/handler/create_mv.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
16
use either::Either;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
use risingwave_pb::serverless_backfill_controller::{
    ProvisionRequest, node_group_controller_service_client,
};
use risingwave_pb::stream_plan::PbStreamFragmentGraph;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
use thiserror_ext::AsReport;
use super::RwPgResponse;
use crate::binder::{Binder, BoundQuery, BoundSetExpr};
use crate::catalog::check_column_name_not_reserved;
use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::optimizer::backfill_order_strategy::plan_backfill_order;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
use crate::planner::Planner;
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::{SESSION_MANAGER, SessionImpl};
use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
use crate::utils::ordinal;
use crate::{TableCatalog, WithOptions};

pub const RESOURCE_GROUP_KEY: &str = "resource_group";
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
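
// A hedged usage sketch (illustrative identifiers): both keys above are read
// from the statement-level WITH clause and consumed before any remaining
// (unknown) options are rejected, e.g.
//
//   CREATE MATERIALIZED VIEW mv
//   WITH (resource_group = 'my_group')
//   AS SELECT v FROM t;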

/// Returns the column names specified by the user, or `None` if no column
/// names were given.
pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
    if columns.is_empty() {
        None
    } else {
        Some(columns.iter().map(|v| v.real_value()).collect())
    }
}

/// If `columns` is empty, the user did not specify column names, so we derive
/// them from the query. If `columns` is non-empty, the user specified the
/// column names and must guarantee that their number is consistent with the
/// query's output.
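///
/// A hedged behavior sketch (illustrative identifiers):
///
/// ```text
/// CREATE MATERIALIZED VIEW mv (a) AS SELECT x + 1 FROM t;   -- ok: column name given
/// CREATE MATERIALIZED VIEW mv AS SELECT x + 1 AS a FROM t;  -- ok: alias given
/// CREATE MATERIALIZED VIEW mv AS SELECT x + 1 FROM t;       -- error: alias required
/// ```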
pub(super) fn get_column_names(
    bound: &BoundQuery,
    columns: Vec<Ident>,
) -> Result<Option<Vec<String>>> {
    let col_names = parse_column_names(&columns);
    if let BoundSetExpr::Select(select) = &bound.body {
        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
        // If the user provides column names (`col_names.is_some()`), we don't need aliases.
        // For other expressions (`col_names.is_none()`), we require the user to explicitly
        // assign an alias.
        if col_names.is_none() {
            for (i, alias) in select.aliases.iter().enumerate() {
                if alias.is_none() {
                    return Err(ErrorCode::BindError(format!(
                        "An alias must be specified for the {} expression (counting from 1) in result relation",
                        ordinal(i + 1)
                    ))
                    .into());
                }
            }
        }
    }

    Ok(col_names)
}

/// Bind and generate the create MV plan; return the plan and the MV table info.
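///
/// A minimal call sketch (hedged; assumes a live `SessionImpl` and an optimizer
/// context built from the handler args, with illustrative variable names):
///
/// ```ignore
/// let context: OptimizerContextRef = OptimizerContext::from_handler_args(handler_args).into();
/// let (plan, table) = gen_create_mv_plan(
///     &session,
///     context,
///     query,  // parsed `Query` AST for the SELECT part
///     name,   // `ObjectName` of the new materialized view
///     vec![], // no explicit column names; derive them from the query
///     None,   // no EMIT mode specified
/// )?;
/// ```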
pub fn gen_create_mv_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: Query,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    let mut binder = Binder::new_for_stream(session);
    let bound = binder.bind_query(&query)?;
    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
}

/// Generate the create MV plan from a bound query.
pub fn gen_create_mv_plan_bound(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: BoundQuery,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    if session.config().create_compaction_group_for_mv() {
        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
    }

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;

    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    let definition = context.normalized_sql().to_owned();

    let col_names = get_column_names(&query, columns)?;

    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
    if let Some(col_names) = col_names {
        for name in &col_names {
            check_column_name_not_reserved(name)?;
        }
        plan_root.set_out_names(col_names)?;
    }
    let materialize = plan_root.gen_materialize_plan(
        database_id,
        schema_id,
        table_name,
        definition,
        emit_on_window_close,
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    let plan: PlanRef = materialize.into();

    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Materialized View:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, table))
}

pub async fn handle_create_mv(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: Query,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let (dependent_relations, dependent_udfs, bound_query) = {
        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
        let bound_query = binder.bind_query(&query)?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound_query,
        )
    };
    handle_create_mv_bound(
        handler_args,
        if_not_exists,
        name,
        bound_query,
        dependent_relations,
        dependent_udfs,
        columns,
        emit_mode,
    )
    .await
}

/// Send a provision request to the serverless backfill controller.
pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
    let request = tonic::Request::new(ProvisionRequest {});
    let mut client =
        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
            sbc_addr.clone(),
        )
        .await
        .map_err(|e| {
            RwError::from(ErrorCode::InternalError(format!(
                "unable to reach serverless backfill controller at addr {}: {}",
                sbc_addr,
                e.as_report()
            )))
        })?;

    match client.provision(request).await {
        Ok(resp) => Ok(resp.into_inner().resource_group),
        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
            "serverless backfill controller returned error: {}",
            e.as_report()
        )))),
    }
}
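
// A hedged usage sketch: callers obtain the controller address from the
// frontend environment and schedule backfill onto the returned group, e.g.
//
//   let group = provision_resource_group(sbc_addr).await?;
//   // ... pass `Some(group)` as the streaming job's resource group.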

fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
    let context = OptimizerContext::from_handler_args(handler_args);
    context.with_options().clone()
}

pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<TableId>,
    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    // Check cluster limits
    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (table, graph, dependencies, resource_group) = {
        gen_create_mv_graph(
            handler_args,
            name,
            query,
            dependent_relations,
            dependent_udfs,
            columns,
            emit_mode,
        )
        .await?
    };

    // Ensure writes to `StreamJobTracker` are atomic.
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_group,
            if_not_exists,
        )
        .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}

pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<TableId>,
    dependent_udfs: HashSet<FunctionId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<u32>,
    Option<String>,
)> {
    let mut with_options = get_with_options(handler_args.clone());
    let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup.check_available()?;
    }

    let is_serverless_backfill = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        .unwrap_or_default()
        .parse::<bool>()
        .unwrap_or(false);

    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and resource group together".to_owned(),
        )));
    }

    if !with_options.is_empty() {
        // All supported options are extracted by `remove` above; reject any
        // remaining (unknown) options here.
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }
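    // E.g. (illustrative) `WITH (unknown_opt = 'x')` would be rejected here,
    // since every supported option has already been consumed by `remove` above.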

    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    if is_serverless_backfill {
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(val) => resource_group = Some(val),
        }
    }
    tracing::debug!(
        resource_group = resource_group,
        "provisioning on resource group"
    );

    let context = OptimizerContext::from_handler_args(handler_args);
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view are returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    if resource_group.is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError("The session config `streaming_use_arrangement_backfill` must be enabled to use the resource_group option".to_owned())));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .map(|id| id.table_id() as ObjectId)
        .chain(
            dependent_udfs
                .into_iter()
                .map(|id| id.function_id() as ObjectId),
        )
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_group))
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating an MV, a unique column name must be specified for each column.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // A single aggregation without an alias is OK; the function name becomes the column name.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Identical aggregations without aliases are forbidden, because they produce
        // the same column name.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A literal without an alias is forbidden.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // Other expressions without an alias are forbidden as well.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating an MV with ORDER BY returns a special notice.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without ORDER BY.
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With ORDER BY.
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}