risingwave_frontend/handler/create_mv.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use either::Either;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{FunctionId, ObjectId};
use risingwave_pb::ddl_service::streaming_job_resource_type;
use risingwave_pb::serverless_backfill_controller::{
    ProvisionRequest, node_group_controller_service_client,
};
use risingwave_pb::stream_plan::PbStreamFragmentGraph;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::{Binder, BoundQuery, BoundSetExpr};
use crate::catalog::check_column_name_not_reserved;
use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
use crate::optimizer::backfill_order_strategy::plan_backfill_order;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
use crate::planner::Planner;
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::{SESSION_MANAGER, SessionImpl};
use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
use crate::utils::ordinal;
use crate::{TableCatalog, WithOptions};

/// Key of the WITH option that assigns a streaming job to a specific resource group.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// Key of the WITH option that enables serverless backfill (available on RisingWave Cloud).
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
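
// Illustrative usage of the two options above (a sketch, not verbatim from the
// docs; the names `mv` and `t` are hypothetical):
//
//     CREATE MATERIALIZED VIEW mv
//     WITH (resource_group = 'my_resource_group')
//     AS SELECT * FROM t;
//
//     CREATE MATERIALIZED VIEW mv
//     WITH (cloud.serverless_backfill_enabled = 'true')
//     AS SELECT * FROM t;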

pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
    if columns.is_empty() {
        None
    } else {
        Some(columns.iter().map(|v| v.real_value()).collect())
    }
}

/// If `columns` is empty, the user did not specify column names, so we derive
/// them from the query. If `columns` is non-empty, the user specified the
/// column names and must guarantee that their number matches the number of
/// output columns in the query.
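///
/// # Example (illustrative SQL; `mv` and `t` are hypothetical names)
///
/// ```sql
/// -- Column names derived from the query; each unnamed expression needs an alias:
/// CREATE MATERIALIZED VIEW mv AS SELECT x, count(*) AS cnt FROM t;
/// -- Column names given explicitly; their number must match the query:
/// CREATE MATERIALIZED VIEW mv (a, b) AS SELECT x, count(*) FROM t;
/// ```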
pub(super) fn get_column_names(
    bound: &BoundQuery,
    columns: Vec<Ident>,
) -> Result<Option<Vec<String>>> {
    let col_names = parse_column_names(&columns);
    if let BoundSetExpr::Select(select) = &bound.body {
        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
        // If the user provides column names (`col_names.is_some()`), we don't
        // need aliases. Otherwise (`col_names.is_none()`), we require the user
        // to explicitly assign an alias to every other expression.
        if col_names.is_none() {
            for (i, alias) in select.aliases.iter().enumerate() {
                if alias.is_none() {
                    return Err(ErrorCode::BindError(format!(
                        "An alias must be specified for the {} expression (counting from 1) in result relation",
                        ordinal(i + 1)
                    ))
                    .into());
                }
            }
        }
    }

    Ok(col_names)
}

/// Bind the query and generate a create-MV plan, returning the plan and the MV's table catalog.
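///
/// # Example (a sketch; assumes a prepared session and optimizer context)
///
/// ```ignore
/// let (plan, table) = gen_create_mv_plan(
///     &session,
///     context.clone(),
///     query,   // parsed `Query` AST of the `AS ...` part
///     name,    // `ObjectName` of the MV to create
///     vec![],  // no explicit column names; derive them from the query
///     None,    // default emit mode
/// )?;
/// ```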
pub fn gen_create_mv_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: Query,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    let mut binder = Binder::new_for_stream(session);
    let bound = binder.bind_query(&query)?;
    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
}

/// Generate a create-MV plan from an already-bound query.
pub fn gen_create_mv_plan_bound(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: BoundQuery,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    if session.config().create_compaction_group_for_mv() {
        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
    }

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;

    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    let definition = context.normalized_sql().to_owned();

    let col_names = get_column_names(&query, columns)?;

    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }
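
    // For reference, a sketch of the experimental syntax gated above (the
    // table `t`, column `ts`, and window size are hypothetical):
    //
    //     CREATE MATERIALIZED VIEW mv AS
    //     SELECT window_start, count(*) AS cnt
    //     FROM tumble(t, ts, INTERVAL '1 MINUTE')
    //     GROUP BY window_start
    //     EMIT ON WINDOW CLOSE;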

    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
    if let Some(col_names) = col_names {
        for name in &col_names {
            check_column_name_not_reserved(name)?;
        }
        plan_root.set_out_names(col_names)?;
    }
    let materialize = plan_root.gen_materialize_plan(
        database_id,
        schema_id,
        table_name,
        definition,
        emit_on_window_close,
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    let plan: PlanRef = materialize.into();

    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Materialized View:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, table))
}

pub async fn handle_create_mv(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: Query,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let (dependent_relations, dependent_udfs, bound_query) = {
        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
        let bound_query = binder.bind_query(&query)?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound_query,
        )
    };
    handle_create_mv_bound(
        handler_args,
        if_not_exists,
        name,
        bound_query,
        dependent_relations,
        dependent_udfs,
        columns,
        emit_mode,
    )
    .await
}

/// Send a provision request to the serverless backfill controller.
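///
/// # Example (a sketch; the controller address is hypothetical)
///
/// ```ignore
/// let resource_group =
///     provision_resource_group("http://serverless-backfill-controller:5688".to_owned()).await?;
/// ```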
pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
    let request = tonic::Request::new(ProvisionRequest {});
    let mut client =
        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
            sbc_addr.clone(),
        )
        .await
        .map_err(|e| {
            RwError::from(ErrorCode::InternalError(format!(
                "unable to reach serverless backfill controller at addr {}: {}",
                sbc_addr,
                e.as_report()
            )))
        })?;

    match client.provision(request).await {
        Ok(resp) => Ok(resp.into_inner().resource_group),
        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
            "serverless backfill controller returned error: {}",
            e.as_report()
        )))),
    }
}

fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
    let context = OptimizerContext::from_handler_args(handler_args);
    context.with_options().clone()
}

pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    // Check cluster limits
    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (table, graph, dependencies, resource_type) = gen_create_mv_graph(
        handler_args,
        name,
        query,
        dependent_relations,
        dependent_udfs,
        columns,
        emit_mode,
    )
    .await?;

    // Ensure writes to `StreamJobTracker` are atomic.
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_type,
            if_not_exists,
        ),
        &session,
        "CREATE MATERIALIZED VIEW",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}

pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<ObjectId>,
    streaming_job_resource_type::ResourceType,
)> {
    let mut with_options = get_with_options(handler_args.clone());
    let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup.check_available()?;
    }

    let is_serverless_backfill = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        .unwrap_or_default()
        .parse::<bool>()
        .unwrap_or(false);

    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and a resource group together".to_owned(),
        )));
    }
    if !with_options.is_empty() {
        // All recognized options were extracted via `remove` above; reject any
        // remaining (unknown) options.
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }

    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    let resource_type = if is_serverless_backfill {
        assert_eq!(resource_group, None);
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(group) => {
                tracing::info!(
                    resource_group = group,
                    "provisioned serverless backfill resource group"
                );
                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
            }
        }
    } else if let Some(group) = resource_group {
        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
    } else {
        streaming_job_resource_type::ResourceType::Regular(true)
    };
    let context = OptimizerContext::from_handler_args(handler_args);
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected from this materialized view are returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    if resource_type.resource_group().is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError(
            "The session config `streaming_use_arrangement_backfill` must be enabled to use the resource_group option"
                .to_owned(),
        )));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .chain(dependent_udfs.iter().copied().map_into())
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_type))
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(vec![
                ("address", DataType::Varchar),
                ("city", city_type),
                ("zipcode", DataType::Varchar),
            ])
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating an MV, a unique column name must be specified for each column.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // An aggregation without an alias is ok.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Multiple aggregations without aliases are forbidden, because they derive the same column name.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A literal without an alias is forbidden.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // Other expressions without an alias are forbidden as well.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating an MV with ORDER BY emits a special notice.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without ORDER BY: no notice is attached.
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With ORDER BY: a notice about physical clustering is attached.
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(!response.notices().is_empty());
    }
}