// risingwave_frontend/handler/create_table_as.rs
use either::Either;
use pgwire::pg_response::StatementType;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statement};
use super::{HandlerArgs, RwPgResponse};
use crate::binder::BoundStatement;
use crate::error::{ErrorCode, Result};
use crate::handler::create_table::{gen_create_table_plan_without_source, ColumnIdGenerator};
use crate::handler::query::handle_query;
use crate::{build_graph, Binder, OptimizerContext};
/// Handles a `CREATE TABLE ... AS <query>` statement.
///
/// The steps, in order:
/// 1. Reject any entry of `column_defs` that carries a data type.
/// 2. Ask the session whether the relation name is already taken; if that
///    check yields a ready-made response, return it unchanged.
/// 3. Bind `query` and turn every field of its output schema into a visible
///    column, with ids handed out by a fresh `ColumnIdGenerator`.
/// 4. Rename the leading columns after `column_defs` (error if more names
///    were given than the query produces columns).
/// 5. Reject secret references in the statement's options, then plan the
///    table and build its graph.
/// 6. Create the table through the catalog writer and finally run an
///    `INSERT` whose source is the original `query`.
///
/// # Errors
///
/// * `ErrorCode::InvalidInputSyntax` when a column definition has a data
///   type, or when there are more column names than query output columns.
/// * `ErrorCode::InvalidParameterValue` when the options contain a secret
///   reference.
/// * Any error propagated from binding, planning, graph building, the
///   catalog writer, or the final insert.
///
/// # Panics
///
/// Panics if binding a `Statement::Query` yields something other than
/// `BoundStatement::Query`, or if the graph cannot be serialized for the
/// trace log.
pub async fn handle_create_as(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    if_not_exists: bool,
    query: Box<Query>,
    column_defs: Vec<ColumnDef>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
) -> Result<RwPgResponse> {
    // Column types always come from the query, so explicit ones are refused.
    let has_typed_column = column_defs.iter().any(|def| def.data_type.is_some());
    if has_typed_column {
        return Err(ErrorCode::InvalidInputSyntax(
            "Should not specify data type in CREATE TABLE AS".into(),
        )
        .into());
    }

    let session = handler_args.session.clone();

    // NOTE(review): the `Right` branch presumably carries the "already exists,
    // skipped" response for `IF NOT EXISTS` — confirm against the session API.
    match session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        Either::Right(early_response) => return Ok(early_response),
        Either::Left(_) => {}
    }

    let mut col_id_gen = ColumnIdGenerator::new_initial();

    // Derive the table's columns from the output schema of the bound query.
    // The binder and the bound query stay confined to this block.
    let mut columns: Vec<_> = {
        let mut binder = Binder::new(&session);
        let bound_query = match binder.bind(Statement::Query(query.clone()))? {
            BoundStatement::Query(bound_query) => bound_query,
            // A `Statement::Query` is expected to bind to a query only.
            _ => unreachable!(),
        };
        let schema = bound_query.schema();
        let mut catalogs = Vec::with_capacity(schema.fields().len());
        for field in schema.fields() {
            let id = col_id_gen.generate(&field.name);
            catalogs.push(ColumnCatalog {
                column_desc: ColumnDesc::from_field_with_column_id(field, id.get_id()),
                is_hidden: false,
            });
        }
        catalogs
    };

    if columns.len() < column_defs.len() {
        return Err(ErrorCode::InvalidInputSyntax(
            "too many column names were specified".to_string(),
        )
        .into());
    }

    // Names are applied positionally; columns beyond `column_defs` keep the
    // names taken from the query schema.
    for (catalog, def) in columns.iter_mut().zip(column_defs.iter()) {
        catalog.column_desc.name = def.name.real_value();
    }

    // NOTE(review): planning is kept inside its own block so that the context
    // and plan do not outlive it — presumably so they are not held across the
    // `.await`s below; confirm before flattening.
    let (graph, source, table) = {
        let context = OptimizerContext::from_handler_args(handler_args.clone());

        let (_, secret_refs) = context.with_options().clone().into_parts();
        if !secret_refs.is_empty() {
            return Err(ErrorCode::InvalidParameterValue(
                "Secret reference is not allowed in options for CREATE TABLE AS".to_string(),
            )
            .into());
        }

        let version = Some(col_id_gen.into_version());
        // The empty vectors and empty string fill positional parameters whose
        // meaning is defined by `gen_create_table_plan_without_source`.
        let (table_plan, new_table) = gen_create_table_plan_without_source(
            context,
            table_name.clone(),
            columns,
            vec![],
            vec![],
            String::new(),
            vec![],
            append_only,
            on_conflict,
            with_version_column,
            version,
        )?;

        // No source accompanies the table, hence `None` in the middle slot.
        (build_graph(table_plan)?, None, new_table)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    let writer = session.catalog_writer()?;
    writer
        .create_table(source, table, graph, TableJobType::Unspecified)
        .await?;

    // With the table created, fill it by running the original query as the
    // source of an `INSERT` into the new table.
    let populate = Statement::Insert {
        table_name,
        columns: vec![],
        source: query,
        returning: vec![],
    };
    handle_query(handler_args, populate, vec![]).await
}