use std::collections::HashSet;

use either::Either;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{FunctionId, ObjectId};
use risingwave_pb::ddl_service::streaming_job_resource_type;
use risingwave_pb::serverless_backfill_controller::{
    ProvisionRequest, node_group_controller_service_client,
};
use risingwave_pb::stream_plan::PbStreamFragmentGraph;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::{Binder, BoundQuery, BoundSetExpr};
use crate::catalog::check_column_name_not_reserved;
use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
use crate::optimizer::backfill_order_strategy::plan_backfill_order;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
use crate::planner::Planner;
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::{SESSION_MANAGER, SessionImpl};
use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
use crate::utils::ordinal;
use crate::{TableCatalog, WithOptions};

pub const RESOURCE_GROUP_KEY: &str = "resource_group";
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";

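/// Converts the user-specified column list of `CREATE MATERIALIZED VIEW (a, b, ...)` into
/// owned column names, or `None` if no columns were specified.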
pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
    if columns.is_empty() {
        None
    } else {
        Some(columns.iter().map(|v| v.real_value()).collect())
    }
}

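/// Resolves the output column names of the materialized view. Explicit names from the
/// column list take priority; otherwise every `SELECT` expression must already carry an
/// alias (simple expressions, e.g. plain column references, are aliased implicitly
/// during binding).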
pub(super) fn get_column_names(
    bound: &BoundQuery,
    columns: Vec<Ident>,
) -> Result<Option<Vec<String>>> {
    let col_names = parse_column_names(&columns);
    if let BoundSetExpr::Select(select) = &bound.body {
        if col_names.is_none() {
            for (i, alias) in select.aliases.iter().enumerate() {
                if alias.is_none() {
                    return Err(ErrorCode::BindError(format!(
                        "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i + 1)
                    ))
                    .into());
                }
            }
        }
    }

    Ok(col_names)
}

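/// Binds `query` and delegates to [`gen_create_mv_plan_bound`] to produce the streaming
/// plan and table catalog for a new materialized view.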
pub fn gen_create_mv_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: Query,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    let mut binder = Binder::new_for_stream(session);
    let bound = binder.bind_query(&query)?;
    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
}

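/// Generates the streaming plan for a materialized view from an already-bound query:
/// resolves the target schema, plans the query, applies the output column names, and
/// appends a materialize node whose table catalog is returned alongside the plan.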
pub fn gen_create_mv_plan_bound(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: BoundQuery,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    if session.config().create_compaction_group_for_mv() {
        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
    }

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;

    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    let definition = context.normalized_sql().to_owned();

    let col_names = get_column_names(&query, columns)?;

    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
    if let Some(col_names) = col_names {
        for name in &col_names {
            check_column_name_not_reserved(name)?;
        }
        plan_root.set_out_names(col_names)?;
    }
    let materialize = plan_root.gen_materialize_plan(
        database_id,
        schema_id,
        table_name,
        definition,
        emit_on_window_close,
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    let plan: PlanRef = materialize.into();

    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Materialized View:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, table))
}

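/// Entry point of `CREATE MATERIALIZED VIEW`: binds the query, records the relations and
/// UDFs it references, and hands everything over to [`handle_create_mv_bound`].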
pub async fn handle_create_mv(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: Query,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let (dependent_relations, dependent_udfs, bound_query) = {
        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
        let bound_query = binder.bind_query(&query)?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound_query,
        )
    };
    handle_create_mv_bound(
        handler_args,
        if_not_exists,
        name,
        bound_query,
        dependent_relations,
        dependent_udfs,
        columns,
        emit_mode,
    )
    .await
}

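/// Sends a provision request to the serverless backfill controller (SBC) at `sbc_addr`
/// and returns the name of the resource group it provisioned.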
pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
    let request = tonic::Request::new(ProvisionRequest {});
    let mut client =
        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
            sbc_addr.clone(),
        )
        .await
        .map_err(|e| {
            RwError::from(ErrorCode::InternalError(format!(
                "unable to reach serverless backfill controller at addr {}: {}",
                sbc_addr,
                e.as_report()
            )))
        })?;

    match client.provision(request).await {
        Ok(resp) => Ok(resp.into_inner().resource_group),
        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
            "serverless backfill controller returned error: {}",
            e.as_report()
        )))),
    }
}

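/// Extracts the statement's `WITH` options from the handler arguments.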
fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
    let context = OptimizerContext::from_handler_args(handler_args);
    context.with_options().clone()
}

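/// Creates a materialized view from a bound query: generates the fragment graph, tracks
/// the job as "creating" for its duration, and submits the DDL through the catalog
/// writer, keeping the client notified while the backfill runs.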
pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (table, graph, dependencies, resource_type) = {
        gen_create_mv_graph(
            handler_args,
            name,
            query,
            dependent_relations,
            dependent_udfs,
            columns,
            emit_mode,
        )
        .await?
    };

    // Track the job as "creating" while the DDL is in flight; the guard deregisters it on drop.
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_type,
            if_not_exists,
        ),
        &session,
        "CREATE MATERIALIZED VIEW",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}

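/// Builds everything needed to submit the `CREATE MATERIALIZED VIEW` DDL: the table
/// catalog, the stream fragment graph, the full set of dependencies, and the resource
/// type the job should run under.
///
/// Two mutually exclusive `WITH` options are consumed here, and they must be the only
/// options present (illustrative syntax):
///
/// ```sql
/// CREATE MATERIALIZED VIEW mv1 WITH (resource_group = 'my_group') AS SELECT ...;
/// CREATE MATERIALIZED VIEW mv2 WITH (cloud.serverless_backfill_enabled = 'true') AS SELECT ...;
/// ```
///
/// Serverless backfill additionally requires a reachable serverless backfill controller.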
pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<ObjectId>,
    streaming_job_resource_type::ResourceType,
)> {
    let mut with_options = get_with_options(handler_args.clone());
    let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup.check_available()?;
    }

    let is_serverless_backfill = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        .unwrap_or_default()
        .parse::<bool>()
        .unwrap_or(false);

    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and resource group together".to_owned(),
        )));
    }

    if !with_options.is_empty() {
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }

    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    let resource_type = if is_serverless_backfill {
        assert_eq!(resource_group, None);
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(group) => {
                tracing::info!(
                    resource_group = group,
                    "provisioning serverless backfill resource group"
                );
                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
            }
        }
    } else if let Some(group) = resource_group {
        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
    } else {
        streaming_job_resource_type::ResourceType::Regular(true)
    };
    let context = OptimizerContext::from_handler_args(handler_args);
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view are returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    if resource_type.resource_group().is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError(
            "The session config `streaming_use_arrangement_backfill` must be enabled to use the resource_group option".to_owned(),
        )));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .chain(dependent_udfs.iter().copied().map_into())
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_type))
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(vec![
                ("address", DataType::Varchar),
                ("city", city_type),
                ("zipcode", DataType::Varchar),
            ])
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // An aggregate call receives an implicit alias, so no explicit alias is required.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Both aggregates get the same implicit alias `count`, so the output names collide.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A bare literal has no implicit alias.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // Neither does a general expression such as `IS NULL`.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

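    // Added illustrative test (not from the original source): any WITH option other than
    // `resource_group` / `cloud.serverless_backfill_enabled` should be rejected by
    // `gen_create_mv_graph`.
    #[tokio::test]
    async fn test_unknown_with_option() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 with (foo = 'bar') as select * from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert!(err.to_string().contains("unexpected options in WITH clause"));
    }
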
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without ORDER BY, no notice is expected.
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With ORDER BY, creation still succeeds; the clause only hints physical clustering.
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}