1use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId};
21use risingwave_pb::serverless_backfill_controller::{
22 ProvisionRequest, node_group_controller_service_client,
23};
24use risingwave_pb::stream_plan::PbStreamFragmentGraph;
25use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
26use thiserror_ext::AsReport;
27
28use super::RwPgResponse;
29use crate::binder::{Binder, BoundQuery, BoundSetExpr};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::handler::HandlerArgs;
34use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
35use crate::optimizer::backfill_order_strategy::plan_backfill_order;
36use crate::optimizer::plan_node::generic::GenericPlanRef;
37use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
38use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
39use crate::planner::Planner;
40use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
41use crate::session::{SESSION_MANAGER, SessionImpl};
42use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
43use crate::utils::ordinal;
44use crate::{TableCatalog, WithOptions};
45
/// `WITH` option key carrying the resource group for the materialized view's streaming job.
/// `gen_create_mv_graph` removes it from the options and forwards its value to the catalog
/// writer; it requires the resource-group license feature.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// `WITH` option key that enables serverless backfill. Its value is parsed as a `bool`.
/// Enabling it requires a configured serverless backfill controller address, and it cannot be
/// combined with an explicit [`RESOURCE_GROUP_KEY`].
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
48
49pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
50 if columns.is_empty() {
51 None
52 } else {
53 Some(columns.iter().map(|v| v.real_value()).collect())
54 }
55}
56
57pub(super) fn get_column_names(
62 bound: &BoundQuery,
63 columns: Vec<Ident>,
64) -> Result<Option<Vec<String>>> {
65 let col_names = parse_column_names(&columns);
66 if let BoundSetExpr::Select(select) = &bound.body {
67 if col_names.is_none() {
72 for (i, alias) in select.aliases.iter().enumerate() {
73 if alias.is_none() {
74 return Err(ErrorCode::BindError(format!(
75 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
76 ))
77 .into());
78 }
79 }
80 }
81 }
82
83 Ok(col_names)
84}
85
86pub fn gen_create_mv_plan(
88 session: &SessionImpl,
89 context: OptimizerContextRef,
90 query: Query,
91 name: ObjectName,
92 columns: Vec<Ident>,
93 emit_mode: Option<EmitMode>,
94) -> Result<(PlanRef, TableCatalog)> {
95 let mut binder = Binder::new_for_stream(session);
96 let bound = binder.bind_query(&query)?;
97 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
98}
99
100pub fn gen_create_mv_plan_bound(
102 session: &SessionImpl,
103 context: OptimizerContextRef,
104 query: BoundQuery,
105 name: ObjectName,
106 columns: Vec<Ident>,
107 emit_mode: Option<EmitMode>,
108) -> Result<(PlanRef, TableCatalog)> {
109 if session.config().create_compaction_group_for_mv() {
110 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
111 }
112
113 let db_name = &session.database();
114 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
115
116 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
117
118 let definition = context.normalized_sql().to_owned();
119
120 let col_names = get_column_names(&query, columns)?;
121
122 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
123 if emit_on_window_close {
124 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
125 }
126
127 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
128 if let Some(col_names) = col_names {
129 for name in &col_names {
130 check_column_name_not_reserved(name)?;
131 }
132 plan_root.set_out_names(col_names)?;
133 }
134 let materialize = plan_root.gen_materialize_plan(
135 database_id,
136 schema_id,
137 table_name,
138 definition,
139 emit_on_window_close,
140 )?;
141
142 let mut table = materialize.table().clone();
143 table.owner = session.user_id();
144
145 let plan: PlanRef = materialize.into();
146
147 let ctx = plan.ctx();
148 let explain_trace = ctx.is_explain_trace();
149 if explain_trace {
150 ctx.trace("Create Materialized View:");
151 ctx.trace(plan.explain_to_string());
152 }
153
154 Ok((plan, table))
155}
156
157pub async fn handle_create_mv(
158 handler_args: HandlerArgs,
159 if_not_exists: bool,
160 name: ObjectName,
161 query: Query,
162 columns: Vec<Ident>,
163 emit_mode: Option<EmitMode>,
164) -> Result<RwPgResponse> {
165 let (dependent_relations, dependent_udfs, bound_query) = {
166 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
167 let bound_query = binder.bind_query(&query)?;
168 (
169 binder.included_relations().clone(),
170 binder.included_udfs().clone(),
171 bound_query,
172 )
173 };
174 handle_create_mv_bound(
175 handler_args,
176 if_not_exists,
177 name,
178 bound_query,
179 dependent_relations,
180 dependent_udfs,
181 columns,
182 emit_mode,
183 )
184 .await
185}
186
187pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
189 let request = tonic::Request::new(ProvisionRequest {});
190 let mut client =
191 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
192 sbc_addr.clone(),
193 )
194 .await
195 .map_err(|e| {
196 RwError::from(ErrorCode::InternalError(format!(
197 "unable to reach serverless backfill controller at addr {}: {}",
198 sbc_addr,
199 e.as_report()
200 )))
201 })?;
202
203 match client.provision(request).await {
204 Ok(resp) => Ok(resp.into_inner().resource_group),
205 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
206 "serverless backfill controller returned error :{}",
207 e.as_report()
208 )))),
209 }
210}
211
212fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
213 let context = OptimizerContext::from_handler_args(handler_args);
214 context.with_options().clone()
215}
216
217pub async fn handle_create_mv_bound(
218 handler_args: HandlerArgs,
219 if_not_exists: bool,
220 name: ObjectName,
221 query: BoundQuery,
222 dependent_relations: HashSet<ObjectId>,
223 dependent_udfs: HashSet<FunctionId>, columns: Vec<Ident>,
225 emit_mode: Option<EmitMode>,
226) -> Result<RwPgResponse> {
227 let session = handler_args.session.clone();
228
229 session.check_cluster_limits().await?;
231
232 if let Either::Right(resp) = session.check_relation_name_duplicated(
233 name.clone(),
234 StatementType::CREATE_MATERIALIZED_VIEW,
235 if_not_exists,
236 )? {
237 return Ok(resp);
238 }
239
240 let (table, graph, dependencies, resource_group) = {
241 gen_create_mv_graph(
242 handler_args,
243 name,
244 query,
245 dependent_relations,
246 dependent_udfs,
247 columns,
248 emit_mode,
249 )
250 .await?
251 };
252
253 let _job_guard =
255 session
256 .env()
257 .creating_streaming_job_tracker()
258 .guard(CreatingStreamingJobInfo::new(
259 session.session_id(),
260 table.database_id,
261 table.schema_id,
262 table.name.clone(),
263 ));
264
265 let catalog_writer = session.catalog_writer()?;
266 execute_with_long_running_notification(
267 catalog_writer.create_materialized_view(
268 table.to_prost(),
269 graph,
270 dependencies,
271 resource_group,
272 if_not_exists,
273 ),
274 &session,
275 "CREATE MATERIALIZED VIEW",
276 LongRunningNotificationAction::MonitorBackfillJob,
277 )
278 .await?;
279
280 Ok(PgResponse::empty_result(
281 StatementType::CREATE_MATERIALIZED_VIEW,
282 ))
283}
284
285pub(crate) async fn gen_create_mv_graph(
286 handler_args: HandlerArgs,
287 name: ObjectName,
288 query: BoundQuery,
289 dependent_relations: HashSet<ObjectId>,
290 dependent_udfs: HashSet<FunctionId>,
291 columns: Vec<Ident>,
292 emit_mode: Option<EmitMode>,
293) -> Result<(
294 TableCatalog,
295 PbStreamFragmentGraph,
296 HashSet<ObjectId>,
297 Option<String>,
298)> {
299 let mut with_options = get_with_options(handler_args.clone());
300 let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
301
302 if resource_group.is_some() {
303 risingwave_common::license::Feature::ResourceGroup.check_available()?;
304 }
305
306 let is_serverless_backfill = with_options
307 .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
308 .unwrap_or_default()
309 .parse::<bool>()
310 .unwrap_or(false);
311
312 if resource_group.is_some() && is_serverless_backfill {
313 return Err(RwError::from(InvalidInputSyntax(
314 "Please do not specify serverless backfilling and resource group together".to_owned(),
315 )));
316 }
317
318 if !with_options.is_empty() {
319 return Err(RwError::from(ProtocolError(format!(
321 "unexpected options in WITH clause: {:?}",
322 with_options.keys()
323 ))));
324 }
325
326 let sbc_addr = match SESSION_MANAGER.get() {
327 Some(manager) => manager.env().sbc_address(),
328 None => "",
329 }
330 .to_owned();
331
332 if is_serverless_backfill && sbc_addr.is_empty() {
333 return Err(RwError::from(InvalidInputSyntax(
334 "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
335 )));
336 }
337
338 if is_serverless_backfill {
339 match provision_resource_group(sbc_addr).await {
340 Err(e) => {
341 return Err(RwError::from(ProtocolError(format!(
342 "failed to provision serverless backfill nodes: {}",
343 e.as_report()
344 ))));
345 }
346 Ok(val) => resource_group = Some(val),
347 }
348 }
349 tracing::debug!(
350 resource_group = resource_group,
351 "provisioning on resource group"
352 );
353
354 let context = OptimizerContext::from_handler_args(handler_args);
355 let has_order_by = !query.order.is_empty();
356 if has_order_by {
357 context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
358It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
359"#.to_owned());
360 }
361
362 if resource_group.is_some()
363 && !context
364 .session_ctx()
365 .config()
366 .streaming_use_arrangement_backfill()
367 {
368 return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
369 }
370
371 let context: OptimizerContextRef = context.into();
372 let session = context.session_ctx().as_ref();
373
374 let (plan, table) =
375 gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;
376
377 let backfill_order = plan_backfill_order(
378 session,
379 context.with_options().backfill_order_strategy(),
380 plan.clone(),
381 )?;
382
383 let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
386 .into_iter()
387 .chain(dependent_udfs.iter().copied().map_into())
388 .collect();
389
390 let graph = build_graph_with_strategy(
391 plan,
392 Some(GraphJobType::MaterializedView),
393 Some(backfill_order),
394 )?;
395
396 Ok((table, graph, dependencies, resource_group))
397}
398
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// An MV over a protobuf source is registered in the catalog next to the source.
    /// It exposes the selected nested struct column plus the hidden row-id and timestamp
    /// columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source_sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source_sql).await.unwrap();
        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1")
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (mv, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(mv.name(), "mv1");

        let actual_columns: HashMap<&str, DataType> = mv
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect();

        let city_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(vec![
                ("address", DataType::Varchar),
                ("city", city_type),
                ("zipcode", DataType::Varchar),
            ])
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(actual_columns, expected_columns, "{actual_columns:#?}");
    }

    /// Unaliased expressions are accepted only when their derived names are unique.
    /// Otherwise the statement is rejected with a precise message.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // A single unaliased aggregate is accepted.
        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        let rejected_cases = [
            (
                "create materialized view mv1 as select count(x), count(*) from t",
                "Invalid input syntax: column \"count\" specified more than once",
            ),
            (
                "create materialized view mv1 as select 1",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
            (
                "create materialized view mv1 as select x is null from t",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
        ];
        for (sql, expected_message) in rejected_cases {
            let err = frontend.run_sql(sql).await.unwrap_err();
            assert_eq!(err.to_string(), expected_message);
        }
    }

    /// Both plain and `ORDER BY` materialized views are created successfully. The plain one
    /// produces no notices.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql("create table t(x varchar)").await.unwrap();

        let plain_response = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(plain_response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(plain_response.notices().is_empty());

        let ordered_response = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered_response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}