risingwave_frontend/handler/
create_table_as.rs1use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::StatementType;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
20use risingwave_pb::ddl_service::TableJobType;
21use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statement};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::binder::BoundStatement;
25use crate::error::{ErrorCode, Result};
26use crate::handler::create_table::{
27 ColumnIdGenerator, CreateTableProps, gen_create_table_plan_without_source,
28};
29use crate::handler::query::handle_query;
30use crate::stream_fragmenter::GraphJobType;
31use crate::{Binder, OptimizerContext, build_graph};
32
33pub async fn handle_create_as(
34 handler_args: HandlerArgs,
35 table_name: ObjectName,
36 if_not_exists: bool,
37 query: Box<Query>,
38 column_defs: Vec<ColumnDef>,
39 append_only: bool,
40 on_conflict: Option<OnConflict>,
41 with_version_column: Option<String>,
42 ast_engine: risingwave_sqlparser::ast::Engine,
43) -> Result<RwPgResponse> {
44 if column_defs.iter().any(|column| column.data_type.is_some()) {
45 return Err(ErrorCode::InvalidInputSyntax(
46 "Should not specify data type in CREATE TABLE AS".into(),
47 )
48 .into());
49 }
50 let engine = match ast_engine {
51 risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock,
52 risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg,
53 };
54
55 let session = handler_args.session.clone();
56
57 if let Either::Right(resp) = session.check_relation_name_duplicated(
58 table_name.clone(),
59 StatementType::CREATE_TABLE,
60 if_not_exists,
61 )? {
62 return Ok(resp);
63 }
64
65 let mut col_id_gen = ColumnIdGenerator::new_initial();
66
67 let mut columns: Vec<_> = {
69 let mut binder = Binder::new(&session);
70 let bound = binder.bind(Statement::Query(query.clone()))?;
71 if let BoundStatement::Query(query) = bound {
72 query
74 .schema()
75 .fields()
76 .iter()
77 .map(|field| ColumnCatalog {
78 column_desc: ColumnDesc::from_field_without_column_id(field),
79 is_hidden: false,
80 })
81 .collect()
82 } else {
83 unreachable!()
84 }
85 };
86
87 for c in &mut columns {
89 col_id_gen.generate(c)?;
90 }
91
92 if column_defs.len() > columns.len() {
93 return Err(ErrorCode::InvalidInputSyntax(
94 "too many column names were specified".to_owned(),
95 )
96 .into());
97 }
98
99 column_defs.iter().enumerate().for_each(|(idx, column)| {
101 columns[idx].column_desc.name = column.name.real_value();
102 });
103
104 let (graph, source, table) = {
105 let context = OptimizerContext::from_handler_args(handler_args.clone());
106 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
107 if !secret_refs.is_empty() || !connection_refs.is_empty() {
108 return Err(crate::error::ErrorCode::InvalidParameterValue(
109 "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_owned(),
110 )
111 .into());
112 }
113 let (plan, table) = gen_create_table_plan_without_source(
114 context,
115 table_name.clone(),
116 columns,
117 vec![],
118 vec![],
119 vec![], col_id_gen.into_version(),
121 CreateTableProps {
122 definition: "".to_owned(),
125 append_only,
126 on_conflict: on_conflict.into(),
127 with_version_column,
128 webhook_info: None,
129 engine,
130 },
131 )?;
132 let graph = build_graph(plan, Some(GraphJobType::Table))?;
133
134 (graph, None, table)
135 };
136
137 tracing::trace!(
138 "name={}, graph=\n{}",
139 table_name,
140 serde_json::to_string_pretty(&graph).unwrap()
141 );
142
143 let catalog_writer = session.catalog_writer()?;
144 catalog_writer
145 .create_table(
146 source,
147 table.to_prost(),
148 graph,
149 TableJobType::Unspecified,
150 if_not_exists,
151 HashSet::default(),
152 )
153 .await?;
154
155 let insert = Statement::Insert {
157 table_name,
158 columns: vec![],
159 source: query,
160 returning: vec![],
161 };
162
163 handle_query(handler_args, insert, vec![]).await
164}