risingwave_frontend/handler/
create_table_as.rs1use either::Either;
16use pgwire::pg_response::StatementType;
17use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
18use risingwave_pb::ddl_service::TableJobType;
19use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statement};
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::binder::BoundStatement;
23use crate::error::{ErrorCode, Result};
24use crate::handler::create_table::{
25 ColumnIdGenerator, CreateTableProps, gen_create_table_plan_without_source,
26};
27use crate::handler::query::handle_query;
28use crate::stream_fragmenter::GraphJobType;
29use crate::{Binder, OptimizerContext, build_graph};
30
31pub async fn handle_create_as(
32 handler_args: HandlerArgs,
33 table_name: ObjectName,
34 if_not_exists: bool,
35 query: Box<Query>,
36 column_defs: Vec<ColumnDef>,
37 append_only: bool,
38 on_conflict: Option<OnConflict>,
39 with_version_column: Option<String>,
40 ast_engine: risingwave_sqlparser::ast::Engine,
41) -> Result<RwPgResponse> {
42 if column_defs.iter().any(|column| column.data_type.is_some()) {
43 return Err(ErrorCode::InvalidInputSyntax(
44 "Should not specify data type in CREATE TABLE AS".into(),
45 )
46 .into());
47 }
48 let engine = match ast_engine {
49 risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock,
50 risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg,
51 };
52
53 let session = handler_args.session.clone();
54
55 if let Either::Right(resp) = session.check_relation_name_duplicated(
56 table_name.clone(),
57 StatementType::CREATE_TABLE,
58 if_not_exists,
59 )? {
60 return Ok(resp);
61 }
62
63 let mut col_id_gen = ColumnIdGenerator::new_initial();
64
65 let mut columns: Vec<_> = {
67 let mut binder = Binder::new(&session);
68 let bound = binder.bind(Statement::Query(query.clone()))?;
69 if let BoundStatement::Query(query) = bound {
70 query
72 .schema()
73 .fields()
74 .iter()
75 .map(|field| ColumnCatalog {
76 column_desc: ColumnDesc::from_field_without_column_id(field),
77 is_hidden: false,
78 })
79 .collect()
80 } else {
81 unreachable!()
82 }
83 };
84
85 for c in &mut columns {
87 col_id_gen.generate(c)?;
88 }
89
90 if column_defs.len() > columns.len() {
91 return Err(ErrorCode::InvalidInputSyntax(
92 "too many column names were specified".to_owned(),
93 )
94 .into());
95 }
96
97 column_defs.iter().enumerate().for_each(|(idx, column)| {
99 columns[idx].column_desc.name = column.name.real_value();
100 });
101
102 let (graph, source, table) = {
103 let context = OptimizerContext::from_handler_args(handler_args.clone());
104 let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
105 if !secret_refs.is_empty() || !connection_refs.is_empty() {
106 return Err(crate::error::ErrorCode::InvalidParameterValue(
107 "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_owned(),
108 )
109 .into());
110 }
111 let (plan, table) = gen_create_table_plan_without_source(
112 context,
113 table_name.clone(),
114 columns,
115 vec![],
116 vec![],
117 vec![], col_id_gen.into_version(),
119 CreateTableProps {
120 definition: "".to_owned(),
123 append_only,
124 on_conflict: on_conflict.into(),
125 with_version_column,
126 webhook_info: None,
127 engine,
128 },
129 )?;
130 let graph = build_graph(plan, Some(GraphJobType::Table))?;
131
132 (graph, None, table)
133 };
134
135 tracing::trace!(
136 "name={}, graph=\n{}",
137 table_name,
138 serde_json::to_string_pretty(&graph).unwrap()
139 );
140
141 let catalog_writer = session.catalog_writer()?;
142 catalog_writer
143 .create_table(source, table, graph, TableJobType::Unspecified)
144 .await?;
145
146 let insert = Statement::Insert {
148 table_name,
149 columns: vec![],
150 source: query,
151 returning: vec![],
152 };
153
154 handle_query(handler_args, insert, vec![]).await
155}