risingwave_frontend/handler/create_mv.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId, SecretId};
21use risingwave_pb::ddl_service::streaming_job_resource_type;
22use risingwave_pb::serverless_backfill_controller::{
23    ProvisionRequest, node_group_controller_service_client,
24};
25use risingwave_pb::stream_plan::PbStreamFragmentGraph;
26use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
27use thiserror_ext::AsReport;
28
29use super::RwPgResponse;
30use crate::binder::{Binder, BoundQuery, BoundSetExpr};
31use crate::catalog::check_column_name_not_reserved;
32use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::handler::HandlerArgs;
35use crate::handler::util::{
36    LongRunningNotificationAction, execute_with_long_running_notification,
37    reject_internal_table_dependencies,
38};
39use crate::optimizer::backfill_order_strategy::plan_backfill_order;
40use crate::optimizer::plan_node::generic::GenericPlanRef;
41use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
42use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
43use crate::planner::Planner;
44use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
45use crate::session::{SESSION_MANAGER, SessionImpl};
46use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
47use crate::utils::ordinal;
48use crate::{TableCatalog, WithOptions};
49
/// WITH-clause key that pins the streaming job to a specific resource group
/// (a licensed feature; see `gen_create_mv_graph`).
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// WITH-clause key (parsed as a boolean) that enables serverless backfill,
/// which provisions a resource group via the serverless backfill controller.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
52
53pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
54    if columns.is_empty() {
55        None
56    } else {
57        Some(columns.iter().map(|v| v.real_value()).collect())
58    }
59}
60
61/// If columns is empty, it means that the user did not specify the column names.
62/// In this case, we extract the column names from the query.
63/// If columns is not empty, it means that user specify the column names and the user
64/// should guarantee that the column names number are consistent with the query.
65pub(super) fn get_column_names(
66    bound: &BoundQuery,
67    columns: Vec<Ident>,
68) -> Result<Option<Vec<String>>> {
69    let col_names = parse_column_names(&columns);
70    if let BoundSetExpr::Select(select) = &bound.body {
71        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
72        // If user provides columns name (col_names.is_some()), we don't need alias.
73        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
74        // alias.
75        if col_names.is_none() {
76            for (i, alias) in select.aliases.iter().enumerate() {
77                if alias.is_none() {
78                    return Err(ErrorCode::BindError(format!(
79                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
80                ))
81                .into());
82                }
83            }
84        }
85    }
86
87    Ok(col_names)
88}
89
90/// Bind and generate create MV plan, return plan and mv table info.
91pub fn gen_create_mv_plan(
92    session: &SessionImpl,
93    context: OptimizerContextRef,
94    query: Query,
95    name: ObjectName,
96    columns: Vec<Ident>,
97    emit_mode: Option<EmitMode>,
98) -> Result<(PlanRef, TableCatalog)> {
99    let mut binder = Binder::new_for_stream(session);
100    let bound = binder.bind_query(&query)?;
101    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
102}
103
/// Generate create MV plan from a bound query.
///
/// Resolves the target schema, plans the bound `query` as a streaming job,
/// and appends a `Materialize` node on top. Returns the final stream plan
/// together with the MV's table catalog (owner set to the current user).
pub fn gen_create_mv_plan_bound(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: BoundQuery,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    // This session variable is deprecated; warn instead of failing.
    if session.config().create_compaction_group_for_mv() {
        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
    }

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;

    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    // The normalized SQL text is passed along as the MV's definition.
    let definition = context.normalized_sql().to_owned();

    let col_names = get_column_names(&query, columns)?;

    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
    // Apply user-specified output names, rejecting reserved column names.
    if let Some(col_names) = col_names {
        for name in &col_names {
            check_column_name_not_reserved(name)?;
        }
        plan_root.set_out_names(col_names)?;
    }
    let materialize = plan_root.gen_materialize_plan(
        database_id,
        schema_id,
        table_name,
        definition,
        emit_on_window_close,
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    let plan: PlanRef = materialize.into();

    // For `EXPLAIN (TRACE)`, record the generated plan in the trace output.
    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Materialized View:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, table))
}
160
161pub async fn handle_create_mv(
162    handler_args: HandlerArgs,
163    if_not_exists: bool,
164    name: ObjectName,
165    query: Query,
166    columns: Vec<Ident>,
167    emit_mode: Option<EmitMode>,
168) -> Result<RwPgResponse> {
169    let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
170        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
171        let bound_query = binder.bind_query(&query)?;
172        (
173            binder.included_relations().clone(),
174            binder.included_udfs().clone(),
175            binder.included_secrets().clone(),
176            bound_query,
177        )
178    };
179    handle_create_mv_bound(
180        handler_args,
181        if_not_exists,
182        name,
183        bound_query,
184        dependent_relations,
185        dependent_udfs,
186        dependent_secrets,
187        columns,
188        emit_mode,
189    )
190    .await
191}
192
193/// Send a provision request to the serverless backfill controller
194pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
195    let request = tonic::Request::new(ProvisionRequest {});
196    let mut client =
197        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
198            sbc_addr.clone(),
199        )
200        .await
201        .map_err(|e| {
202            RwError::from(ErrorCode::InternalError(format!(
203                "unable to reach serverless backfill controller at addr {}: {}",
204                sbc_addr,
205                e.as_report()
206            )))
207        })?;
208
209    match client.provision(request).await {
210        Ok(resp) => Ok(resp.into_inner().resource_group),
211        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
212            "serverless backfill controller returned error :{}",
213            e.as_report()
214        )))),
215    }
216}
217
218fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
219    let context = OptimizerContext::from_handler_args(handler_args);
220    context.with_options().clone()
221}
222
/// Handle `CREATE MATERIALIZED VIEW` for a query that has already been bound,
/// together with the dependency sets collected during binding.
///
/// Runs the pre-flight checks (cluster limits, duplicate name, internal-table
/// dependencies), builds the stream fragment graph, and submits the job via
/// the catalog writer while tracking it as a creating streaming job.
pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
    dependent_secrets: HashSet<SecretId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    // Check cluster limits
    session.check_cluster_limits().await?;

    // With IF NOT EXISTS, an existing relation yields a ready-made response
    // instead of an error.
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    // MVs must not be created on top of internal tables.
    reject_internal_table_dependencies(
        session.as_ref(),
        &dependent_relations,
        "CREATE MATERIALIZED VIEW",
    )?;

    let (table, graph, dependencies, resource_type) = {
        gen_create_mv_graph(
            handler_args,
            name,
            query,
            dependent_relations,
            dependent_udfs,
            dependent_secrets,
            columns,
            emit_mode,
        )
        .await?
    };

    // Ensure writes to `StreamJobTracker` are atomic.
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    // MV creation may run long (e.g. during backfill); wrap the DDL call so
    // the user gets a notification instead of a silently blocked statement.
    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_type,
            if_not_exists,
        ),
        &session,
        "CREATE MATERIALIZED VIEW",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}
