// risingwave_frontend/handler/create_mv.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId};
21use risingwave_pb::ddl_service::streaming_job_resource_type;
22use risingwave_pb::serverless_backfill_controller::{
23    ProvisionRequest, node_group_controller_service_client,
24};
25use risingwave_pb::stream_plan::PbStreamFragmentGraph;
26use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
27use thiserror_ext::AsReport;
28
29use super::RwPgResponse;
30use crate::binder::{Binder, BoundQuery, BoundSetExpr};
31use crate::catalog::check_column_name_not_reserved;
32use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::handler::HandlerArgs;
35use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
36use crate::optimizer::backfill_order_strategy::plan_backfill_order;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
39use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
40use crate::planner::Planner;
41use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
42use crate::session::{SESSION_MANAGER, SessionImpl};
43use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
44use crate::utils::ordinal;
45use crate::{TableCatalog, WithOptions};
46
/// WITH-clause key used to pin the streaming job to a specific resource group.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// WITH-clause key that toggles serverless backfill (effective only when a
/// serverless backfill controller address is configured — see `sbc_address`).
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
49
50pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
51    if columns.is_empty() {
52        None
53    } else {
54        Some(columns.iter().map(|v| v.real_value()).collect())
55    }
56}
57
58/// If columns is empty, it means that the user did not specify the column names.
59/// In this case, we extract the column names from the query.
60/// If columns is not empty, it means that user specify the column names and the user
61/// should guarantee that the column names number are consistent with the query.
62pub(super) fn get_column_names(
63    bound: &BoundQuery,
64    columns: Vec<Ident>,
65) -> Result<Option<Vec<String>>> {
66    let col_names = parse_column_names(&columns);
67    if let BoundSetExpr::Select(select) = &bound.body {
68        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
69        // If user provides columns name (col_names.is_some()), we don't need alias.
70        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
71        // alias.
72        if col_names.is_none() {
73            for (i, alias) in select.aliases.iter().enumerate() {
74                if alias.is_none() {
75                    return Err(ErrorCode::BindError(format!(
76                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
77                ))
78                .into());
79                }
80            }
81        }
82    }
83
84    Ok(col_names)
85}
86
87/// Bind and generate create MV plan, return plan and mv table info.
88pub fn gen_create_mv_plan(
89    session: &SessionImpl,
90    context: OptimizerContextRef,
91    query: Query,
92    name: ObjectName,
93    columns: Vec<Ident>,
94    emit_mode: Option<EmitMode>,
95) -> Result<(PlanRef, TableCatalog)> {
96    let mut binder = Binder::new_for_stream(session);
97    let bound = binder.bind_query(&query)?;
98    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
99}
100
/// Generate create MV plan from a bound query
///
/// Resolves the target schema, applies user-specified column names, plans the
/// query as a streaming job, and wraps it in a materialize node whose table
/// catalog is returned alongside the plan.
pub fn gen_create_mv_plan_bound(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: BoundQuery,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    // The session variable no longer has any effect; warn instead of failing.
    if session.config().create_compaction_group_for_mv() {
        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
    }

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;

    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    // The normalized SQL text is persisted as the MV's definition.
    let definition = context.normalized_sql().to_owned();

    let col_names = get_column_names(&query, columns)?;

    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
    if let Some(col_names) = col_names {
        // Reject reserved names before renaming the plan's output columns.
        for name in &col_names {
            check_column_name_not_reserved(name)?;
        }
        plan_root.set_out_names(col_names)?;
    }
    let materialize = plan_root.gen_materialize_plan(
        database_id,
        schema_id,
        table_name,
        definition,
        emit_on_window_close,
    )?;

    // The creating user becomes the owner of the new MV table.
    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    let plan: PlanRef = materialize.into();

    // Emit the final plan into the trace when EXPLAIN (trace) is active.
    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Materialized View:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, table))
}
157
158pub async fn handle_create_mv(
159    handler_args: HandlerArgs,
160    if_not_exists: bool,
161    name: ObjectName,
162    query: Query,
163    columns: Vec<Ident>,
164    emit_mode: Option<EmitMode>,
165) -> Result<RwPgResponse> {
166    let (dependent_relations, dependent_udfs, bound_query) = {
167        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
168        let bound_query = binder.bind_query(&query)?;
169        (
170            binder.included_relations().clone(),
171            binder.included_udfs().clone(),
172            bound_query,
173        )
174    };
175    handle_create_mv_bound(
176        handler_args,
177        if_not_exists,
178        name,
179        bound_query,
180        dependent_relations,
181        dependent_udfs,
182        columns,
183        emit_mode,
184    )
185    .await
186}
187
188/// Send a provision request to the serverless backfill controller
189pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
190    let request = tonic::Request::new(ProvisionRequest {});
191    let mut client =
192        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
193            sbc_addr.clone(),
194        )
195        .await
196        .map_err(|e| {
197            RwError::from(ErrorCode::InternalError(format!(
198                "unable to reach serverless backfill controller at addr {}: {}",
199                sbc_addr,
200                e.as_report()
201            )))
202        })?;
203
204    match client.provision(request).await {
205        Ok(resp) => Ok(resp.into_inner().resource_group),
206        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
207            "serverless backfill controller returned error :{}",
208            e.as_report()
209        )))),
210    }
211}
212
213fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
214    let context = OptimizerContext::from_handler_args(handler_args);
215    context.with_options().clone()
216}
217
/// Create a materialized view from an already-bound query: checks limits and
/// name collisions, builds the stream fragment graph, and submits the DDL via
/// the catalog writer.
pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    // Check cluster limits
    session.check_cluster_limits().await?;

    // Short-circuit with a canned response when the relation already exists
    // and `IF NOT EXISTS` was given (errors otherwise).
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (table, graph, dependencies, resource_type) = {
        gen_create_mv_graph(
            handler_args,
            name,
            query,
            dependent_relations,
            dependent_udfs,
            columns,
            emit_mode,
        )
        .await?
    };

    // Ensure writes to `StreamJobTracker` are atomic.
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    // Submit the DDL; if it runs long (e.g. backfill), notify the user
    // instead of blocking silently.
    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_type,
            if_not_exists,
        ),
        &session,
        "CREATE MATERIALIZED VIEW",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}
