risingwave_frontend/handler/
create_table_as.rs1use either::Either;
16use pgwire::pg_response::StatementType;
17use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
18use risingwave_pb::ddl_service::TableJobType;
19use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statement};
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::binder::BoundStatement;
23use crate::error::{ErrorCode, Result};
24use crate::handler::create_table::{
25 ColumnIdGenerator, CreateTableProps, gen_create_table_plan_without_source,
26};
27use crate::handler::query::handle_query;
28use crate::{Binder, OptimizerContext, build_graph};
29pub async fn handle_create_as(
30 handler_args: HandlerArgs,
31 table_name: ObjectName,
32 if_not_exists: bool,
33 query: Box<Query>,
34 column_defs: Vec<ColumnDef>,
35 append_only: bool,
36 on_conflict: Option<OnConflict>,
37 with_version_column: Option<String>,
38 ast_engine: risingwave_sqlparser::ast::Engine,
39) -> Result<RwPgResponse> {
40 if column_defs.iter().any(|column| column.data_type.is_some()) {
41 return Err(ErrorCode::InvalidInputSyntax(
42 "Should not specify data type in CREATE TABLE AS".into(),
43 )
44 .into());
45 }
46 let engine = match ast_engine {
47 risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock,
48 risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg,
49 };
50
51 let session = handler_args.session.clone();
52
53 if let Either::Right(resp) = session.check_relation_name_duplicated(
54 table_name.clone(),
55 StatementType::CREATE_TABLE,
56 if_not_exists,
57 )? {
58 return Ok(resp);
59 }
60
61 let mut col_id_gen = ColumnIdGenerator::new_initial();
62
63 let mut columns: Vec<_> = {
65 let mut binder = Binder::new(&session);
66 let bound = binder.bind(Statement::Query(query.clone()))?;
67 if let BoundStatement::Query(query) = bound {
68 query
70 .schema()
71 .fields()
72 .iter()
73 .map(|field| ColumnCatalog {
74 column_desc: ColumnDesc::from_field_without_column_id(field),
75 is_hidden: false,
76 })
77 .collect()
78 } else {
79 unreachable!()
80 }
81 };
82
83 for c in &mut columns {
85 col_id_gen.generate(c)?;
86 }
87
88 if column_defs.len() > columns.len() {
89 return Err(ErrorCode::InvalidInputSyntax(
90 "too many column names were specified".to_owned(),
91 )
92 .into());
93 }
94
95 column_defs.iter().enumerate().for_each(|(idx, column)| {
97 columns[idx].column_desc.name = column.name.real_value();
98 });
99
100 let (graph, source, table) = {
101 let context = OptimizerContext::from_handler_args(handler_args.clone());
102 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
103 if !secret_refs.is_empty() || !connection_refs.is_empty() {
104 return Err(crate::error::ErrorCode::InvalidParameterValue(
105 "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_owned(),
106 )
107 .into());
108 }
109 let (plan, table) = gen_create_table_plan_without_source(
110 context,
111 table_name.clone(),
112 columns,
113 vec![],
114 vec![],
115 vec![], col_id_gen.into_version(),
117 CreateTableProps {
118 definition: "".to_owned(),
121 append_only,
122 on_conflict: on_conflict.into(),
123 with_version_column,
124 webhook_info: None,
125 engine,
126 },
127 )?;
128 let graph = build_graph(plan)?;
129
130 (graph, None, table)
131 };
132
133 tracing::trace!(
134 "name={}, graph=\n{}",
135 table_name,
136 serde_json::to_string_pretty(&graph).unwrap()
137 );
138
139 let catalog_writer = session.catalog_writer()?;
140 catalog_writer
141 .create_table(source, table, graph, TableJobType::Unspecified)
142 .await?;
143
144 let insert = Statement::Insert {
146 table_name,
147 columns: vec![],
148 source: query,
149 returning: vec![],
150 };
151
152 handle_query(handler_args, insert, vec![]).await
153}