use std::collections::HashSet;

use either::Either;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::serverless_backfill_controller::{
    ProvisionRequest, node_group_controller_service_client,
};
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::WithOptions;
use crate::binder::{Binder, BoundQuery, BoundSetExpr};
use crate::catalog::check_column_name_not_reserved;
use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::optimizer::backfill_order_strategy::plan_backfill_order;
use crate::optimizer::plan_node::Explain;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
use crate::planner::Planner;
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::{SESSION_MANAGER, SessionImpl};
use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
use crate::utils::ordinal;

pub const RESOURCE_GROUP_KEY: &str = "resource_group";
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";

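/// Converts an explicit column list from `CREATE MATERIALIZED VIEW mv (a, b) AS ...`
/// into resolved column names; returns `None` when no columns were specified.
///
/// A minimal illustration of the contract:
///
/// ```ignore
/// assert_eq!(parse_column_names(&[]), None);
/// // With idents `a`, `b` this would yield Some(vec!["a".into(), "b".into()]).
/// ```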
pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
    if columns.is_empty() {
        None
    } else {
        Some(columns.iter().map(|v| v.real_value()).collect())
    }
}

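/// Determines the output column names of the view: an explicit column list
/// takes precedence; otherwise, when the query is a plain `SELECT`, every
/// output expression must carry an alias, and a bind error reporting the
/// ordinal of the first unaliased expression is returned.
///
/// For example, `CREATE MATERIALIZED VIEW mv AS SELECT 1` fails with
/// "An alias must be specified for the 1st expression ..." (see the tests below).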
pub(super) fn get_column_names(
    bound: &BoundQuery,
    columns: Vec<Ident>,
) -> Result<Option<Vec<String>>> {
    let col_names = parse_column_names(&columns);
    if let BoundSetExpr::Select(select) = &bound.body {
        if col_names.is_none() {
            for (i, alias) in select.aliases.iter().enumerate() {
                if alias.is_none() {
                    return Err(ErrorCode::BindError(format!(
                        "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i + 1)
                    ))
                    .into());
                }
            }
        }
    }

    Ok(col_names)
}

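/// Binds `query` against the current session and generates the stream plan
/// and the `PbTable` catalog entry for the new materialized view. A thin
/// wrapper over [`gen_create_mv_plan_bound`] for callers starting from the AST.
///
/// A sketch of a call site (identifiers are illustrative; real callers obtain
/// them from `HandlerArgs`):
///
/// ```ignore
/// let (plan, table) = gen_create_mv_plan(&session, ctx, query, name, columns, None)?;
/// ```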
pub fn gen_create_mv_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: Query,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, PbTable)> {
    let mut binder = Binder::new_for_stream(session);
    let bound = binder.bind_query(query)?;
    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
}

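/// Core planning routine for `CREATE MATERIALIZED VIEW`: resolves the target
/// database and schema, validates user-supplied column names against reserved
/// names, plans the bound query as a streaming job, and wraps the plan root in
/// a `Materialize` node whose state table becomes the view's catalog entry.
/// `EMIT ON WINDOW CLOSE` is honored but still experimental, so it emits a
/// user-facing warning.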
pub fn gen_create_mv_plan_bound(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: BoundQuery,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, PbTable)> {
    if session.config().create_compaction_group_for_mv() {
        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
    }

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;

    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    let definition = context.normalized_sql().to_owned();

    let col_names = get_column_names(&query, columns)?;

    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
    if let Some(col_names) = col_names {
        for name in &col_names {
            check_column_name_not_reserved(name)?;
        }
        plan_root.set_out_names(col_names)?;
    }
    let materialize = plan_root.gen_materialize_plan(
        database_id,
        schema_id,
        table_name,
        definition,
        emit_on_window_close,
    )?;
    let mut table = materialize.table().to_prost();

    let plan: PlanRef = materialize.into();

    table.owner = session.user_id();

    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Materialized View:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, table))
}

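/// Handler entry point for `CREATE MATERIALIZED VIEW`. Binds the query first,
/// capturing the relations and UDFs it references (these become catalog
/// dependencies of the view), then delegates to [`handle_create_mv_bound`].
///
/// An illustrative statement that is routed here:
///
/// ```ignore
/// CREATE MATERIALIZED VIEW mv AS SELECT v1, count(*) AS cnt FROM t GROUP BY v1;
/// ```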
pub async fn handle_create_mv(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: Query,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let (dependent_relations, dependent_udfs, bound_query) = {
        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
        let bound_query = binder.bind_query(query)?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound_query,
        )
    };
    handle_create_mv_bound(
        handler_args,
        if_not_exists,
        name,
        bound_query,
        dependent_relations,
        dependent_udfs,
        columns,
        emit_mode,
    )
    .await
}

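/// Asks the serverless backfill controller at `sbc_addr` to provision a node
/// group, returning the name of the resource group the backfill should run on.
/// Both connection failures and provisioning errors surface as internal errors.
///
/// A minimal sketch of a call site, assuming a tokio runtime (the address is
/// hypothetical):
///
/// ```ignore
/// let resource_group = provision_resource_group("http://sbc:5688".to_owned()).await?;
/// ```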
pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
    let request = tonic::Request::new(ProvisionRequest {});
    let mut client =
        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
            sbc_addr.clone(),
        )
        .await
        .map_err(|e| {
            RwError::from(ErrorCode::InternalError(format!(
                "unable to reach serverless backfill controller at addr {}: {}",
                sbc_addr,
                e.as_report()
            )))
        })?;

    match client.provision(request).await {
        Ok(resp) => Ok(resp.into_inner().resource_group),
        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
            "serverless backfill controller returned error: {}",
            e.as_report()
        )))),
    }
}

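/// Convenience helper: builds a throwaway optimizer context from the handler
/// arguments just to extract the statement's `WITH` options.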
fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
    let context = OptimizerContext::from_handler_args(handler_args);
    context.with_options().clone()
}

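/// Creates the materialized view from an already-bound query: checks cluster
/// limits and name duplication, consumes the `WITH` options, optionally
/// provisions serverless backfill nodes, generates the stream plan and
/// fragment graph, and finally registers the view with the catalog.
///
/// The two recognized `WITH` options are mutually exclusive. Illustrative SQL
/// (option values are examples only):
///
/// ```ignore
/// CREATE MATERIALIZED VIEW mv WITH (resource_group = 'my_group') AS SELECT * FROM t;
/// CREATE MATERIALIZED VIEW mv WITH (cloud.serverless_backfill_enabled = 'true') AS SELECT * FROM t;
/// ```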
pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<TableId>,
    dependent_udfs: HashSet<FunctionId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (table, graph, dependencies, resource_group) = {
        let mut with_options = get_with_options(handler_args.clone());
        let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

        if resource_group.is_some() {
            risingwave_common::license::Feature::ResourceGroup
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        let is_serverless_backfill = with_options
            .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
            .unwrap_or_default()
            .parse::<bool>()
            .unwrap_or(false);

        if resource_group.is_some() && is_serverless_backfill {
            return Err(RwError::from(InvalidInputSyntax(
                "Please do not specify serverless backfilling and resource group together"
                    .to_owned(),
            )));
        }

        if !with_options.is_empty() {
            return Err(RwError::from(ProtocolError(format!(
                "unexpected options in WITH clause: {:?}",
                with_options.keys()
            ))));
        }

        let sbc_addr = match SESSION_MANAGER.get() {
            Some(manager) => manager.env().sbc_address(),
            None => "",
        }
        .to_owned();

        if is_serverless_backfill && sbc_addr.is_empty() {
            return Err(RwError::from(InvalidInputSyntax(
                "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
            )));
        }

        if is_serverless_backfill {
            match provision_resource_group(sbc_addr).await {
                Err(e) => {
                    return Err(RwError::from(ProtocolError(format!(
                        "failed to provision serverless backfill nodes: {}",
                        e.as_report()
                    ))));
                }
                Ok(val) => resource_group = Some(val),
            }
        }
        tracing::debug!(
            resource_group = resource_group,
            "provisioning on resource group"
        );

        let context = OptimizerContext::from_handler_args(handler_args);
        let has_order_by = !query.order.is_empty();
        if has_order_by {
            context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view are returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
        }

        if resource_group.is_some()
            && !context
                .session_ctx()
                .config()
                .streaming_use_arrangement_backfill()
        {
            return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
        }

        let context: OptimizerContextRef = context.into();

        let (plan, table) =
            gen_create_mv_plan_bound(&session, context.clone(), query, name, columns, emit_mode)?;

        let backfill_order = plan_backfill_order(
            context.session_ctx().as_ref(),
            context.with_options().backfill_order_strategy(),
            plan.clone(),
        )?;

        let dependencies =
            RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
                .into_iter()
                .map(|id| id.table_id() as ObjectId)
                .chain(
                    dependent_udfs
                        .into_iter()
                        .map(|id| id.function_id() as ObjectId),
                )
                .collect();

        let graph = build_graph_with_strategy(
            plan,
            Some(GraphJobType::MaterializedView),
            Some(backfill_order),
        )?;

        (table, graph, dependencies, resource_group)
    };

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    let session = session.clone();
    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_materialized_view(table, graph, dependencies, resource_group)
        .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                vec![("address", DataType::Varchar), ("city", city_type), ("zipcode", DataType::Varchar)],
            )
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}