risingwave_frontend/handler/
create_table_as.rs1use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::StatementType;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
20use risingwave_pb::ddl_service::TableJobType;
21use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statement};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::binder::BoundStatement;
25use crate::error::{ErrorCode, Result};
26use crate::handler::create_table::{
27 ColumnIdGenerator, CreateTableProps, gen_create_table_plan_without_source,
28};
29use crate::handler::query::handle_query;
30use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
31use crate::stream_fragmenter::GraphJobType;
32use crate::{Binder, OptimizerContext, build_graph};
33
34pub async fn handle_create_as(
35 handler_args: HandlerArgs,
36 table_name: ObjectName,
37 if_not_exists: bool,
38 query: Box<Query>,
39 column_defs: Vec<ColumnDef>,
40 append_only: bool,
41 on_conflict: Option<OnConflict>,
42 with_version_columns: Vec<String>,
43 ast_engine: risingwave_sqlparser::ast::Engine,
44) -> Result<RwPgResponse> {
45 if column_defs.iter().any(|column| column.data_type.is_some()) {
46 return Err(ErrorCode::InvalidInputSyntax(
47 "Should not specify data type in CREATE TABLE AS".into(),
48 )
49 .into());
50 }
51 let engine = match ast_engine {
52 risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock,
53 risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg,
54 };
55
56 let session = handler_args.session.clone();
57
58 if let Either::Right(resp) = session.check_relation_name_duplicated(
59 table_name.clone(),
60 StatementType::CREATE_TABLE,
61 if_not_exists,
62 )? {
63 return Ok(resp);
64 }
65
66 let mut col_id_gen = ColumnIdGenerator::new_initial();
67
68 let mut columns: Vec<_> = {
70 let mut binder = Binder::new_for_batch(&session);
71 let bound = binder.bind(Statement::Query(query.clone()))?;
72 if let BoundStatement::Query(query) = bound {
73 query
75 .schema()
76 .fields()
77 .iter()
78 .map(|field| ColumnCatalog {
79 column_desc: ColumnDesc::from_field_without_column_id(field),
80 is_hidden: false,
81 })
82 .collect()
83 } else {
84 unreachable!()
85 }
86 };
87
88 for c in &mut columns {
90 col_id_gen.generate(c)?;
91 }
92
93 if column_defs.len() > columns.len() {
94 return Err(ErrorCode::InvalidInputSyntax(
95 "too many column names were specified".to_owned(),
96 )
97 .into());
98 }
99
100 column_defs.iter().enumerate().for_each(|(idx, column)| {
102 columns[idx].column_desc.name = column.name.real_value();
103 });
104
105 let (graph, source, table) = {
106 let context = OptimizerContext::from_handler_args(handler_args.clone());
107 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
108 if !secret_refs.is_empty() || !connection_refs.is_empty() {
109 return Err(crate::error::ErrorCode::InvalidParameterValue(
110 "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_owned(),
111 )
112 .into());
113 }
114 let (plan, table) = gen_create_table_plan_without_source(
115 context,
116 table_name.clone(),
117 columns,
118 vec![],
119 vec![],
120 vec![], col_id_gen.into_version(),
122 CreateTableProps {
123 definition: "".to_owned(),
126 append_only,
127 on_conflict: on_conflict.into(),
128 with_version_columns,
129 webhook_info: None,
130 engine,
131 },
132 )?;
133 let graph = build_graph(plan, Some(GraphJobType::Table))?;
134
135 (graph, None, table)
136 };
137
138 tracing::trace!(
139 "name={}, graph=\n{}",
140 table_name,
141 serde_json::to_string_pretty(&graph).unwrap()
142 );
143
144 let catalog_writer = session.catalog_writer()?;
145 execute_with_long_running_notification(
146 catalog_writer.create_table(
147 source,
148 table.to_prost(),
149 graph,
150 TableJobType::Unspecified,
151 if_not_exists,
152 HashSet::default(),
153 ),
154 &session,
155 "CREATE TABLE AS",
156 LongRunningNotificationAction::DiagnoseBarrierLatency,
157 )
158 .await?;
159
160 let insert = Statement::Insert {
162 table_name,
163 columns: vec![],
164 source: query,
165 returning: vec![],
166 };
167
168 handle_query(handler_args, insert, vec![]).await
169}