298
/// Plan a bound `CREATE MATERIALIZED VIEW` query into a stream fragment graph.
///
/// Resolves the WITH options (`resource_group`,
/// `cloud.serverless_backfill_enabled`) into a
/// `streaming_job_resource_type::ResourceType`, generates the optimized
/// stream plan, and returns the MV table catalog, the fragment graph, the
/// complete dependency set (relations + UDFs + secrets), and the resource
/// type.
pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    dependent_secrets: HashSet<SecretId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<ObjectId>,
    streaming_job_resource_type::ResourceType,
)> {
    let mut with_options = get_with_options(handler_args.clone());
    let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    // Specifying a resource group is a licensed feature.
    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup.check_available()?;
    }

    // Serverless backfill may be requested per-statement via the WITH clause;
    // when absent, fall back to the session config — unless a resource group
    // was given, in which case it defaults to off.
    let serverless_backfill_from_with = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        // NOTE: unparsable values are silently treated as `false`.
        .map(|value| value.parse::<bool>().unwrap_or(false));
    let is_serverless_backfill = match serverless_backfill_from_with {
        Some(value) => value,
        None => {
            if resource_group.is_some() {
                false
            } else {
                handler_args.session.config().enable_serverless_backfill()
            }
        }
    };

    // The two options are mutually exclusive when both are requested.
    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and resource group together".to_owned(),
        )));
    }

    if !with_options.is_empty() {
        // get other useful fields by `remove`, the logic here is to reject unknown options.
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }

    // Address of the serverless backfill controller; empty when no session
    // manager is available or no address is configured.
    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    let resource_type = if is_serverless_backfill {
        // Mutual exclusion with `resource_group` was enforced above.
        assert_eq!(resource_group, None);
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(group) => {
                tracing::info!(
                    resource_group = group,
                    "provisioning serverless backfill resource group"
                );
                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
            }
        }
    } else if let Some(group) = resource_group {
        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
    } else {
        streaming_job_resource_type::ResourceType::Regular(true)
    };
    let context = OptimizerContext::from_handler_args(handler_args);
    // ORDER BY on an MV only affects physical clustering, not result order;
    // warn so the user is not surprised.
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    // Using a resource group requires arrangement backfill to be enabled.
    if resource_type.resource_group().is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    // Decide the order in which upstream relations are backfilled.
    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .chain(dependent_udfs.iter().copied().map_into())
        .chain(
            dependent_secrets
                .iter()
                .copied()
                .map(|id| id.as_object_id()),
        )
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_type))
}
433
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// End-to-end: create a protobuf-backed source, build an MV on it, and
    /// verify both appear in the catalog with the expected column types.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        // Besides the user column `country`, the MV carries the hidden row id
        // and `_rw_timestamp` columns.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating MV, a unique column name must be specified for each column
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Aggregation without alias is ok.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // The same aggregation twice without aliases is forbidden, because
        // both items would produce the same column name.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // Literal without alias is forbidden.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // some expression without alias is forbidden.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating MV with order by returns a special notice
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without order by
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With order by
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}