1use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId, SecretId};
21use risingwave_pb::ddl_service::streaming_job_resource_type;
22use risingwave_pb::serverless_backfill_controller::{
23 ProvisionRequest, node_group_controller_service_client,
24};
25use risingwave_pb::stream_plan::PbStreamFragmentGraph;
26use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
27use thiserror_ext::AsReport;
28
29use super::RwPgResponse;
30use crate::binder::{Binder, BoundQuery, BoundSetExpr};
31use crate::catalog::check_column_name_not_reserved;
32use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::handler::HandlerArgs;
35use crate::handler::util::{
36 LongRunningNotificationAction, execute_with_long_running_notification,
37 reject_internal_table_dependencies,
38};
39use crate::optimizer::backfill_order_strategy::plan_backfill_order;
40use crate::optimizer::plan_node::generic::GenericPlanRef;
41use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
42use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
43use crate::planner::Planner;
44use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
45use crate::session::{SESSION_MANAGER, SessionImpl};
46use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
47use crate::utils::ordinal;
48use crate::{TableCatalog, WithOptions};
49
/// WITH-clause key that pins the streaming job to a specific resource group.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// WITH-clause key that toggles serverless backfilling for the job
/// (effective only when a serverless backfill controller address is configured).
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
52
53pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
54 if columns.is_empty() {
55 None
56 } else {
57 Some(columns.iter().map(|v| v.real_value()).collect())
58 }
59}
60
61pub(super) fn get_column_names(
66 bound: &BoundQuery,
67 columns: Vec<Ident>,
68) -> Result<Option<Vec<String>>> {
69 let col_names = parse_column_names(&columns);
70 if let BoundSetExpr::Select(select) = &bound.body {
71 if col_names.is_none() {
76 for (i, alias) in select.aliases.iter().enumerate() {
77 if alias.is_none() {
78 return Err(ErrorCode::BindError(format!(
79 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
80 ))
81 .into());
82 }
83 }
84 }
85 }
86
87 Ok(col_names)
88}
89
90pub fn gen_create_mv_plan(
92 session: &SessionImpl,
93 context: OptimizerContextRef,
94 query: Query,
95 name: ObjectName,
96 columns: Vec<Ident>,
97 emit_mode: Option<EmitMode>,
98) -> Result<(PlanRef, TableCatalog)> {
99 let mut binder = Binder::new_for_stream(session);
100 let bound = binder.bind_query(&query)?;
101 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
102}
103
104pub fn gen_create_mv_plan_bound(
106 session: &SessionImpl,
107 context: OptimizerContextRef,
108 query: BoundQuery,
109 name: ObjectName,
110 columns: Vec<Ident>,
111 emit_mode: Option<EmitMode>,
112) -> Result<(PlanRef, TableCatalog)> {
113 if session.config().create_compaction_group_for_mv() {
114 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
115 }
116
117 let db_name = &session.database();
118 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
119
120 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
121
122 let definition = context.normalized_sql().to_owned();
123
124 let col_names = get_column_names(&query, columns)?;
125
126 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
127 if emit_on_window_close {
128 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
129 }
130
131 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
132 if let Some(col_names) = col_names {
133 for name in &col_names {
134 check_column_name_not_reserved(name)?;
135 }
136 plan_root.set_out_names(col_names)?;
137 }
138 let materialize = plan_root.gen_materialize_plan(
139 database_id,
140 schema_id,
141 table_name,
142 definition,
143 emit_on_window_close,
144 )?;
145
146 let mut table = materialize.table().clone();
147 table.owner = session.user_id();
148
149 let plan: PlanRef = materialize.into();
150
151 let ctx = plan.ctx();
152 let explain_trace = ctx.is_explain_trace();
153 if explain_trace {
154 ctx.trace("Create Materialized View:");
155 ctx.trace(plan.explain_to_string());
156 }
157
158 Ok((plan, table))
159}
160
161pub async fn handle_create_mv(
162 handler_args: HandlerArgs,
163 if_not_exists: bool,
164 name: ObjectName,
165 query: Query,
166 columns: Vec<Ident>,
167 emit_mode: Option<EmitMode>,
168) -> Result<RwPgResponse> {
169 let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
170 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
171 let bound_query = binder.bind_query(&query)?;
172 (
173 binder.included_relations().clone(),
174 binder.included_udfs().clone(),
175 binder.included_secrets().clone(),
176 bound_query,
177 )
178 };
179 handle_create_mv_bound(
180 handler_args,
181 if_not_exists,
182 name,
183 bound_query,
184 dependent_relations,
185 dependent_udfs,
186 dependent_secrets,
187 columns,
188 emit_mode,
189 )
190 .await
191}
192
193pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
195 let request = tonic::Request::new(ProvisionRequest {});
196 let mut client =
197 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
198 sbc_addr.clone(),
199 )
200 .await
201 .map_err(|e| {
202 RwError::from(ErrorCode::InternalError(format!(
203 "unable to reach serverless backfill controller at addr {}: {}",
204 sbc_addr,
205 e.as_report()
206 )))
207 })?;
208
209 match client.provision(request).await {
210 Ok(resp) => Ok(resp.into_inner().resource_group),
211 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
212 "serverless backfill controller returned error :{}",
213 e.as_report()
214 )))),
215 }
216}
217
218fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
219 let context = OptimizerContext::from_handler_args(handler_args);
220 context.with_options().clone()
221}
222
/// Validates and executes a bound `CREATE MATERIALIZED VIEW` statement.
///
/// Checks cluster limits and name collisions, rejects dependencies on internal
/// tables, builds the fragment graph, and submits the job to the catalog
/// writer while a "creating streaming job" guard stays alive for the duration
/// of the DDL.
pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>, dependent_secrets: HashSet<SecretId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    session.check_cluster_limits().await?;

    // With IF NOT EXISTS, a duplicate name short-circuits into a benign
    // response instead of an error.
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    // MVs must not be built on top of internal tables.
    reject_internal_table_dependencies(
        session.as_ref(),
        &dependent_relations,
        "CREATE MATERIALIZED VIEW",
    )?;

    // Plan the query down to a stream fragment graph plus the resolved
    // resource type (regular / specific group / serverless backfill).
    let (table, graph, dependencies, resource_type) = {
        gen_create_mv_graph(
            handler_args,
            name,
            query,
            dependent_relations,
            dependent_udfs,
            dependent_secrets,
            columns,
            emit_mode,
        )
        .await?
    };

    // Register the in-flight job with the session's tracker; the guard is held
    // until this handler returns (presumably deregistering on drop — guard pattern).
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    // Submit the DDL; long-running backfills trigger a user notification.
    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_type,
            if_not_exists,
        ),
        &session,
        "CREATE MATERIALIZED VIEW",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}
