use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::sync::LazyLock;
use anyhow::{anyhow, Context};
use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use rand::Rng;
use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId,
INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_connector::parser::additional_columns::{
build_additional_column_desc, get_supported_additional_columns,
};
use risingwave_connector::parser::{
fetch_json_schema_and_map_to_columns, AvroParserConfig, DebeziumAvroParserConfig,
ProtobufParserConfig, SchemaLocation, SpecificParserConfig, TimestamptzHandling,
DEBEZIUM_IGNORE_KEY,
};
use risingwave_connector::schema::schema_registry::{
name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
};
use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
use risingwave_connector::source::cdc::{
CDC_AUTO_SCHEMA_CHANGE_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY,
CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR,
MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR,
};
use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
pub use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_connector::source::{
ConnectorProperties, AZBLOB_CONNECTOR, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_sqlparser::ast::{
get_delimiter, AstString, ColumnDef, CreateSourceStatement, Encode, Format,
FormatEncodeOptions, ObjectName, ProtobufSchema, SourceWatermark, TableConstraint,
};
use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
use thiserror_ext::AsReport;
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::{CatalogError, DatabaseId, SchemaId};
use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
use crate::handler::create_table::{
bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns,
bind_sql_pk_names, bind_table_constraints, ColumnIdGenerator,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
use crate::session::SessionImpl;
use crate::utils::{
resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options, OverwriteOptions,
};
use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions, WithOptionsSecResolved};
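
/// Extract the column schema for a JSON source from an optional schema location or schema
/// registry, consuming the matching `format_encode_options` entries. Returns `None` when no
/// schema config is given, in which case the columns must come from SQL.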
async fn extract_json_table_schema(
schema_config: &Option<(AstString, bool)>,
with_properties: &BTreeMap<String, String>,
format_encode_options: &mut BTreeMap<String, String>,
) -> Result<Option<Vec<ColumnCatalog>>> {
match schema_config {
None => Ok(None),
Some((schema_location, use_schema_registry)) => {
let schema_registry_auth = use_schema_registry.then(|| {
let auth = SchemaRegistryAuth::from(&*format_encode_options);
try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
auth
});
Ok(Some(
fetch_json_schema_and_map_to_columns(
&schema_location.0,
schema_registry_auth,
with_properties,
)
.await?
.into_iter()
.map(|col| ColumnCatalog {
column_desc: col.into(),
is_hidden: false,
})
.collect_vec(),
))
}
}
}
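
/// Fixed schema used by shared Debezium CDC source jobs: a `payload` jsonb column plus the
/// connector offset and CDC table name columns.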
pub fn debezium_cdc_source_schema() -> Vec<ColumnCatalog> {
let columns = vec![
ColumnCatalog {
column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb),
is_hidden: false,
},
ColumnCatalog::offset_column(),
ColumnCatalog::cdc_table_name_column(),
];
columns
}
fn json_schema_infer_use_schema_registry(schema_config: &Option<(AstString, bool)>) -> bool {
match schema_config {
None => false,
Some((_, use_registry)) => *use_registry,
}
}
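
/// Map an Avro (or Debezium Avro) schema to RisingWave columns, resolving the schema via the
/// configured schema location or schema registry and consuming the related options.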
async fn extract_avro_table_schema(
info: &StreamSourceInfo,
with_properties: &WithOptionsSecResolved,
format_encode_options: &mut BTreeMap<String, String>,
is_debezium: bool,
) -> Result<Vec<ColumnCatalog>> {
let parser_config = SpecificParserConfig::new(info, with_properties)?;
try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
consume_aws_config_from_options(format_encode_options);
let vec_column_desc = if is_debezium {
let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
conf.map_to_columns()?
} else {
if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) =
&parser_config.encoding_config
&& matches!(avro_props.schema_location, SchemaLocation::File { .. })
&& format_encode_options
.get("with_deprecated_file_header")
.is_none_or(|v| v != "true")
{
bail_not_implemented!(issue = 12871, "avro without schema registry");
}
let conf = AvroParserConfig::new(parser_config.encoding_config).await?;
conf.map_to_columns()?
};
Ok(vec_column_desc
.into_iter()
.map(|col| ColumnCatalog {
column_desc: col.into(),
is_hidden: false,
})
.collect_vec())
}
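
/// Extract the primary key column names from a Debezium Avro key schema.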
async fn extract_debezium_avro_table_pk_columns(
info: &StreamSourceInfo,
with_properties: &WithOptionsSecResolved,
) -> Result<Vec<String>> {
let parser_config = SpecificParserConfig::new(info, with_properties)?;
let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
Ok(conf.extract_pks()?.drain(..).map(|c| c.name).collect())
}
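
/// Map a Protobuf message definition to RisingWave columns, resolving the schema via the
/// configured schema location or schema registry.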
async fn extract_protobuf_table_schema(
schema: &ProtobufSchema,
with_properties: &WithOptionsSecResolved,
format_encode_options: &mut BTreeMap<String, String>,
) -> Result<Vec<ColumnCatalog>> {
let info = StreamSourceInfo {
proto_message_name: schema.message_name.0.clone(),
row_schema_location: schema.row_schema_location.0.clone(),
use_schema_registry: schema.use_schema_registry,
format: FormatType::Plain.into(),
row_encode: EncodeType::Protobuf.into(),
format_encode_options: format_encode_options.clone(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(&info, with_properties)?;
try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
consume_aws_config_from_options(format_encode_options);
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
let column_descs = conf.map_to_columns()?;
Ok(column_descs
.into_iter()
.map(|col| ColumnCatalog {
column_desc: col.into(),
is_hidden: false,
})
.collect_vec())
}
fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
columns
.iter()
.filter(|c| !c.is_generated())
.cloned()
.collect()
}
fn try_consume_string_from_options(
format_encode_options: &mut BTreeMap<String, String>,
key: &str,
) -> Option<AstString> {
format_encode_options.remove(key).map(AstString)
}
fn consume_string_from_options(
format_encode_options: &mut BTreeMap<String, String>,
key: &str,
) -> Result<AstString> {
try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
format!("missing field {} in options", key),
)))
}
fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
format_encode_options.retain(|key, _| !key.starts_with("aws."))
}
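
/// Consume `schema.location` / `schema.registry` from the options. Returns the location and
/// whether it refers to a schema registry; specifying both is an error.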
pub fn get_json_schema_location(
format_encode_options: &mut BTreeMap<String, String>,
) -> Result<Option<(AstString, bool)>> {
let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
match (schema_location, schema_registry) {
(None, None) => Ok(None),
(None, Some(schema_registry)) => Ok(Some((schema_registry, true))),
(Some(schema_location), None) => Ok(Some((schema_location, false))),
(Some(_), Some(_)) => Err(RwError::from(ProtocolError(
"only need either the schema location or the schema registry".to_string(),
))),
}
}
fn get_schema_location(
format_encode_options: &mut BTreeMap<String, String>,
) -> Result<(AstString, bool)> {
let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
match (schema_location, schema_registry) {
(None, None) => Err(RwError::from(ProtocolError(
"missing either a schema location or a schema registry".to_string(),
))),
(None, Some(schema_registry)) => Ok((schema_registry, true)),
(Some(schema_location), None) => Ok((schema_location, false)),
(Some(_), Some(_)) => Err(RwError::from(ProtocolError(
"only need either the schema location or the schema registry".to_string(),
))),
}
}
#[inline]
fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Option<i32>> {
match name_strategy {
None => Ok(None),
Some(name) => Ok(Some(name_strategy_from_str(name.0.as_str())
.ok_or_else(|| RwError::from(ProtocolError(format!("\
expect strategy name in topic_name_strategy, record_name_strategy and topic_record_name_strategy, but got {}", name))))? as i32)),
}
}
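
/// Resolve the column schema (if any) and build the `StreamSourceInfo` from the external
/// schema according to the `FORMAT ... ENCODE ...` clause, e.g. by fetching the schema from a
/// schema registry, a schema file, or the connector itself (Iceberg).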
pub(crate) async fn bind_columns_from_source(
session: &SessionImpl,
format_encode: &FormatEncodeOptions,
with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
const MESSAGE_NAME_KEY: &str = "message";
const KEY_MESSAGE_NAME_KEY: &str = "key.message";
const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
let options_with_secret = match with_properties {
Either::Left(options) => resolve_secret_ref_in_with_options(options.clone(), session)?,
Either::Right(options_with_secret) => options_with_secret.clone(),
};
let is_kafka: bool = options_with_secret.is_kafka_connector();
let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options(
WithOptions::try_from(format_encode.row_options())?,
session,
)?
.into_parts();
let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
format_encode_options.clone(),
format_encode_secret_refs.clone(),
)?;
fn get_key_message_name(options: &mut BTreeMap<String, String>) -> Option<String> {
consume_string_from_options(options, KEY_MESSAGE_NAME_KEY)
.map(|ele| Some(ele.0))
.unwrap_or(None)
}
fn get_sr_name_strategy_check(
options: &mut BTreeMap<String, String>,
use_sr: bool,
) -> Result<Option<i32>> {
let name_strategy = get_name_strategy_or_default(try_consume_string_from_options(
options,
NAME_STRATEGY_KEY,
))?;
if !use_sr && name_strategy.is_some() {
return Err(RwError::from(ProtocolError(
"schema registry name strategy only works with schema registry enabled".to_string(),
)));
}
Ok(name_strategy)
}
let mut stream_source_info = StreamSourceInfo {
format: format_to_prost(&format_encode.format) as i32,
row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
format_encode_options,
format_encode_secret_refs,
..Default::default()
};
if format_encode.format == Format::Debezium {
try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY);
}
let columns = match (&format_encode.format, &format_encode.row_encode) {
(Format::Native, Encode::Native)
| (Format::Plain, Encode::Bytes)
| (Format::DebeziumMongo, Encode::Json) => None,
(Format::Plain, Encode::Protobuf) | (Format::Upsert, Encode::Protobuf) => {
let (row_schema_location, use_schema_registry) =
get_schema_location(&mut format_encode_options_to_consume)?;
let protobuf_schema = ProtobufSchema {
message_name: consume_string_from_options(
&mut format_encode_options_to_consume,
MESSAGE_NAME_KEY,
)?,
row_schema_location,
use_schema_registry,
};
let name_strategy = get_sr_name_strategy_check(
&mut format_encode_options_to_consume,
protobuf_schema.use_schema_registry,
)?;
stream_source_info.use_schema_registry = protobuf_schema.use_schema_registry;
stream_source_info
.row_schema_location
.clone_from(&protobuf_schema.row_schema_location.0);
stream_source_info
.proto_message_name
.clone_from(&protobuf_schema.message_name.0);
stream_source_info.key_message_name =
get_key_message_name(&mut format_encode_options_to_consume);
stream_source_info.name_strategy =
name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
Some(
extract_protobuf_table_schema(
&protobuf_schema,
&options_with_secret,
&mut format_encode_options_to_consume,
)
.await?,
)
}
(format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => {
if format_encode_options_to_consume
.remove(AWS_GLUE_SCHEMA_ARN_KEY)
.is_none()
{
let (row_schema_location, use_schema_registry) =
get_schema_location(&mut format_encode_options_to_consume)?;
if matches!(format, Format::Debezium) && !use_schema_registry {
return Err(RwError::from(ProtocolError(
"schema location for DEBEZIUM_AVRO row format is not supported".to_string(),
)));
}
let message_name = try_consume_string_from_options(
&mut format_encode_options_to_consume,
MESSAGE_NAME_KEY,
);
let name_strategy = get_sr_name_strategy_check(
&mut format_encode_options_to_consume,
use_schema_registry,
)?;
stream_source_info.use_schema_registry = use_schema_registry;
stream_source_info
.row_schema_location
.clone_from(&row_schema_location.0);
stream_source_info.proto_message_name =
message_name.unwrap_or(AstString("".into())).0;
stream_source_info.key_message_name =
get_key_message_name(&mut format_encode_options_to_consume);
stream_source_info.name_strategy =
name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
}
Some(
extract_avro_table_schema(
&stream_source_info,
&options_with_secret,
&mut format_encode_options_to_consume,
matches!(format, Format::Debezium),
)
.await?,
)
}
(Format::Plain, Encode::Csv) => {
let chars =
consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0;
let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?;
let has_header = try_consume_string_from_options(
&mut format_encode_options_to_consume,
"without_header",
)
.map(|s| s.0 == "false")
.unwrap_or(true);
if is_kafka && has_header {
return Err(RwError::from(ProtocolError(
"CSV HEADER is not supported when creating table with Kafka connector"
.to_owned(),
)));
}
stream_source_info.csv_delimiter = delimiter as i32;
stream_source_info.csv_has_header = has_header;
None
}
(Format::Plain, Encode::Parquet) => None,
(
Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium,
Encode::Json,
) => {
if matches!(
format_encode.format,
Format::Plain | Format::Upsert | Format::Debezium
) {
TimestamptzHandling::from_options(&format_encode_options_to_consume)
.map_err(|err| InvalidInputSyntax(err.message))?;
try_consume_string_from_options(
&mut format_encode_options_to_consume,
TimestamptzHandling::OPTION_KEY,
);
}
let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
stream_source_info.use_schema_registry =
json_schema_infer_use_schema_registry(&schema_config);
extract_json_table_schema(
&schema_config,
&options_with_secret,
&mut format_encode_options_to_consume,
)
.await?
}
(Format::None, Encode::None) => {
if options_with_secret.is_iceberg_connector() {
Some(
extract_iceberg_columns(&options_with_secret)
.await
.map_err(|err| ProtocolError(err.to_report_string()))?,
)
} else {
None
}
}
(format, encoding) => {
return Err(RwError::from(ProtocolError(format!(
"Unknown combination {:?} {:?}",
format, encoding
))));
}
};
if !format_encode_options_to_consume.is_empty() {
let err_string = format!(
"Get unknown format_encode_options for {:?} {:?}: {}",
format_encode.format,
format_encode.row_encode,
format_encode_options_to_consume
.keys()
.map(|k| k.to_string())
.collect::<Vec<String>>()
.join(","),
);
session.notice_to_user(err_string);
}
Ok((columns, stream_source_info))
}
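
/// Build the fixed Debezium CDC schema and `StreamSourceInfo` for a shared CDC source job.
/// Only `FORMAT PLAIN ENCODE JSON` (or omitting the clause) is accepted.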
fn bind_columns_from_source_for_cdc(
session: &SessionImpl,
format_encode: &FormatEncodeOptions,
) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options(
WithOptions::try_from(format_encode.row_options())?,
session,
)?
.into_parts();
let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
format_encode_options.clone(),
format_encode_secret_refs.clone(),
)?;
match (&format_encode.format, &format_encode.row_encode) {
(Format::Plain, Encode::Json) => (),
(format, encoding) => {
return Err(RwError::from(ProtocolError(format!(
"Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}",
format, encoding
))));
}
};
let columns = debezium_cdc_source_schema();
let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
let stream_source_info = StreamSourceInfo {
format: format_to_prost(&format_encode.format) as i32,
row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
format_encode_options,
use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
cdc_source_job: true,
is_distributed: false,
format_encode_secret_refs,
..Default::default()
};
if !format_encode_options_to_consume.is_empty() {
let err_string = format!(
"Get unknown format_encode_options for {:?} {:?}: {}",
format_encode.format,
format_encode.row_encode,
format_encode_options_to_consume
.keys()
.map(|k| k.to_string())
.collect::<Vec<String>>()
.join(","),
);
session.notice_to_user(err_string);
}
Ok((Some(columns), stream_source_info))
}
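
/// Validate an `INCLUDE ...` item: only `header` columns may carry an inner field, and
/// `INCLUDE payload` requires `ENCODE JSON`.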
fn check_additional_column_compatibility(
column_def: &IncludeOptionItem,
format_encode: Option<&FormatEncodeOptions>,
) -> Result<()> {
if column_def.inner_field.is_some()
&& !column_def
.column_type
.real_value()
.eq_ignore_ascii_case("header")
{
return Err(RwError::from(ProtocolError(format!(
"Only header column can have inner field, but got {:?}",
column_def.column_type.real_value(),
))));
}
if let Some(schema) = format_encode
&& column_def
.column_type
.real_value()
.eq_ignore_ascii_case("payload")
&& !matches!(schema.row_encode, Encode::Json)
{
return Err(RwError::from(ProtocolError(format!(
"INCLUDE payload is only allowed when using ENCODE JSON, but got ENCODE {:?}",
schema.row_encode
))));
}
Ok(())
}
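
/// Append the additional columns requested via `INCLUDE <type> [AS <alias>]` clauses
/// (e.g. `INCLUDE KEY AS _rw_kafka_key`) to the column catalog, after checking that the
/// connector supports them.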
pub fn handle_addition_columns(
format_encode: Option<&FormatEncodeOptions>,
with_properties: &BTreeMap<String, String>,
mut additional_columns: IncludeOption,
columns: &mut Vec<ColumnCatalog>,
is_cdc_backfill_table: bool,
) -> Result<()> {
let connector_name = with_properties.get_connector().unwrap();
if get_supported_additional_columns(connector_name.as_str(), is_cdc_backfill_table).is_none()
&& !additional_columns.is_empty()
{
return Err(RwError::from(ProtocolError(format!(
"Connector {} accepts no additional column but got {:?}",
connector_name, additional_columns
))));
}
while let Some(item) = additional_columns.pop() {
check_additional_column_compatibility(&item, format_encode)?;
let data_type = item
.header_inner_expect_type
.map(|dt| bind_data_type(&dt))
.transpose()?;
if let Some(dt) = &data_type
&& !matches!(dt, DataType::Bytea | DataType::Varchar)
{
return Err(
ErrorCode::BindError(format!("invalid additional column data type: {dt}")).into(),
);
}
let col = build_additional_column_desc(
ColumnId::placeholder(),
connector_name.as_str(),
item.column_type.real_value().as_str(),
item.column_alias.map(|alias| alias.real_value()),
item.inner_field.as_deref(),
data_type.as_ref(),
true,
is_cdc_backfill_table,
)?;
columns.push(ColumnCatalog::visible(col));
}
Ok(())
}
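
/// Merge the columns resolved from the external schema (if any) with the columns defined in
/// SQL, expanding a `*` wildcard in place; otherwise validate the SQL-defined columns for
/// formats with fixed schemas (e.g. DEBEZIUM_MONGO JSON, ENCODE BYTES).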
pub(crate) fn bind_all_columns(
format_encode: &FormatEncodeOptions,
cols_from_source: Option<Vec<ColumnCatalog>>,
cols_from_sql: Vec<ColumnCatalog>,
col_defs_from_sql: &[ColumnDef],
wildcard_idx: Option<usize>,
) -> Result<Vec<ColumnCatalog>> {
if let Some(cols_from_source) = cols_from_source {
if cols_from_sql.is_empty() {
Ok(cols_from_source)
} else if let Some(wildcard_idx) = wildcard_idx {
if col_defs_from_sql.iter().any(|c| !c.is_generated()) {
Err(RwError::from(NotSupported(
"Only generated columns are allowed in user-defined schema from SQL"
.to_string(),
"Remove the non-generated columns".to_string(),
)))
} else {
let mut cols_from_sql = cols_from_sql;
let mut cols_from_source = cols_from_source;
let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx);
cols_from_sql.append(&mut cols_from_source);
cols_from_sql.append(&mut cols_from_sql_r);
Ok(cols_from_sql)
}
} else {
Err(RwError::from(ProtocolError(
format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \
Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", format_encode.format, format_encode.row_encode))))
}
} else {
if wildcard_idx.is_some() {
return Err(RwError::from(NotSupported(
"Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_string(),
"Remove the wildcard or use a source with external schema".to_string(),
)));
}
let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
match (&format_encode.format, &format_encode.row_encode) {
(Format::DebeziumMongo, Encode::Json) => {
let mut columns = vec![
ColumnCatalog {
column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
is_hidden: false,
},
ColumnCatalog {
column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
is_hidden: false,
},
];
if non_generated_sql_defined_columns.len() != 2
|| non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
|| non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
{
return Err(RwError::from(ProtocolError(
"the not generated columns of the source with row format DebeziumMongoJson
must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
.to_string(),
)));
}
let key_data_type = bind_data_type(
non_generated_sql_defined_columns[0]
.data_type
.as_ref()
.unwrap(),
)?;
match key_data_type {
DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
columns[0].column_desc.data_type = key_data_type.clone();
}
_ => {
return Err(RwError::from(ProtocolError(
"the `_id` column of the source with row format DebeziumMongoJson
must be [Jsonb | Varchar | Int32 | Int64]"
.to_string(),
)));
}
}
let value_data_type = bind_data_type(
non_generated_sql_defined_columns[1]
.data_type
.as_ref()
.unwrap(),
)?;
if !matches!(value_data_type, DataType::Jsonb) {
return Err(RwError::from(ProtocolError(
"the `payload` column of the source with row format DebeziumMongoJson
must be Jsonb datatype"
.to_string(),
)));
}
Ok(columns)
}
(Format::Plain, Encode::Bytes) => {
let err = Err(RwError::from(ProtocolError(
"ENCODE BYTES only accepts one BYTEA type column".to_string(),
)));
if non_generated_sql_defined_columns.len() == 1 {
let col_data_type = bind_data_type(
non_generated_sql_defined_columns[0]
.data_type
.as_ref()
.unwrap(),
)?;
if col_data_type == DataType::Bytea {
Ok(cols_from_sql)
} else {
err
}
} else {
err
}
}
(_, _) => Ok(cols_from_sql),
}
}
}
fn hint_upsert(encode: &Encode) -> String {
format!(
r#"Hint: For FORMAT UPSERT ENCODE {encode:}, INCLUDE KEY must be specified and the key column must be used as primary key.
example:
CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
INCLUDE KEY [AS <key_name>]
WITH (...)
FORMAT UPSERT ENCODE {encode:} (...)
"#
)
}
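
/// Determine and validate the primary key columns for the given FORMAT/ENCODE combination,
/// e.g. UPSERT requires the INCLUDE KEY column as the sole primary key, while DEBEZIUM AVRO
/// can derive the primary key from the key schema.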
pub(crate) async fn bind_source_pk(
format_encode: &FormatEncodeOptions,
source_info: &StreamSourceInfo,
columns: &mut [ColumnCatalog],
sql_defined_pk_names: Vec<String>,
with_properties: &WithOptionsSecResolved,
) -> Result<Vec<String>> {
let sql_defined_pk = !sql_defined_pk_names.is_empty();
let include_key_column_name: Option<String> = {
columns.iter().find_map(|catalog| {
if matches!(
catalog.column_desc.additional_column.column_type,
Some(AdditionalColumnType::Key(_))
) {
Some(catalog.name().to_string())
} else {
None
}
})
};
let additional_column_names = columns
.iter()
.filter_map(|col| {
if col.column_desc.additional_column.column_type.is_some() {
Some(col.name().to_string())
} else {
None
}
})
.collect_vec();
let res = match (&format_encode.format, &format_encode.row_encode) {
(Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
sql_defined_pk_names
}
(
Format::Upsert,
encode @ Encode::Json | encode @ Encode::Avro | encode @ Encode::Protobuf,
) => {
if let Some(ref key_column_name) = include_key_column_name
&& sql_defined_pk
{
if sql_defined_pk_names.len() != 1
|| !key_column_name.eq(sql_defined_pk_names[0].as_str())
{
return Err(RwError::from(ProtocolError(format!(
"Only \"{}\" can be used as primary key\n\n{}",
key_column_name,
hint_upsert(encode)
))));
}
sql_defined_pk_names
} else {
return if let Some(include_key_column_name) = include_key_column_name {
Err(RwError::from(ProtocolError(format!(
"Primary key must be specified to {}\n\n{}",
include_key_column_name,
hint_upsert(encode)
))))
} else {
Err(RwError::from(ProtocolError(format!(
"INCLUDE KEY clause not set\n\n{}",
hint_upsert(encode)
))))
};
}
}
(Format::Debezium, Encode::Json) => {
if !additional_column_names.is_empty() {
return Err(RwError::from(ProtocolError(format!(
"FORMAT DEBEZIUM forbids additional columns, but got {:?}",
additional_column_names
))));
}
if !sql_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with FORMAT DEBEZIUM."
.to_string(),
)));
}
sql_defined_pk_names
}
(Format::Debezium, Encode::Avro) => {
if !additional_column_names.is_empty() {
return Err(RwError::from(ProtocolError(format!(
"FORMAT DEBEZIUM forbids additional columns, but got {:?}",
additional_column_names
))));
}
if sql_defined_pk {
sql_defined_pk_names
} else {
let pk_names =
extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
for pk_name in &pk_names {
columns
.iter()
.find(|c: &&ColumnCatalog| c.name().eq(pk_name))
.ok_or_else(|| {
RwError::from(ProtocolError(format!(
"avro's key column {} not exists in avro's row schema",
pk_name
)))
})?;
}
pk_names
}
}
(Format::DebeziumMongo, Encode::Json) => {
if sql_defined_pk {
sql_defined_pk_names
} else {
vec!["_id".to_string()]
}
}
(Format::Maxwell, Encode::Json) => {
if !additional_column_names.is_empty() {
return Err(RwError::from(ProtocolError(format!(
"FORMAT MAXWELL forbids additional columns, but got {:?}",
additional_column_names
))));
}
if !sql_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON."
.to_string(),
)));
}
sql_defined_pk_names
}
(Format::Canal, Encode::Json) => {
if !additional_column_names.is_empty() {
return Err(RwError::from(ProtocolError(format!(
"FORMAT CANAL forbids additional columns, but got {:?}",
additional_column_names
))));
}
if !sql_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON."
.to_string(),
)));
}
sql_defined_pk_names
}
(format, encoding) => {
return Err(RwError::from(ProtocolError(format!(
"Unknown combination {:?} {:?}",
format, encoding
))));
}
};
Ok(res)
}
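
/// For Kafka sources, add a hidden `_rw_kafka_timestamp` column unless a timestamp
/// additional column was already included.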
fn check_and_add_timestamp_column(with_properties: &WithOptions, columns: &mut Vec<ColumnCatalog>) {
if with_properties.is_kafka_connector() {
if columns.iter().any(|col| {
matches!(
col.column_desc.additional_column.column_type,
Some(AdditionalColumnType::Timestamp(_))
)
}) {
return;
}
let col = build_additional_column_desc(
ColumnId::placeholder(),
KAFKA_CONNECTOR,
"timestamp",
Some(KAFKA_TIMESTAMP_COLUMN_NAME.to_string()),
None,
None,
true,
false,
)
.unwrap();
columns.push(ColumnCatalog::hidden(col));
}
}
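
/// Bind `WATERMARK FOR <col> AS <expr>` clauses against the column catalog, checking that the
/// expression's return type matches the watermark column's data type.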
pub(super) fn bind_source_watermark(
session: &SessionImpl,
name: String,
source_watermarks: Vec<SourceWatermark>,
column_catalogs: &[ColumnCatalog],
) -> Result<Vec<WatermarkDesc>> {
let mut binder = Binder::new_for_ddl(session);
binder.bind_columns_to_context(name.clone(), column_catalogs)?;
let watermark_descs = source_watermarks
.into_iter()
.map(|source_watermark| {
let col_name = source_watermark.column.real_value();
let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;
let expr = binder.bind_expr(source_watermark.expr)?;
let watermark_col_type = column_catalogs[watermark_idx].data_type();
let watermark_expr_type = &expr.return_type();
if watermark_col_type != watermark_expr_type {
Err(RwError::from(ErrorCode::BindError(
format!("The return value type of the watermark expression must be identical to the watermark column data type. Current data type of watermark return value: `{}`, column `{}`",watermark_expr_type, watermark_col_type),
)))
} else {
let expr_proto = expr.to_expr_proto();
Ok::<_, RwError>(WatermarkDesc {
watermark_idx: watermark_idx as u32,
expr: Some(expr_proto),
})
}
})
.try_collect()?;
Ok(watermark_descs)
}
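
/// Supported `FORMAT ... ENCODE ...` combinations for each connector.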
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
LazyLock::new(|| {
convert_args!(hashmap!(
KAFKA_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
Format::Debezium => vec![Encode::Json, Encode::Avro],
Format::Maxwell => vec![Encode::Json],
Format::Canal => vec![Encode::Json],
Format::DebeziumMongo => vec![Encode::Json],
),
PULSAR_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Debezium => vec![Encode::Json],
Format::Maxwell => vec![Encode::Json],
Format::Canal => vec![Encode::Json],
),
KINESIS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Debezium => vec![Encode::Json],
Format::Maxwell => vec![Encode::Json],
Format::Canal => vec![Encode::Json],
),
GOOGLE_PUBSUB_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
Format::Debezium => vec![Encode::Json],
Format::Maxwell => vec![Encode::Json],
Format::Canal => vec![Encode::Json],
),
NEXMARK_CONNECTOR => hashmap!(
Format::Native => vec![Encode::Native],
Format::Plain => vec![Encode::Bytes],
),
DATAGEN_CONNECTOR => hashmap!(
Format::Native => vec![Encode::Native],
Format::Plain => vec![Encode::Bytes, Encode::Json],
),
S3_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json],
),
OPENDAL_S3_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
),
GCS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
),
AZBLOB_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
),
POSIX_FS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv],
),
MYSQL_CDC_CONNECTOR => hashmap!(
Format::Debezium => vec![Encode::Json],
Format::Plain => vec![Encode::Json],
),
POSTGRES_CDC_CONNECTOR => hashmap!(
Format::Debezium => vec![Encode::Json],
Format::Plain => vec![Encode::Json],
),
CITUS_CDC_CONNECTOR => hashmap!(
Format::Debezium => vec![Encode::Json],
),
MONGODB_CDC_CONNECTOR => hashmap!(
Format::DebeziumMongo => vec![Encode::Json],
),
NATS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
),
MQTT_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Bytes],
),
TEST_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json],
),
ICEBERG_CONNECTOR => hashmap!(
Format::None => vec![Encode::None],
),
SQL_SERVER_CDC_CONNECTOR => hashmap!(
Format::Debezium => vec![Encode::Json],
Format::Plain => vec![Encode::Json],
),
))
});
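
/// Check that the license covers the given connector (currently only SQL Server CDC is
/// feature-gated).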
pub fn validate_license(connector: &str) -> Result<()> {
if connector == SQL_SERVER_CDC_CONNECTOR {
Feature::SqlServerCdcSource
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
}
Ok(())
}
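
/// Check that the connector supports the requested FORMAT/ENCODE combination and normalize
/// connector-specific properties in place (e.g. the S3 connector alias and CDC defaults such
/// as `slot.name`, `schema.name`, and `publication.name`).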
pub fn validate_compatibility(
format_encode: &FormatEncodeOptions,
props: &mut BTreeMap<String, String>,
) -> Result<()> {
let mut connector = props
.get_connector()
.ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?;
if connector == OPENDAL_S3_CONNECTOR {
return Err(RwError::from(Deprecated(
OPENDAL_S3_CONNECTOR.to_string(),
S3_CONNECTOR.to_string(),
)));
}
if connector == S3_CONNECTOR {
let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
*entry = OPENDAL_S3_CONNECTOR.to_string();
connector = OPENDAL_S3_CONNECTOR.to_string();
}
let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
.get(&connector)
.ok_or_else(|| {
RwError::from(ProtocolError(format!(
"connector {:?} is not supported, accept {:?}",
connector,
CONNECTORS_COMPATIBLE_FORMATS.keys()
)))
})?;
validate_license(&connector)?;
if connector != KAFKA_CONNECTOR {
let res = match (&format_encode.format, &format_encode.row_encode) {
(Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
let mut options = WithOptions::try_from(format_encode.row_options())?;
let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
use_schema_registry
}
(Format::Debezium, Encode::Avro) => true,
(_, _) => false,
};
if res {
return Err(RwError::from(ProtocolError(format!(
"The {} must be kafka when schema registry is used",
UPSTREAM_SOURCE_KEY
))));
}
}
let compatible_encodes = compatible_formats
.get(&format_encode.format)
.ok_or_else(|| {
RwError::from(ProtocolError(format!(
"connector {} does not support format {:?}",
connector, format_encode.format
)))
})?;
if !compatible_encodes.contains(&format_encode.row_encode) {
return Err(RwError::from(ProtocolError(format!(
"connector {} does not support format {:?} with encode {:?}",
connector, format_encode.format, format_encode.row_encode
))));
}
if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
match props.get("slot.name") {
None => {
let uuid = uuid::Uuid::new_v4();
props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
}
Some(slot_name) => {
if !slot_name
.chars()
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
|| slot_name.len() > 63
{
return Err(RwError::from(ProtocolError(format!(
"Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
slot_name
))));
}
}
}
if !props.contains_key("schema.name") {
props.insert("schema.name".into(), "public".into());
}
if !props.contains_key("publication.name") {
props.insert("publication.name".into(), "rw_publication".into());
}
if !props.contains_key("publication.create.enable") {
props.insert("publication.create.enable".into(), "true".into());
}
}
if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
props.insert("schema.name".into(), "dbo".into());
}
Ok(())
}
pub(super) async fn check_format_encode(
props: &WithOptionsSecResolved,
row_id_index: Option<usize>,
columns: &[ColumnCatalog],
) -> Result<()> {
let Some(connector) = props.get_connector() else {
return Ok(());
};
if connector == NEXMARK_CONNECTOR {
check_nexmark_schema(props, row_id_index, columns)
} else if connector == ICEBERG_CONNECTOR {
Ok(check_iceberg_source(props, columns)
.await
.map_err(|err| ProtocolError(err.to_report_string()))?)
} else {
Ok(())
}
}
pub(super) fn check_nexmark_schema(
props: &WithOptionsSecResolved,
row_id_index: Option<usize>,
columns: &[ColumnCatalog],
) -> Result<()> {
let table_type = props
.get("nexmark.table.type")
.map(|t| t.to_ascii_lowercase());
let event_type = match table_type.as_deref() {
None => None,
Some("bid") => Some(EventType::Bid),
Some("auction") => Some(EventType::Auction),
Some("person") => Some(EventType::Person),
Some(t) => {
return Err(RwError::from(ProtocolError(format!(
"unsupported table type for nexmark source: {}",
t
))))
}
};
let user_defined_columns = columns.iter().filter(|c| !c.is_generated());
let row_id_index = if let Some(index) = row_id_index {
let col_id = columns[index].column_id();
user_defined_columns
.clone()
.position(|c| c.column_id() == col_id)
.unwrap()
.into()
} else {
None
};
let expected = get_event_data_types_with_names(event_type, row_id_index);
let user_defined = user_defined_columns
.map(|c| {
(
c.column_desc.name.to_ascii_lowercase(),
c.column_desc.data_type.to_owned(),
)
})
.collect_vec();
if expected != user_defined {
let cmp = pretty_assertions::Comparison::new(&expected, &user_defined);
return Err(RwError::from(ProtocolError(format!(
"The schema of the nexmark source must specify all columns in order:\n{cmp}",
))));
}
Ok(())
}
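
/// Derive the column catalog from the Iceberg table schema of an Iceberg source.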
pub async fn extract_iceberg_columns(
with_properties: &WithOptionsSecResolved,
) -> anyhow::Result<Vec<ColumnCatalog>> {
let props = ConnectorProperties::extract(with_properties.clone(), true)?;
if let ConnectorProperties::Iceberg(properties) = props {
let table = properties.load_table_v2().await?;
let iceberg_schema: arrow_schema_iceberg::Schema =
iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;
let columns = iceberg_schema
.fields()
.iter()
.enumerate()
.map(|(i, field)| {
let column_desc = ColumnDesc::named(
field.name(),
ColumnId::new((i + 1).try_into().unwrap()),
IcebergArrowConvert.type_from_field(field).unwrap(),
);
ColumnCatalog {
column_desc,
is_hidden: false,
}
})
.collect();
Ok(columns)
} else {
Err(anyhow!(format!(
"Invalid properties for iceberg source: {:?}",
props
)))
}
}
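
/// Check that every user-defined column exists in the Iceberg table and that the column types
/// are compatible with the corresponding Arrow schema.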
pub async fn check_iceberg_source(
props: &WithOptionsSecResolved,
columns: &[ColumnCatalog],
) -> anyhow::Result<()> {
let props = ConnectorProperties::extract(props.clone(), true)?;
let ConnectorProperties::Iceberg(properties) = props else {
return Err(anyhow!(format!(
"Invalid properties for iceberg source: {:?}",
props
)));
};
let schema = Schema {
fields: columns
.iter()
.cloned()
.map(|c| c.column_desc.into())
.collect(),
};
let table = properties.load_table_v2().await?;
let iceberg_schema = iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;
for f1 in schema.fields() {
if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) {
return Err(anyhow::anyhow!(format!(
"Column {} not found in iceberg table",
f1.name
)));
}
}
let new_iceberg_field = iceberg_schema
.fields
.iter()
.filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name))
.cloned()
.collect::<Vec<_>>();
let new_iceberg_schema = arrow_schema_iceberg::Schema::new(new_iceberg_field);
risingwave_connector::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?;
Ok(())
}
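
/// Validate the WITH options against the FORMAT/ENCODE clause and fill in connector defaults,
/// e.g. CDC sharing/snapshot options for shared CDC source jobs and a random `server.id` for
/// MySQL CDC.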
pub fn bind_connector_props(
handler_args: &HandlerArgs,
format_encode: &FormatEncodeOptions,
is_create_source: bool,
) -> Result<WithOptions> {
let mut with_properties = handler_args.with_options.clone().into_connector_props();
validate_compatibility(format_encode, &mut with_properties)?;
let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
return Err(RwError::from(ProtocolError(format!(
"connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
with_properties.get_connector().unwrap(),
))));
}
if is_create_source && create_cdc_source_job {
if let Some(value) = with_properties.get(CDC_AUTO_SCHEMA_CHANGE_KEY)
&& value
.parse::<bool>()
.map_err(|_| anyhow!("invalid value of '{}' option", CDC_AUTO_SCHEMA_CHANGE_KEY))?
{
Feature::CdcAutoSchemaChange
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
}
with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
if with_properties.enable_transaction_metadata() {
with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
}
with_properties.insert(
CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
handler_args
.session
.config()
.cdc_source_wait_streaming_start_timeout()
.to_string(),
);
}
if with_properties.is_mysql_cdc_connector() {
with_properties
.entry("server.id".to_string())
.or_insert(rand::thread_rng().gen_range(1..u32::MAX).to_string());
}
Ok(with_properties)
}
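
/// Bind the common parts of `CREATE SOURCE` and `CREATE TABLE ... WITH (connector = ...)`:
/// resolve columns, additional (INCLUDE) columns, primary key, watermarks, and secret
/// references, and assemble the `SourceCatalog`.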
#[allow(clippy::too_many_arguments)]
pub async fn bind_create_source_or_table_with_connector(
handler_args: HandlerArgs,
full_name: ObjectName,
format_encode: FormatEncodeOptions,
with_properties: WithOptions,
sql_columns_defs: &[ColumnDef],
constraints: Vec<TableConstraint>,
wildcard_idx: Option<usize>,
source_watermarks: Vec<SourceWatermark>,
columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
source_info: StreamSourceInfo,
include_column_options: IncludeOption,
col_id_gen: &mut ColumnIdGenerator,
is_create_source: bool,
source_rate_limit: Option<u32>,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
let db_name: &str = session.database();
let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, full_name)?;
let (database_id, schema_id) =
session.get_database_and_schema_id_for_create(schema_name.clone())?;
if !is_create_source && with_properties.is_iceberg_connector() {
return Err(ErrorCode::BindError(
"can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead"
.to_string(),
)
.into());
}
if is_create_source && format_encode.format == Format::Upsert {
return Err(ErrorCode::BindError(format!(
"can't CREATE SOURCE with FORMAT UPSERT\n\nHint: use CREATE TABLE instead\n\n{}",
hint_upsert(&format_encode.row_encode)
))
.into());
}
let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;
let columns_from_sql = bind_sql_columns(sql_columns_defs)?;
let mut columns = bind_all_columns(
&format_encode,
columns_from_resolve_source,
columns_from_sql,
sql_columns_defs,
wildcard_idx,
)?;
handle_addition_columns(
Some(&format_encode),
&with_properties,
include_column_options,
&mut columns,
false,
)?;
if columns.is_empty() {
return Err(RwError::from(ProtocolError(
"Schema definition is required, either from SQL or schema registry.".to_string(),
)));
}
if is_create_source {
check_and_add_timestamp_column(&with_properties, &mut columns);
}
let mut with_properties = with_properties;
resolve_privatelink_in_with_option(&mut with_properties)?;
let with_properties = resolve_secret_ref_in_with_options(with_properties, session)?;
let pk_names = bind_source_pk(
&format_encode,
&source_info,
&mut columns,
sql_pk_names,
&with_properties,
)
.await?;
if is_create_source && !pk_names.is_empty() {
return Err(ErrorCode::InvalidInputSyntax(
"Source does not support PRIMARY KEY constraint, please use \"CREATE TABLE\" instead"
.to_owned(),
)
.into());
}
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}
debug_assert_column_ids_distinct(&columns);
let must_need_pk = if is_create_source {
with_properties.connector_need_pk()
} else {
assert!(with_properties.connector_need_pk());
true
};
let (mut columns, pk_col_ids, row_id_index) =
bind_pk_and_row_id_on_relation(columns, pk_names, must_need_pk)?;
let watermark_descs =
bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
assert!(watermark_descs.len() <= 1);
bind_sql_column_constraints(
session,
source_name.clone(),
&mut columns,
sql_columns_defs.to_vec(),
&pk_col_ids,
)?;
check_format_encode(&with_properties, row_id_index, &columns).await?;
let definition = handler_args.normalized_sql.clone();
let associated_table_id = if is_create_source {
None
} else {
Some(TableId::placeholder())
};
let source = SourceCatalog {
id: TableId::placeholder().table_id,
name: source_name,
columns,
pk_col_ids,
append_only: row_id_index.is_some(),
owner: session.user_id(),
info: source_info,
row_id_index,
with_properties,
watermark_descs,
associated_table_id,
definition,
connection_id: None,
created_at_epoch: None,
initialized_at_epoch: None,
version: INITIAL_SOURCE_VERSION_ID,
created_at_cluster_version: None,
initialized_at_cluster_version: None,
rate_limit: source_rate_limit,
};
Ok((source, database_id, schema_id))
}
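
/// Handle `CREATE SOURCE`: bind the statement, build the source catalog, and either register a
/// temporary source in the session or persist it via the catalog writer (with a streaming
/// fragment graph for shared sources).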
pub async fn handle_create_source(
mut handler_args: HandlerArgs,
stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();
let overwrite_options = OverwriteOptions::new(&mut handler_args);
if let Either::Right(resp) = session.check_relation_name_duplicated(
stmt.source_name.clone(),
StatementType::CREATE_SOURCE,
stmt.if_not_exists,
)? {
return Ok(resp);
}
if handler_args.with_options.is_empty() {
return Err(RwError::from(InvalidInputSyntax(
"missing WITH clause".to_string(),
)));
}
let format_encode = stmt.format_encode.into_v2_with_warning();
let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;
let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_shareable_non_cdc_connector()
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source());
let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &format_encode)?
} else {
bind_columns_from_source(&session, &format_encode, Either::Left(&with_properties)).await?
};
if is_shared {
source_info.cdc_source_job = true;
source_info.is_distributed = !create_cdc_source_job;
}
let mut col_id_gen = ColumnIdGenerator::new_initial();
let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector(
handler_args.clone(),
stmt.source_name,
format_encode,
with_properties,
&stmt.columns,
stmt.constraints,
stmt.wildcard_idx,
stmt.source_watermarks,
columns_from_resolve_source,
source_info,
stmt.include_column_options,
&mut col_id_gen,
true,
overwrite_options.source_rate_limit,
)
.await?;
if stmt.temporary {
if session.get_temporary_source(&source_catalog.name).is_some() {
return Err(CatalogError::Duplicated("source", source_catalog.name.clone()).into());
}
session.create_temporary_source(source_catalog);
return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
}
let source = source_catalog.to_prost(schema_id, database_id);
let catalog_writer = session.catalog_writer()?;
if is_shared {
let graph = {
let context = OptimizerContext::from_handler_args(handler_args);
let source_node = LogicalSource::with_catalog(
Rc::new(source_catalog),
SourceNodeKind::CreateSharedSource,
context.into(),
None,
)?;
let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
build_graph(stream_plan)?
};
catalog_writer.create_source(source, Some(graph)).await?;
} else {
catalog_writer.create_source(source, None).await?;
}
Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
}
fn format_to_prost(format: &Format) -> FormatType {
match format {
Format::Native => FormatType::Native,
Format::Plain => FormatType::Plain,
Format::Upsert => FormatType::Upsert,
Format::Debezium => FormatType::Debezium,
Format::DebeziumMongo => FormatType::DebeziumMongo,
Format::Maxwell => FormatType::Maxwell,
Format::Canal => FormatType::Canal,
Format::None => FormatType::None,
}
}
fn row_encode_to_prost(row_encode: &Encode) -> EncodeType {
match row_encode {
Encode::Native => EncodeType::Native,
Encode::Json => EncodeType::Json,
Encode::Avro => EncodeType::Avro,
Encode::Protobuf => EncodeType::Protobuf,
Encode::Csv => EncodeType::Csv,
Encode::Bytes => EncodeType::Bytes,
Encode::Template => EncodeType::Template,
Encode::Parquet => EncodeType::Parquet,
Encode::None => EncodeType::None,
Encode::Text => EncodeType::Text,
}
}
#[cfg(test)]
pub mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use risingwave_common::catalog::{
CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME,
ROWID_PREFIX, TABLE_NAME_COLUMN_NAME,
};
use risingwave_common::types::DataType;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::handler::create_source::debezium_cdc_source_schema;
use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA};
const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
|catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
catalog
.columns
.iter()
.map(|col| (col.name(), col.data_type().clone()))
.collect::<HashMap<&str, DataType>>()
};
#[tokio::test]
async fn test_create_source_handler() {
let proto_file = create_proto_file(PROTO_FILE_DATA);
let sql = format!(
r#"CREATE SOURCE t
WITH (connector = 'kinesis')
FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
proto_file.path().to_str().unwrap()
);
let frontend = LocalFrontend::new(Default::default()).await;
frontend.run_sql(sql).await.unwrap();
let session = frontend.session_ref();
let catalog_reader = session.env().catalog_reader().read_guard();
let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
let (source, _) = catalog_reader
.get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
.unwrap();
assert_eq!(source.name, "t");
let columns = GET_COLUMN_FROM_CATALOG(source);
let city_type = DataType::new_struct(
vec![DataType::Varchar, DataType::Varchar],
vec!["address".to_string(), "zipcode".to_string()],
);
let expected_columns = maplit::hashmap! {
ROWID_PREFIX => DataType::Serial,
"id" => DataType::Int32,
"zipcode" => DataType::Int64,
"rate" => DataType::Float32,
"country" => DataType::new_struct(
vec![DataType::Varchar,city_type,DataType::Varchar],
vec!["address".to_string(), "city".to_string(), "zipcode".to_string()],
),
};
assert_eq!(columns, expected_columns);
}
#[tokio::test]
async fn test_duplicate_props_options() {
let proto_file = create_proto_file(PROTO_FILE_DATA);
let sql = format!(
r#"CREATE SOURCE t
WITH (
connector = 'kinesis',
aws.region='user_test_topic',
endpoint='172.10.1.1:9090,172.10.1.2:9090',
aws.credentials.access_key_id = 'your_access_key_1',
aws.credentials.secret_access_key = 'your_secret_key_1'
)
FORMAT PLAIN ENCODE PROTOBUF (
message = '.test.TestRecord',
aws.credentials.access_key_id = 'your_access_key_2',
aws.credentials.secret_access_key = 'your_secret_key_2',
schema.location = 'file://{}',
)"#,
proto_file.path().to_str().unwrap()
);
let frontend = LocalFrontend::new(Default::default()).await;
frontend.run_sql(sql).await.unwrap();
let session = frontend.session_ref();
let catalog_reader = session.env().catalog_reader().read_guard();
let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
let (source, _) = catalog_reader
.get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
.unwrap();
assert_eq!(source.name, "t");
assert_eq!(
source
.info
.format_encode_options
.get("aws.credentials.access_key_id")
.unwrap(),
"your_access_key_2"
);
assert_eq!(
source
.info
.format_encode_options
.get("aws.credentials.secret_access_key")
.unwrap(),
"your_secret_key_2"
);
assert_eq!(
source
.with_properties
.get("aws.credentials.access_key_id")
.unwrap(),
"your_access_key_1"
);
assert_eq!(
source
.with_properties
.get("aws.credentials.secret_access_key")
.unwrap(),
"your_secret_key_1"
);
assert!(!source.with_properties.contains_key("schema.location"));
}
#[tokio::test]
async fn test_multi_table_cdc_create_source_handler() {
let sql =
"CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_string();
let frontend = LocalFrontend::new(Default::default()).await;
let session = frontend.session_ref();
frontend
.run_sql_with_session(session.clone(), sql)
.await
.unwrap();
let catalog_reader = session.env().catalog_reader().read_guard();
let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
let (source, _) = catalog_reader
.get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
.unwrap();
assert_eq!(source.name, "t2");
let columns = source
.columns
.iter()
.map(|col| (col.name(), col.data_type().clone()))
.collect::<HashMap<&str, DataType>>();
let expected_columns = maplit::hashmap! {
ROWID_PREFIX => DataType::Serial,
"payload" => DataType::Jsonb,
OFFSET_COLUMN_NAME => DataType::Varchar,
TABLE_NAME_COLUMN_NAME => DataType::Varchar,
};
assert_eq!(columns, expected_columns);
}
#[tokio::test]
async fn test_cdc_source_job_schema() {
let columns = debezium_cdc_source_schema();
assert_eq!(CDC_SOURCE_COLUMN_NUM, columns.len() as u32);
}
#[tokio::test]
async fn test_source_addition_columns() {
let sql =
"CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json"
.to_string();
let frontend = LocalFrontend::new(Default::default()).await;
frontend.run_sql(sql).await.unwrap();
let session = frontend.session_ref();
let catalog_reader = session.env().catalog_reader().read_guard();
let (source, _) = catalog_reader
.get_source_by_name(
DEFAULT_DATABASE_NAME,
SchemaPath::Name(DEFAULT_SCHEMA_NAME),
"s",
)
.unwrap();
assert_eq!(source.name, "s");
let columns = GET_COLUMN_FROM_CATALOG(source);
let expect_columns = maplit::hashmap! {
ROWID_PREFIX => DataType::Serial,
"v1" => DataType::Int32,
"_rw_kafka_key" => DataType::Bytea,
"_rw_kafka_timestamp" => DataType::Timestamptz,
};
assert_eq!(columns, expect_columns);
let sql =
"CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json"
.to_string();
match frontend.run_sql(sql).await {
Err(e) => {
assert_eq!(
e.to_string(),
"Protocol error: Only header column can have inner field, but got \"timestamp\""
)
}
_ => unreachable!(),
}
}
}