285
/// Plan the bound query and build the stream fragment graph for a new MV.
///
/// Also resolves the streaming job's resource type from the WITH options and
/// session config: a user-specified resource group, a serverless-backfill
/// provisioned group, or the regular pool. Returns the MV table catalog, the
/// fragment graph, the set of dependency object ids, and the resource type.
pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<ObjectId>,
    streaming_job_resource_type::ResourceType,
)> {
    let mut with_options = get_with_options(handler_args.clone());
    let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    // `resource_group` is gated behind a license feature.
    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup.check_available()?;
    }

    // The WITH option takes precedence; otherwise fall back to the session
    // config — unless a resource group was specified, which disables it.
    // Note: unparsable values are treated as `false`, not rejected.
    let serverless_backfill_from_with = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        .map(|value| value.parse::<bool>().unwrap_or(false));
    let is_serverless_backfill = match serverless_backfill_from_with {
        Some(value) => value,
        None => {
            if resource_group.is_some() {
                false
            } else {
                handler_args.session.config().enable_serverless_backfill()
            }
        }
    };

    // The two mechanisms both pick a resource group, so they are mutually
    // exclusive.
    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and resource group together".to_owned(),
        )));
    }

    if !with_options.is_empty() {
        // get other useful fields by `remove`, the logic here is to reject unknown options.
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }

    // Address of the serverless backfill controller; empty when unavailable
    // (e.g. the session manager is not initialized or not a cloud deployment).
    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    let resource_type = if is_serverless_backfill {
        // Mutual exclusion was enforced above, so no resource group here.
        assert_eq!(resource_group, None);
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(group) => {
                tracing::info!(
                    resource_group = group,
                    "provisioning serverless backfill resource group"
                );
                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
            }
        }
    } else if let Some(group) = resource_group {
        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
    } else {
        streaming_job_resource_type::ResourceType::Regular(true)
    };
    let context = OptimizerContext::from_handler_args(handler_args);
    // ORDER BY on an MV only influences physical clustering; warn the user
    // that read order is not guaranteed.
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    // Using a non-regular resource group requires arrangement backfill.
    if resource_type.resource_group().is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    // Decide the order in which upstream relations will be backfilled.
    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .chain(dependent_udfs.iter().copied().map_into())
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_type))
}
413
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// End-to-end: create a protobuf source, build an MV over it, and verify
    /// both catalog entries and the MV's inferred column types.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        // Expected columns include the hidden row-id and rw_timestamp columns
        // in addition to the selected `country` struct.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating MV, a unique column name must be specified for each column
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Aggregation without alias is ok.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Same aggregations without alias is forbidden, because it make the same column name.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // Literal without alias is forbidden.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // some expression without alias is forbidden.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating MV with order by returns a special notice
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without order by
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With order by
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}