298
/// Plans a bound `CREATE MATERIALIZED VIEW` query all the way to a stream
/// fragment graph, and resolves how the job obtains compute resources:
/// a user-specified resource group, a serverless-backfill group provisioned
/// via the serverless backfill controller (SBC), or the regular pool.
///
/// Returns the MV's table catalog, the fragment graph, the full dependency
/// set (relations, UDFs and secrets as object ids), and the resource type.
pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    dependent_secrets: HashSet<SecretId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<ObjectId>,
    streaming_job_resource_type::ResourceType,
)> {
    // Consume the options this handler understands; anything left over is an error.
    let mut with_options = get_with_options(handler_args.clone());
    let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    // Pinning to a resource group is a licensed (enterprise) feature.
    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup.check_available()?;
    }

    // NOTE(review): an unparsable boolean here silently becomes `false`
    // rather than an error — confirm this leniency is intentional.
    let serverless_backfill_from_with = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        .map(|value| value.parse::<bool>().unwrap_or(false));
    // Precedence: explicit WITH value > (resource group present => disabled)
    // > session config default.
    let is_serverless_backfill = match serverless_backfill_from_with {
        Some(value) => value,
        None => {
            if resource_group.is_some() {
                false
            } else {
                handler_args.session.config().enable_serverless_backfill()
            }
        }
    };

    // The two resourcing modes are mutually exclusive.
    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and resource group together".to_owned(),
        )));
    }

    if !with_options.is_empty() {
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }

    // The SBC address comes from the global session manager; an empty string
    // means no controller is configured.
    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    let resource_type = if is_serverless_backfill {
        // Guaranteed by the mutual-exclusivity check above.
        assert_eq!(resource_group, None);
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(group) => {
                tracing::info!(
                    resource_group = group,
                    "provisioning serverless backfill resource group"
                );
                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
            }
        }
    } else if let Some(group) = resource_group {
        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
    } else {
        streaming_job_resource_type::ResourceType::Regular(true)
    };
    let context = OptimizerContext::from_handler_args(handler_args);
    // ORDER BY on an MV only influences physical clustering, not read order —
    // warn so users aren't surprised.
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    // Resource groups require arrangement backfill to be enabled in the session.
    if resource_type.resource_group().is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    // Decide the order in which upstream tables are backfilled.
    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    // Union of relations referenced by the plan, plus UDFs and secrets,
    // all normalized to object ids.
    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .chain(dependent_udfs.iter().copied().map_into())
        .chain(
            dependent_secrets
                .iter()
                .copied()
                .map(|id| id.as_object_id()),
        )
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_type))
}
433
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Happy path: creates a protobuf-backed source and an MV over it, then
    /// checks the MV's catalog entry exposes the expected column set
    /// (including the hidden row-id and rw_timestamp columns).
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The source itself must be registered...
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // ...and the MV must appear as a created table.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Unaliased select expressions must be rejected when no explicit column
    /// list is given (see `get_column_names`); aggregates get implicit names.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Single aggregate gets an implicit name — OK.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Two aggregates collide on the implicit name "count".
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A bare literal has no implicit name, so an alias is required.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // Same for a computed boolean expression.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// ORDER BY in a CREATE MV is accepted; the statement still succeeds.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without ORDER BY: no notices expected.
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With ORDER BY: still succeeds (a warning notice is emitted by the handler).
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}