risingwave_frontend/handler/create_source/external_schema/
adbc_snowflake.rs1use anyhow::anyhow;
16use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
17use risingwave_connector::source::ConnectorProperties;
18use risingwave_connector::source::adbc_snowflake::AdbcSnowflakeArrowConvert;
19
20use crate::WithOptionsSecResolved;
21
22pub async fn extract_adbc_snowflake_columns(
25 with_properties: &WithOptionsSecResolved,
26) -> anyhow::Result<Vec<ColumnCatalog>> {
27 let props = ConnectorProperties::extract(with_properties.clone(), true)?;
28 if let ConnectorProperties::AdbcSnowflake(properties) = props {
29 let arrow_schema = properties.get_arrow_schema()?;
31
32 let converter = AdbcSnowflakeArrowConvert;
34 let columns: Vec<ColumnCatalog> = arrow_schema
35 .fields()
36 .iter()
37 .enumerate()
38 .map(|(i, field)| {
39 let column_desc = ColumnDesc::named(
40 field.name(),
41 ColumnId::new((i + 1).try_into().unwrap()),
42 converter.type_from_field(field).unwrap(),
43 );
44 ColumnCatalog {
45 column_desc,
46 is_hidden: false,
47 }
48 })
49 .collect();
50
51 tracing::info!(
52 "ADBC Snowflake inferred {} columns from table {}",
53 columns.len(),
54 properties.table
55 );
56
57 Ok(columns)
58 } else {
59 Err(anyhow!(format!(
60 "Invalid properties for ADBC Snowflake source: {:?}",
61 props
62 )))
63 }
64}