risingwave_frontend/handler/
create_mv.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
20use risingwave_pb::catalog::PbTable;
21use risingwave_pb::serverless_backfill_controller::{
22    ProvisionRequest, node_group_controller_service_client,
23};
24use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
25use thiserror_ext::AsReport;
26
27use super::RwPgResponse;
28use crate::WithOptions;
29use crate::binder::{Binder, BoundQuery, BoundSetExpr};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::handler::HandlerArgs;
34use crate::optimizer::backfill_order_strategy::plan_backfill_order;
35use crate::optimizer::plan_node::Explain;
36use crate::optimizer::plan_node::generic::GenericPlanRef;
37use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
38use crate::planner::Planner;
39use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
40use crate::session::{SESSION_MANAGER, SessionImpl};
41use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
42use crate::utils::ordinal;
43
/// Key of the `WITH` option that names the resource group for a new materialized view.
/// `handle_create_mv_bound` removes this key from the statement's `WITH` options.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// Key of the `WITH` option that requests serverless backfill for a new materialized view.
/// `handle_create_mv_bound` parses its value as a `bool`; a missing or unparsable value is
/// treated as `false`.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
46
47pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
48    if columns.is_empty() {
49        None
50    } else {
51        Some(columns.iter().map(|v| v.real_value()).collect())
52    }
53}
54
55/// If columns is empty, it means that the user did not specify the column names.
56/// In this case, we extract the column names from the query.
57/// If columns is not empty, it means that user specify the column names and the user
58/// should guarantee that the column names number are consistent with the query.
59pub(super) fn get_column_names(
60    bound: &BoundQuery,
61    columns: Vec<Ident>,
62) -> Result<Option<Vec<String>>> {
63    let col_names = parse_column_names(&columns);
64    if let BoundSetExpr::Select(select) = &bound.body {
65        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
66        // If user provides columns name (col_names.is_some()), we don't need alias.
67        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
68        // alias.
69        if col_names.is_none() {
70            for (i, alias) in select.aliases.iter().enumerate() {
71                if alias.is_none() {
72                    return Err(ErrorCode::BindError(format!(
73                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
74                ))
75                .into());
76                }
77            }
78        }
79    }
80
81    Ok(col_names)
82}
83
84/// Bind and generate create MV plan, return plan and mv table info.
85pub fn gen_create_mv_plan(
86    session: &SessionImpl,
87    context: OptimizerContextRef,
88    query: Query,
89    name: ObjectName,
90    columns: Vec<Ident>,
91    emit_mode: Option<EmitMode>,
92) -> Result<(PlanRef, PbTable)> {
93    let mut binder = Binder::new_for_stream(session);
94    let bound = binder.bind_query(query)?;
95    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
96}
97
98/// Generate create MV plan from a bound query
99pub fn gen_create_mv_plan_bound(
100    session: &SessionImpl,
101    context: OptimizerContextRef,
102    query: BoundQuery,
103    name: ObjectName,
104    columns: Vec<Ident>,
105    emit_mode: Option<EmitMode>,
106) -> Result<(PlanRef, PbTable)> {
107    if session.config().create_compaction_group_for_mv() {
108        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
109    }
110
111    let db_name = &session.database();
112    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
113
114    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
115
116    let definition = context.normalized_sql().to_owned();
117
118    let col_names = get_column_names(&query, columns)?;
119
120    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
121    if emit_on_window_close {
122        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
123    }
124
125    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
126    if let Some(col_names) = col_names {
127        for name in &col_names {
128            check_column_name_not_reserved(name)?;
129        }
130        plan_root.set_out_names(col_names)?;
131    }
132    let materialize = plan_root.gen_materialize_plan(
133        database_id,
134        schema_id,
135        table_name,
136        definition,
137        emit_on_window_close,
138    )?;
139    let mut table = materialize.table().to_prost();
140
141    let plan: PlanRef = materialize.into();
142
143    table.owner = session.user_id();
144
145    let ctx = plan.ctx();
146    let explain_trace = ctx.is_explain_trace();
147    if explain_trace {
148        ctx.trace("Create Materialized View:");
149        ctx.trace(plan.explain_to_string());
150    }
151
152    Ok((plan, table))
153}
154
155pub async fn handle_create_mv(
156    handler_args: HandlerArgs,
157    if_not_exists: bool,
158    name: ObjectName,
159    query: Query,
160    columns: Vec<Ident>,
161    emit_mode: Option<EmitMode>,
162) -> Result<RwPgResponse> {
163    let (dependent_relations, dependent_udfs, bound_query) = {
164        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
165        let bound_query = binder.bind_query(query)?;
166        (
167            binder.included_relations().clone(),
168            binder.included_udfs().clone(),
169            bound_query,
170        )
171    };
172    handle_create_mv_bound(
173        handler_args,
174        if_not_exists,
175        name,
176        bound_query,
177        dependent_relations,
178        dependent_udfs,
179        columns,
180        emit_mode,
181    )
182    .await
183}
184
185/// Send a provision request to the serverless backfill controller
186pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
187    let request = tonic::Request::new(ProvisionRequest {});
188    let mut client =
189        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
190            sbc_addr.clone(),
191        )
192        .await
193        .map_err(|e| {
194            RwError::from(ErrorCode::InternalError(format!(
195                "unable to reach serverless backfill controller at addr {}: {}",
196                sbc_addr,
197                e.as_report()
198            )))
199        })?;
200
201    match client.provision(request).await {
202        Ok(resp) => Ok(resp.into_inner().resource_group),
203        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
204            "serverless backfill controller returned error :{}",
205            e.as_report()
206        )))),
207    }
208}
209
210fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
211    let context = OptimizerContext::from_handler_args(handler_args);
212    context.with_options().clone()
213}
214
215pub async fn handle_create_mv_bound(
216    handler_args: HandlerArgs,
217    if_not_exists: bool,
218    name: ObjectName,
219    query: BoundQuery,
220    dependent_relations: HashSet<TableId>,
221    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
222    columns: Vec<Ident>,
223    emit_mode: Option<EmitMode>,
224) -> Result<RwPgResponse> {
225    let session = handler_args.session.clone();
226
227    // Check cluster limits
228    session.check_cluster_limits().await?;
229
230    if let Either::Right(resp) = session.check_relation_name_duplicated(
231        name.clone(),
232        StatementType::CREATE_MATERIALIZED_VIEW,
233        if_not_exists,
234    )? {
235        return Ok(resp);
236    }
237
238    let (table, graph, dependencies, resource_group) = {
239        let mut with_options = get_with_options(handler_args.clone());
240        let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
241
242        if resource_group.is_some() {
243            risingwave_common::license::Feature::ResourceGroup
244                .check_available()
245                .map_err(|e| anyhow::anyhow!(e))?;
246        }
247
248        let is_serverless_backfill = with_options
249            .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
250            .unwrap_or_default()
251            .parse::<bool>()
252            .unwrap_or(false);
253
254        if resource_group.is_some() && is_serverless_backfill {
255            return Err(RwError::from(InvalidInputSyntax(
256                "Please do not specify serverless backfilling and resource group together"
257                    .to_owned(),
258            )));
259        }
260
261        if !with_options.is_empty() {
262            // get other useful fields by `remove`, the logic here is to reject unknown options.
263            return Err(RwError::from(ProtocolError(format!(
264                "unexpected options in WITH clause: {:?}",
265                with_options.keys()
266            ))));
267        }
268
269        let sbc_addr = match SESSION_MANAGER.get() {
270            Some(manager) => manager.env().sbc_address(),
271            None => "",
272        }
273        .to_owned();
274
275        if is_serverless_backfill && sbc_addr.is_empty() {
276            return Err(RwError::from(InvalidInputSyntax(
277                "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
278            )));
279        }
280
281        if is_serverless_backfill {
282            match provision_resource_group(sbc_addr).await {
283                Err(e) => {
284                    return Err(RwError::from(ProtocolError(format!(
285                        "failed to provision serverless backfill nodes: {}",
286                        e.as_report()
287                    ))));
288                }
289                Ok(val) => resource_group = Some(val),
290            }
291        }
292        tracing::debug!(
293            resource_group = resource_group,
294            "provisioning on resource group"
295        );
296
297        let context = OptimizerContext::from_handler_args(handler_args);
298        let has_order_by = !query.order.is_empty();
299        if has_order_by {
300            context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
301It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
302"#.to_owned());
303        }
304
305        if resource_group.is_some()
306            && !context
307                .session_ctx()
308                .config()
309                .streaming_use_arrangement_backfill()
310        {
311            return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
312        }
313
314        let context: OptimizerContextRef = context.into();
315
316        let (plan, table) =
317            gen_create_mv_plan_bound(&session, context.clone(), query, name, columns, emit_mode)?;
318
319        let backfill_order = plan_backfill_order(
320            context.session_ctx().as_ref(),
321            context.with_options().backfill_order_strategy(),
322            plan.clone(),
323        )?;
324
325        // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
326        // during binding instead of visiting the optimized plan.
327        let dependencies =
328            RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
329                .into_iter()
330                .map(|id| id.table_id() as ObjectId)
331                .chain(
332                    dependent_udfs
333                        .into_iter()
334                        .map(|id| id.function_id() as ObjectId),
335                )
336                .collect();
337
338        let graph = build_graph_with_strategy(
339            plan,
340            Some(GraphJobType::MaterializedView),
341            Some(backfill_order),
342        )?;
343
344        (table, graph, dependencies, resource_group)
345    };
346
347    // Ensure writes to `StreamJobTracker` are atomic.
348    let _job_guard =
349        session
350            .env()
351            .creating_streaming_job_tracker()
352            .guard(CreatingStreamingJobInfo::new(
353                session.session_id(),
354                table.database_id,
355                table.schema_id,
356                table.name.clone(),
357            ));
358
359    let session = session.clone();
360    let catalog_writer = session.catalog_writer()?;
361    catalog_writer
362        .create_materialized_view(table, graph, dependencies, resource_group)
363        .await?;
364
365    Ok(PgResponse::empty_result(
366        StatementType::CREATE_MATERIALIZED_VIEW,
367    ))
368}
369
/// Tests for the `CREATE MATERIALIZED VIEW` handler, driven through `LocalFrontend::run_sql`.
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a protobuf-encoded source and a materialized view selecting one struct column
    /// from it, then checks that both appear in the catalog and that the MV has the expected
    /// columns and data types.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Map every column name of the MV to its data type for comparison.
        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        // Nested struct type of the `city` field inside `country`.
        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        // Besides the selected `country` column, the row-id and rw-timestamp columns are
        // expected in the table.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating an MV, a unique column name must be available for each output column.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // A single aggregation without an alias is OK.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Two aggregations without aliases are forbidden, because both produce the same
        // column name.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A literal without an alias is forbidden.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // Other expressions without an alias are forbidden as well.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating an MV with and without `ORDER BY` succeeds with the expected statement type.
    // NOTE(review): only the statement type is asserted for the ORDER BY case; the warning
    // emitted by the handler is not checked here.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without order by: no notices are attached to the response.
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With order by
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}