1use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
20use risingwave_pb::serverless_backfill_controller::{
21 ProvisionRequest, node_group_controller_service_client,
22};
23use risingwave_pb::stream_plan::PbStreamFragmentGraph;
24use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
25use thiserror_ext::AsReport;
26
27use super::RwPgResponse;
28use crate::binder::{Binder, BoundQuery, BoundSetExpr};
29use crate::catalog::check_column_name_not_reserved;
30use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
31use crate::error::{ErrorCode, Result, RwError};
32use crate::handler::HandlerArgs;
33use crate::optimizer::backfill_order_strategy::plan_backfill_order;
34use crate::optimizer::plan_node::Explain;
35use crate::optimizer::plan_node::generic::GenericPlanRef;
36use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
37use crate::planner::Planner;
38use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
39use crate::session::{SESSION_MANAGER, SessionImpl};
40use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
41use crate::utils::ordinal;
42use crate::{TableCatalog, WithOptions};
43
/// `WITH (...)` option key naming the resource group the streaming job should run on.
/// It is removed from the options and forwarded to the catalog writer when creating the job.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// `WITH (...)` option key; when it parses as `true`, a resource group is provisioned from the
/// serverless backfill controller and used for the job. Cannot be combined with
/// [`RESOURCE_GROUP_KEY`].
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
46
47pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
48 if columns.is_empty() {
49 None
50 } else {
51 Some(columns.iter().map(|v| v.real_value()).collect())
52 }
53}
54
55pub(super) fn get_column_names(
60 bound: &BoundQuery,
61 columns: Vec<Ident>,
62) -> Result<Option<Vec<String>>> {
63 let col_names = parse_column_names(&columns);
64 if let BoundSetExpr::Select(select) = &bound.body {
65 if col_names.is_none() {
70 for (i, alias) in select.aliases.iter().enumerate() {
71 if alias.is_none() {
72 return Err(ErrorCode::BindError(format!(
73 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
74 ))
75 .into());
76 }
77 }
78 }
79 }
80
81 Ok(col_names)
82}
83
84pub fn gen_create_mv_plan(
86 session: &SessionImpl,
87 context: OptimizerContextRef,
88 query: Query,
89 name: ObjectName,
90 columns: Vec<Ident>,
91 emit_mode: Option<EmitMode>,
92) -> Result<(PlanRef, TableCatalog)> {
93 let mut binder = Binder::new_for_stream(session);
94 let bound = binder.bind_query(query)?;
95 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
96}
97
98pub fn gen_create_mv_plan_bound(
100 session: &SessionImpl,
101 context: OptimizerContextRef,
102 query: BoundQuery,
103 name: ObjectName,
104 columns: Vec<Ident>,
105 emit_mode: Option<EmitMode>,
106) -> Result<(PlanRef, TableCatalog)> {
107 if session.config().create_compaction_group_for_mv() {
108 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
109 }
110
111 let db_name = &session.database();
112 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
113
114 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
115
116 let definition = context.normalized_sql().to_owned();
117
118 let col_names = get_column_names(&query, columns)?;
119
120 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
121 if emit_on_window_close {
122 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
123 }
124
125 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
126 if let Some(col_names) = col_names {
127 for name in &col_names {
128 check_column_name_not_reserved(name)?;
129 }
130 plan_root.set_out_names(col_names)?;
131 }
132 let materialize = plan_root.gen_materialize_plan(
133 database_id,
134 schema_id,
135 table_name,
136 definition,
137 emit_on_window_close,
138 )?;
139
140 let mut table = materialize.table().clone();
141 table.owner = session.user_id();
142
143 let plan: PlanRef = materialize.into();
144
145 let ctx = plan.ctx();
146 let explain_trace = ctx.is_explain_trace();
147 if explain_trace {
148 ctx.trace("Create Materialized View:");
149 ctx.trace(plan.explain_to_string());
150 }
151
152 Ok((plan, table))
153}
154
155pub async fn handle_create_mv(
156 handler_args: HandlerArgs,
157 if_not_exists: bool,
158 name: ObjectName,
159 query: Query,
160 columns: Vec<Ident>,
161 emit_mode: Option<EmitMode>,
162) -> Result<RwPgResponse> {
163 let (dependent_relations, dependent_udfs, bound_query) = {
164 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
165 let bound_query = binder.bind_query(query)?;
166 (
167 binder.included_relations().clone(),
168 binder.included_udfs().clone(),
169 bound_query,
170 )
171 };
172 handle_create_mv_bound(
173 handler_args,
174 if_not_exists,
175 name,
176 bound_query,
177 dependent_relations,
178 dependent_udfs,
179 columns,
180 emit_mode,
181 )
182 .await
183}
184
185pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
187 let request = tonic::Request::new(ProvisionRequest {});
188 let mut client =
189 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
190 sbc_addr.clone(),
191 )
192 .await
193 .map_err(|e| {
194 RwError::from(ErrorCode::InternalError(format!(
195 "unable to reach serverless backfill controller at addr {}: {}",
196 sbc_addr,
197 e.as_report()
198 )))
199 })?;
200
201 match client.provision(request).await {
202 Ok(resp) => Ok(resp.into_inner().resource_group),
203 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
204 "serverless backfill controller returned error :{}",
205 e.as_report()
206 )))),
207 }
208}
209
210fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
211 let context = OptimizerContext::from_handler_args(handler_args);
212 context.with_options().clone()
213}
214
215pub async fn handle_create_mv_bound(
216 handler_args: HandlerArgs,
217 if_not_exists: bool,
218 name: ObjectName,
219 query: BoundQuery,
220 dependent_relations: HashSet<TableId>,
221 dependent_udfs: HashSet<FunctionId>, columns: Vec<Ident>,
223 emit_mode: Option<EmitMode>,
224) -> Result<RwPgResponse> {
225 let session = handler_args.session.clone();
226
227 session.check_cluster_limits().await?;
229
230 if let Either::Right(resp) = session.check_relation_name_duplicated(
231 name.clone(),
232 StatementType::CREATE_MATERIALIZED_VIEW,
233 if_not_exists,
234 )? {
235 return Ok(resp);
236 }
237
238 let (table, graph, dependencies, resource_group) = {
239 gen_create_mv_graph(
240 handler_args,
241 name,
242 query,
243 dependent_relations,
244 dependent_udfs,
245 columns,
246 emit_mode,
247 )
248 .await?
249 };
250
251 let _job_guard =
253 session
254 .env()
255 .creating_streaming_job_tracker()
256 .guard(CreatingStreamingJobInfo::new(
257 session.session_id(),
258 table.database_id,
259 table.schema_id,
260 table.name.clone(),
261 ));
262
263 let catalog_writer = session.catalog_writer()?;
264 catalog_writer
265 .create_materialized_view(
266 table.to_prost(),
267 graph,
268 dependencies,
269 resource_group,
270 if_not_exists,
271 )
272 .await?;
273
274 Ok(PgResponse::empty_result(
275 StatementType::CREATE_MATERIALIZED_VIEW,
276 ))
277}
278
279pub(crate) async fn gen_create_mv_graph(
280 handler_args: HandlerArgs,
281 name: ObjectName,
282 query: BoundQuery,
283 dependent_relations: HashSet<TableId>,
284 dependent_udfs: HashSet<FunctionId>,
285 columns: Vec<Ident>,
286 emit_mode: Option<EmitMode>,
287) -> Result<(
288 TableCatalog,
289 PbStreamFragmentGraph,
290 HashSet<u32>,
291 Option<String>,
292)> {
293 let mut with_options = get_with_options(handler_args.clone());
294 let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
295
296 if resource_group.is_some() {
297 risingwave_common::license::Feature::ResourceGroup
298 .check_available()
299 .map_err(|e| anyhow::anyhow!(e))?;
300 }
301
302 let is_serverless_backfill = with_options
303 .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
304 .unwrap_or_default()
305 .parse::<bool>()
306 .unwrap_or(false);
307
308 if resource_group.is_some() && is_serverless_backfill {
309 return Err(RwError::from(InvalidInputSyntax(
310 "Please do not specify serverless backfilling and resource group together".to_owned(),
311 )));
312 }
313
314 if !with_options.is_empty() {
315 return Err(RwError::from(ProtocolError(format!(
317 "unexpected options in WITH clause: {:?}",
318 with_options.keys()
319 ))));
320 }
321
322 let sbc_addr = match SESSION_MANAGER.get() {
323 Some(manager) => manager.env().sbc_address(),
324 None => "",
325 }
326 .to_owned();
327
328 if is_serverless_backfill && sbc_addr.is_empty() {
329 return Err(RwError::from(InvalidInputSyntax(
330 "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
331 )));
332 }
333
334 if is_serverless_backfill {
335 match provision_resource_group(sbc_addr).await {
336 Err(e) => {
337 return Err(RwError::from(ProtocolError(format!(
338 "failed to provision serverless backfill nodes: {}",
339 e.as_report()
340 ))));
341 }
342 Ok(val) => resource_group = Some(val),
343 }
344 }
345 tracing::debug!(
346 resource_group = resource_group,
347 "provisioning on resource group"
348 );
349
350 let context = OptimizerContext::from_handler_args(handler_args);
351 let has_order_by = !query.order.is_empty();
352 if has_order_by {
353 context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
354It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
355"#.to_owned());
356 }
357
358 if resource_group.is_some()
359 && !context
360 .session_ctx()
361 .config()
362 .streaming_use_arrangement_backfill()
363 {
364 return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
365 }
366
367 let context: OptimizerContextRef = context.into();
368 let session = context.session_ctx().as_ref();
369
370 let (plan, table) =
371 gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;
372
373 let backfill_order = plan_backfill_order(
374 session,
375 context.with_options().backfill_order_strategy(),
376 plan.clone(),
377 )?;
378
379 let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
382 .into_iter()
383 .map(|id| id.table_id() as ObjectId)
384 .chain(
385 dependent_udfs
386 .into_iter()
387 .map(|id| id.function_id() as ObjectId),
388 )
389 .collect();
390
391 let graph = build_graph_with_strategy(
392 plan,
393 Some(GraphJobType::MaterializedView),
394 Some(backfill_order),
395 )?;
396
397 Ok((table, graph, dependencies, resource_group))
398}
399
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// An MV selecting a nested protobuf field produces a table with that struct column plus
    /// the row-id and timestamp columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();
        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1")
            .await
            .unwrap();

        let session = frontend.session_ref();
        let reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, _) = reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let actual: HashMap<&str, DataType> = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect();

        let city = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ]);
        let country = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city.into()),
            ("zipcode", DataType::Varchar),
        ]);
        let expected = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => country.into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(actual, expected, "{actual:#?}");
    }

    /// Select items need a usable output name: a lone aggregate is fine, but duplicate derived
    /// names and unaliased expressions are rejected.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql("create table t(x varchar)").await.unwrap();

        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        let rejected = [
            (
                "create materialized view mv1 as select count(x), count(*) from t",
                "Invalid input syntax: column \"count\" specified more than once",
            ),
            (
                "create materialized view mv1 as select 1",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
            (
                "create materialized view mv1 as select x is null from t",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
        ];
        for (sql, expected_err) in rejected {
            let err = frontend.run_sql(sql).await.unwrap_err();
            assert_eq!(err.to_string(), expected_err, "unexpected error for: {sql}");
        }
    }

    /// `ORDER BY` is accepted in an MV definition; a plain MV produces no notices.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql("create table t(x varchar)").await.unwrap();

        let plain = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(plain.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(plain.notices().is_empty());

        let ordered = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}