risingwave_frontend/handler/
create_table_as.rs1use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::StatementType;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
20use risingwave_pb::ddl_service::TableJobType;
21use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statement};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::binder::BoundStatement;
25use crate::error::{ErrorCode, Result};
26use crate::handler::create_table::{
27 ColumnIdGenerator, CreateTableProps, gen_create_table_plan_without_source,
28};
29use crate::handler::query::handle_query;
30use crate::handler::util::{
31 LongRunningNotificationAction, execute_with_long_running_notification,
32 reject_internal_table_dependencies,
33};
34use crate::stream_fragmenter::GraphJobType;
35use crate::{Binder, OptimizerContext, build_graph};
36
37pub async fn handle_create_as(
38 handler_args: HandlerArgs,
39 table_name: ObjectName,
40 if_not_exists: bool,
41 query: Box<Query>,
42 column_defs: Vec<ColumnDef>,
43 append_only: bool,
44 on_conflict: Option<OnConflict>,
45 with_version_columns: Vec<String>,
46 ast_engine: risingwave_sqlparser::ast::Engine,
47) -> Result<RwPgResponse> {
48 if column_defs.iter().any(|column| column.data_type.is_some()) {
49 return Err(ErrorCode::InvalidInputSyntax(
50 "Should not specify data type in CREATE TABLE AS".into(),
51 )
52 .into());
53 }
54 let engine = match ast_engine {
55 risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock,
56 risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg,
57 };
58
59 let session = handler_args.session.clone();
60
61 if let Either::Right(resp) = session.check_relation_name_duplicated(
62 table_name.clone(),
63 StatementType::CREATE_TABLE,
64 if_not_exists,
65 )? {
66 return Ok(resp);
67 }
68
69 let mut col_id_gen = ColumnIdGenerator::new_initial();
70
71 let (mut columns, dependent_relations): (Vec<_>, HashSet<_>) = {
73 let mut binder = Binder::new_for_batch(&session);
74 let bound = binder.bind(Statement::Query(query.clone()))?;
75 if let BoundStatement::Query(query) = bound {
76 let columns = query
78 .schema()
79 .fields()
80 .iter()
81 .map(|field| ColumnCatalog {
82 column_desc: ColumnDesc::from_field_without_column_id(field),
83 is_hidden: false,
84 })
85 .collect();
86 (columns, binder.included_relations().clone())
87 } else {
88 unreachable!()
89 }
90 };
91
92 reject_internal_table_dependencies(&session, &dependent_relations, "CREATE TABLE AS")?;
93
94 for c in &mut columns {
96 col_id_gen.generate(c)?;
97 }
98
99 if column_defs.len() > columns.len() {
100 return Err(ErrorCode::InvalidInputSyntax(
101 "too many column names were specified".to_owned(),
102 )
103 .into());
104 }
105
106 column_defs.iter().enumerate().for_each(|(idx, column)| {
108 columns[idx].column_desc.name = column.name.real_value();
109 });
110
111 let (graph, source, table) = {
112 let context = OptimizerContext::from_handler_args(handler_args.clone());
113 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
114 if !secret_refs.is_empty() || !connection_refs.is_empty() {
115 return Err(crate::error::ErrorCode::InvalidParameterValue(
116 "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_owned(),
117 )
118 .into());
119 }
120 let (plan, table) = gen_create_table_plan_without_source(
121 context,
122 table_name.clone(),
123 columns,
124 vec![],
125 vec![],
126 vec![], col_id_gen.into_version(),
128 CreateTableProps {
129 definition: "".to_owned(),
132 append_only,
133 on_conflict: on_conflict.into(),
134 with_version_columns,
135 webhook_info: None,
136 engine,
137 },
138 )?;
139 let graph = build_graph(plan, Some(GraphJobType::Table))?;
140
141 (graph, None, table)
142 };
143
144 tracing::trace!(
145 "name={}, graph=\n{}",
146 table_name,
147 serde_json::to_string_pretty(&graph).unwrap()
148 );
149
150 let catalog_writer = session.catalog_writer()?;
151 execute_with_long_running_notification(
152 catalog_writer.create_table(
153 source,
154 table.to_prost(),
155 graph,
156 TableJobType::Unspecified,
157 if_not_exists,
158 HashSet::default(),
159 ),
160 &session,
161 "CREATE TABLE AS",
162 LongRunningNotificationAction::DiagnoseBarrierLatency,
163 )
164 .await?;
165
166 let insert = Statement::Insert {
168 table_name,
169 columns: vec![],
170 source: query,
171 returning: vec![],
172 };
173
174 handle_query(handler_args, insert, vec![]).await
175}