risingwave_frontend/handler/
create_mv.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
20use risingwave_pb::catalog::PbTable;
21use risingwave_pb::serverless_backfill_controller::{
22    ProvisionRequest, node_group_controller_service_client,
23};
24use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
25use thiserror_ext::AsReport;
26
27use super::RwPgResponse;
28use crate::WithOptions;
29use crate::binder::{Binder, BoundQuery, BoundSetExpr};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::handler::HandlerArgs;
34use crate::optimizer::plan_node::Explain;
35use crate::optimizer::plan_node::generic::GenericPlanRef;
36use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
37use crate::planner::Planner;
38use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
39use crate::session::{SESSION_MANAGER, SessionImpl};
40use crate::stream_fragmenter::build_graph;
41use crate::utils::ordinal;
42
/// `WITH` option key whose value is used as the resource group passed to
/// `create_materialized_view`.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// `WITH` option key enabling serverless backfill. When its value parses as `true`, a resource
/// group is provisioned through the serverless backfill controller; any value that does not
/// parse as a `bool` is treated as `false`.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
45
46pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
47    if columns.is_empty() {
48        None
49    } else {
50        Some(columns.iter().map(|v| v.real_value()).collect())
51    }
52}
53
54/// If columns is empty, it means that the user did not specify the column names.
55/// In this case, we extract the column names from the query.
56/// If columns is not empty, it means that user specify the column names and the user
57/// should guarantee that the column names number are consistent with the query.
58pub(super) fn get_column_names(
59    bound: &BoundQuery,
60    columns: Vec<Ident>,
61) -> Result<Option<Vec<String>>> {
62    let col_names = parse_column_names(&columns);
63    if let BoundSetExpr::Select(select) = &bound.body {
64        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
65        // If user provides columns name (col_names.is_some()), we don't need alias.
66        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
67        // alias.
68        if col_names.is_none() {
69            for (i, alias) in select.aliases.iter().enumerate() {
70                if alias.is_none() {
71                    return Err(ErrorCode::BindError(format!(
72                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
73                ))
74                .into());
75                }
76            }
77        }
78    }
79
80    Ok(col_names)
81}
82
83/// Bind and generate create MV plan, return plan and mv table info.
84pub fn gen_create_mv_plan(
85    session: &SessionImpl,
86    context: OptimizerContextRef,
87    query: Query,
88    name: ObjectName,
89    columns: Vec<Ident>,
90    emit_mode: Option<EmitMode>,
91) -> Result<(PlanRef, PbTable)> {
92    let mut binder = Binder::new_for_stream(session);
93    let bound = binder.bind_query(query)?;
94    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
95}
96
97/// Generate create MV plan from a bound query
98pub fn gen_create_mv_plan_bound(
99    session: &SessionImpl,
100    context: OptimizerContextRef,
101    query: BoundQuery,
102    name: ObjectName,
103    columns: Vec<Ident>,
104    emit_mode: Option<EmitMode>,
105) -> Result<(PlanRef, PbTable)> {
106    if session.config().create_compaction_group_for_mv() {
107        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
108    }
109
110    let db_name = &session.database();
111    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
112
113    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
114
115    let definition = context.normalized_sql().to_owned();
116
117    let col_names = get_column_names(&query, columns)?;
118
119    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
120    if emit_on_window_close {
121        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
122    }
123
124    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
125    plan_root.set_req_dist_as_same_as_req_order();
126    if let Some(col_names) = col_names {
127        for name in &col_names {
128            check_column_name_not_reserved(name)?;
129        }
130        plan_root.set_out_names(col_names)?;
131    }
132    let materialize = plan_root.gen_materialize_plan(
133        database_id,
134        schema_id,
135        table_name,
136        definition,
137        emit_on_window_close,
138    )?;
139    let mut table = materialize.table().to_prost();
140
141    let plan: PlanRef = materialize.into();
142
143    table.owner = session.user_id();
144
145    let ctx = plan.ctx();
146    let explain_trace = ctx.is_explain_trace();
147    if explain_trace {
148        ctx.trace("Create Materialized View:");
149        ctx.trace(plan.explain_to_string());
150    }
151
152    Ok((plan, table))
153}
154
155pub async fn handle_create_mv(
156    handler_args: HandlerArgs,
157    if_not_exists: bool,
158    name: ObjectName,
159    query: Query,
160    columns: Vec<Ident>,
161    emit_mode: Option<EmitMode>,
162) -> Result<RwPgResponse> {
163    let (dependent_relations, dependent_udfs, bound) = {
164        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
165        let bound = binder.bind_query(query)?;
166        (
167            binder.included_relations().clone(),
168            binder.included_udfs().clone(),
169            bound,
170        )
171    };
172    handle_create_mv_bound(
173        handler_args,
174        if_not_exists,
175        name,
176        bound,
177        dependent_relations,
178        dependent_udfs,
179        columns,
180        emit_mode,
181    )
182    .await
183}
184
185/// Send a provision request to the serverless backfill controller
186pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
187    let request = tonic::Request::new(ProvisionRequest {});
188    let mut client =
189        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
190            sbc_addr.clone(),
191        )
192        .await
193        .map_err(|e| {
194            RwError::from(ErrorCode::InternalError(format!(
195                "unable to reach serverless backfill controller at addr {}: {}",
196                sbc_addr,
197                e.as_report()
198            )))
199        })?;
200
201    match client.provision(request).await {
202        Ok(resp) => Ok(resp.into_inner().resource_group),
203        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
204            "serverless backfill controller returned error :{}",
205            e.as_report()
206        )))),
207    }
208}
209
210fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
211    let context = OptimizerContext::from_handler_args(handler_args);
212    context.with_options().clone()
213}
214
215pub async fn handle_create_mv_bound(
216    handler_args: HandlerArgs,
217    if_not_exists: bool,
218    name: ObjectName,
219    query: BoundQuery,
220    dependent_relations: HashSet<TableId>,
221    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
222    columns: Vec<Ident>,
223    emit_mode: Option<EmitMode>,
224) -> Result<RwPgResponse> {
225    let session = handler_args.session.clone();
226
227    // Check cluster limits
228    session.check_cluster_limits().await?;
229
230    if let Either::Right(resp) = session.check_relation_name_duplicated(
231        name.clone(),
232        StatementType::CREATE_MATERIALIZED_VIEW,
233        if_not_exists,
234    )? {
235        return Ok(resp);
236    }
237
238    let (table, graph, dependencies, resource_group) = {
239        let mut with_options = get_with_options(handler_args.clone());
240        let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
241
242        let is_serverless_backfill = with_options
243            .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
244            .unwrap_or_default()
245            .parse::<bool>()
246            .unwrap_or(false);
247
248        if resource_group.is_some() && is_serverless_backfill {
249            return Err(RwError::from(InvalidInputSyntax(
250                "Please do not specify serverless backfilling and resource group together"
251                    .to_owned(),
252            )));
253        }
254
255        if !with_options.is_empty() {
256            // get other useful fields by `remove`, the logic here is to reject unknown options.
257            return Err(RwError::from(ProtocolError(format!(
258                "unexpected options in WITH clause: {:?}",
259                with_options.keys()
260            ))));
261        }
262
263        let sbc_addr = match SESSION_MANAGER.get() {
264            Some(manager) => manager.env().sbc_address(),
265            None => "",
266        }
267        .to_owned();
268
269        if is_serverless_backfill && sbc_addr.is_empty() {
270            return Err(RwError::from(InvalidInputSyntax(
271                "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
272            )));
273        }
274
275        if is_serverless_backfill {
276            match provision_resource_group(sbc_addr).await {
277                Err(e) => {
278                    return Err(RwError::from(ProtocolError(format!(
279                        "failed to provision serverless backfill nodes: {}",
280                        e.as_report()
281                    ))));
282                }
283                Ok(val) => resource_group = Some(val),
284            }
285        }
286        tracing::debug!(
287            resource_group = resource_group,
288            "provisioning on resource group"
289        );
290
291        let context = OptimizerContext::from_handler_args(handler_args);
292        let has_order_by = !query.order.is_empty();
293        if has_order_by {
294            context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
295It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
296"#.to_owned());
297        }
298
299        if resource_group.is_some()
300            && !context
301                .session_ctx()
302                .config()
303                .streaming_use_arrangement_backfill()
304        {
305            return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
306        }
307
308        let (plan, table) =
309            gen_create_mv_plan_bound(&session, context.into(), query, name, columns, emit_mode)?;
310
311        // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
312        // during binding instead of visiting the optimized plan.
313        let dependencies =
314            RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
315                .into_iter()
316                .map(|id| id.table_id() as ObjectId)
317                .chain(
318                    dependent_udfs
319                        .into_iter()
320                        .map(|id| id.function_id() as ObjectId),
321                )
322                .collect();
323
324        let graph = build_graph(plan)?;
325
326        (table, graph, dependencies, resource_group)
327    };
328
329    // Ensure writes to `StreamJobTracker` are atomic.
330    let _job_guard =
331        session
332            .env()
333            .creating_streaming_job_tracker()
334            .guard(CreatingStreamingJobInfo::new(
335                session.session_id(),
336                table.database_id,
337                table.schema_id,
338                table.name.clone(),
339            ));
340
341    let session = session.clone();
342    let catalog_writer = session.catalog_writer()?;
343    catalog_writer
344        .create_materialized_view(table, graph, dependencies, resource_group)
345        .await?;
346
347    Ok(PgResponse::empty_result(
348        StatementType::CREATE_MATERIALIZED_VIEW,
349    ))
350}
351
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Create a protobuf-encoded source `t1`, then an MV `mv1` selecting `t1.country`.
    /// Check that both catalog entries exist and that the MV has exactly these columns:
    /// `country` (a struct containing a nested `city` struct), the row-id column
    /// (`ROW_ID_COLUMN_NAME`) and `RW_TIMESTAMP_COLUMN_NAME`.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Map each MV column name to its data type, so the comparison ignores column order.
        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating an MV without an explicit column list, every output column must end up
    /// with a unique name. Non-column expressions therefore need an alias.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // A single aggregation without an alias is accepted.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Two aggregations without aliases are rejected, because both get the same column name.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A literal without an alias is rejected.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // Other non-column expressions without an alias are rejected as well.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating an MV with and without ORDER BY both succeed. Only the no-ORDER-BY case
    /// asserts that no notices are returned.
    // NOTE(review): the ORDER BY case is expected to produce a clustering notice, but this test
    // does not assert it. TODO confirm whether notices surface on the response, then assert.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without order by
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With order by
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}