1use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
20use risingwave_pb::serverless_backfill_controller::{
21 ProvisionRequest, node_group_controller_service_client,
22};
23use risingwave_pb::stream_plan::PbStreamFragmentGraph;
24use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
25use thiserror_ext::AsReport;
26
27use super::RwPgResponse;
28use crate::binder::{Binder, BoundQuery, BoundSetExpr};
29use crate::catalog::check_column_name_not_reserved;
30use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
31use crate::error::{ErrorCode, Result, RwError};
32use crate::handler::HandlerArgs;
33use crate::optimizer::backfill_order_strategy::plan_backfill_order;
34use crate::optimizer::plan_node::generic::GenericPlanRef;
35use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
36use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
37use crate::planner::Planner;
38use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
39use crate::session::{SESSION_MANAGER, SessionImpl};
40use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
41use crate::utils::ordinal;
42use crate::{TableCatalog, WithOptions};
43
/// `WITH` option key naming the resource group the streaming job should run on.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// `WITH` option key that, when set to `true`, requests serverless backfill nodes from the
/// serverless backfill controller.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
46
47pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
48 if columns.is_empty() {
49 None
50 } else {
51 Some(columns.iter().map(|v| v.real_value()).collect())
52 }
53}
54
55pub(super) fn get_column_names(
60 bound: &BoundQuery,
61 columns: Vec<Ident>,
62) -> Result<Option<Vec<String>>> {
63 let col_names = parse_column_names(&columns);
64 if let BoundSetExpr::Select(select) = &bound.body {
65 if col_names.is_none() {
70 for (i, alias) in select.aliases.iter().enumerate() {
71 if alias.is_none() {
72 return Err(ErrorCode::BindError(format!(
73 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
74 ))
75 .into());
76 }
77 }
78 }
79 }
80
81 Ok(col_names)
82}
83
84pub fn gen_create_mv_plan(
86 session: &SessionImpl,
87 context: OptimizerContextRef,
88 query: Query,
89 name: ObjectName,
90 columns: Vec<Ident>,
91 emit_mode: Option<EmitMode>,
92) -> Result<(PlanRef, TableCatalog)> {
93 let mut binder = Binder::new_for_stream(session);
94 let bound = binder.bind_query(&query)?;
95 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
96}
97
98pub fn gen_create_mv_plan_bound(
100 session: &SessionImpl,
101 context: OptimizerContextRef,
102 query: BoundQuery,
103 name: ObjectName,
104 columns: Vec<Ident>,
105 emit_mode: Option<EmitMode>,
106) -> Result<(PlanRef, TableCatalog)> {
107 if session.config().create_compaction_group_for_mv() {
108 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
109 }
110
111 let db_name = &session.database();
112 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
113
114 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
115
116 let definition = context.normalized_sql().to_owned();
117
118 let col_names = get_column_names(&query, columns)?;
119
120 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
121 if emit_on_window_close {
122 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
123 }
124
125 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
126 if let Some(col_names) = col_names {
127 for name in &col_names {
128 check_column_name_not_reserved(name)?;
129 }
130 plan_root.set_out_names(col_names)?;
131 }
132 let materialize = plan_root.gen_materialize_plan(
133 database_id,
134 schema_id,
135 table_name,
136 definition,
137 emit_on_window_close,
138 )?;
139
140 let mut table = materialize.table().clone();
141 table.owner = session.user_id();
142
143 let plan: PlanRef = materialize.into();
144
145 let ctx = plan.ctx();
146 let explain_trace = ctx.is_explain_trace();
147 if explain_trace {
148 ctx.trace("Create Materialized View:");
149 ctx.trace(plan.explain_to_string());
150 }
151
152 Ok((plan, table))
153}
154
155pub async fn handle_create_mv(
156 handler_args: HandlerArgs,
157 if_not_exists: bool,
158 name: ObjectName,
159 query: Query,
160 columns: Vec<Ident>,
161 emit_mode: Option<EmitMode>,
162) -> Result<RwPgResponse> {
163 let (dependent_relations, dependent_udfs, bound_query) = {
164 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
165 let bound_query = binder.bind_query(&query)?;
166 (
167 binder.included_relations().clone(),
168 binder.included_udfs().clone(),
169 bound_query,
170 )
171 };
172 handle_create_mv_bound(
173 handler_args,
174 if_not_exists,
175 name,
176 bound_query,
177 dependent_relations,
178 dependent_udfs,
179 columns,
180 emit_mode,
181 )
182 .await
183}
184
185pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
187 let request = tonic::Request::new(ProvisionRequest {});
188 let mut client =
189 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
190 sbc_addr.clone(),
191 )
192 .await
193 .map_err(|e| {
194 RwError::from(ErrorCode::InternalError(format!(
195 "unable to reach serverless backfill controller at addr {}: {}",
196 sbc_addr,
197 e.as_report()
198 )))
199 })?;
200
201 match client.provision(request).await {
202 Ok(resp) => Ok(resp.into_inner().resource_group),
203 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
204 "serverless backfill controller returned error :{}",
205 e.as_report()
206 )))),
207 }
208}
209
210fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
211 let context = OptimizerContext::from_handler_args(handler_args);
212 context.with_options().clone()
213}
214
215pub async fn handle_create_mv_bound(
216 handler_args: HandlerArgs,
217 if_not_exists: bool,
218 name: ObjectName,
219 query: BoundQuery,
220 dependent_relations: HashSet<TableId>,
221 dependent_udfs: HashSet<FunctionId>, columns: Vec<Ident>,
223 emit_mode: Option<EmitMode>,
224) -> Result<RwPgResponse> {
225 let session = handler_args.session.clone();
226
227 session.check_cluster_limits().await?;
229
230 if let Either::Right(resp) = session.check_relation_name_duplicated(
231 name.clone(),
232 StatementType::CREATE_MATERIALIZED_VIEW,
233 if_not_exists,
234 )? {
235 return Ok(resp);
236 }
237
238 let (table, graph, dependencies, resource_group) = {
239 gen_create_mv_graph(
240 handler_args,
241 name,
242 query,
243 dependent_relations,
244 dependent_udfs,
245 columns,
246 emit_mode,
247 )
248 .await?
249 };
250
251 let _job_guard =
253 session
254 .env()
255 .creating_streaming_job_tracker()
256 .guard(CreatingStreamingJobInfo::new(
257 session.session_id(),
258 table.database_id,
259 table.schema_id,
260 table.name.clone(),
261 ));
262
263 let catalog_writer = session.catalog_writer()?;
264 catalog_writer
265 .create_materialized_view(
266 table.to_prost(),
267 graph,
268 dependencies,
269 resource_group,
270 if_not_exists,
271 )
272 .await?;
273
274 Ok(PgResponse::empty_result(
275 StatementType::CREATE_MATERIALIZED_VIEW,
276 ))
277}
278
279pub(crate) async fn gen_create_mv_graph(
280 handler_args: HandlerArgs,
281 name: ObjectName,
282 query: BoundQuery,
283 dependent_relations: HashSet<TableId>,
284 dependent_udfs: HashSet<FunctionId>,
285 columns: Vec<Ident>,
286 emit_mode: Option<EmitMode>,
287) -> Result<(
288 TableCatalog,
289 PbStreamFragmentGraph,
290 HashSet<u32>,
291 Option<String>,
292)> {
293 let mut with_options = get_with_options(handler_args.clone());
294 let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
295
296 if resource_group.is_some() {
297 risingwave_common::license::Feature::ResourceGroup.check_available()?;
298 }
299
300 let is_serverless_backfill = with_options
301 .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
302 .unwrap_or_default()
303 .parse::<bool>()
304 .unwrap_or(false);
305
306 if resource_group.is_some() && is_serverless_backfill {
307 return Err(RwError::from(InvalidInputSyntax(
308 "Please do not specify serverless backfilling and resource group together".to_owned(),
309 )));
310 }
311
312 if !with_options.is_empty() {
313 return Err(RwError::from(ProtocolError(format!(
315 "unexpected options in WITH clause: {:?}",
316 with_options.keys()
317 ))));
318 }
319
320 let sbc_addr = match SESSION_MANAGER.get() {
321 Some(manager) => manager.env().sbc_address(),
322 None => "",
323 }
324 .to_owned();
325
326 if is_serverless_backfill && sbc_addr.is_empty() {
327 return Err(RwError::from(InvalidInputSyntax(
328 "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
329 )));
330 }
331
332 if is_serverless_backfill {
333 match provision_resource_group(sbc_addr).await {
334 Err(e) => {
335 return Err(RwError::from(ProtocolError(format!(
336 "failed to provision serverless backfill nodes: {}",
337 e.as_report()
338 ))));
339 }
340 Ok(val) => resource_group = Some(val),
341 }
342 }
343 tracing::debug!(
344 resource_group = resource_group,
345 "provisioning on resource group"
346 );
347
348 let context = OptimizerContext::from_handler_args(handler_args);
349 let has_order_by = !query.order.is_empty();
350 if has_order_by {
351 context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
352It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
353"#.to_owned());
354 }
355
356 if resource_group.is_some()
357 && !context
358 .session_ctx()
359 .config()
360 .streaming_use_arrangement_backfill()
361 {
362 return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
363 }
364
365 let context: OptimizerContextRef = context.into();
366 let session = context.session_ctx().as_ref();
367
368 let (plan, table) =
369 gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;
370
371 let backfill_order = plan_backfill_order(
372 session,
373 context.with_options().backfill_order_strategy(),
374 plan.clone(),
375 )?;
376
377 let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
380 .into_iter()
381 .map(|id| id.table_id() as ObjectId)
382 .chain(
383 dependent_udfs
384 .into_iter()
385 .map(|id| id.function_id() as ObjectId),
386 )
387 .collect();
388
389 let graph = build_graph_with_strategy(
390 plan,
391 Some(GraphJobType::MaterializedView),
392 Some(backfill_order),
393 )?;
394
395 Ok((table, graph, dependencies, resource_group))
396}
397
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// An MV over a protobuf source exposes the selected nested struct column plus the
    /// row-id and timestamp columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source_sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source_sql).await.unwrap();
        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1")
            .await
            .unwrap();

        let session = frontend.session_ref();
        let reader = session.env().catalog_reader().read_guard();
        let schema = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (mv, _) = reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema, "mv1")
            .unwrap();
        assert_eq!(mv.name(), "mv1");

        let actual: HashMap<&str, DataType> = mv
            .columns
            .iter()
            .map(|column| (column.name(), column.data_type().clone()))
            .collect();

        let city: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let country: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => country,
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(actual, expected, "{actual:#?}");
    }

    /// Output columns must end up with usable, unique names; otherwise creation fails.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();
        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        let missing_alias = "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation";
        let rejected = [
            (
                "create materialized view mv1 as select count(x), count(*) from t",
                "Invalid input syntax: column \"count\" specified more than once",
            ),
            ("create materialized view mv1 as select 1", missing_alias),
            (
                "create materialized view mv1 as select x is null from t",
                missing_alias,
            ),
        ];
        for (statement, expected_error) in rejected {
            let err = frontend.run_sql(statement).await.unwrap_err();
            assert_eq!(err.to_string(), expected_error);
        }
    }

    /// `ORDER BY` is accepted in an MV definition; without it no notices are emitted.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        let unordered = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(unordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(unordered.notices().is_empty());

        let ordered = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}