risingwave_frontend/handler/create_source/external_schema/
adbc_snowflake.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
17use risingwave_connector::source::ConnectorProperties;
18use risingwave_connector::source::adbc_snowflake::AdbcSnowflakeArrowConvert;
19
20use crate::WithOptionsSecResolved;
21
22/// Extract column schema from ADBC Snowflake query result.
23/// This function infers the schema by executing the query and examining the result schema.
24pub async fn extract_adbc_snowflake_columns(
25    with_properties: &WithOptionsSecResolved,
26) -> anyhow::Result<Vec<ColumnCatalog>> {
27    let props = ConnectorProperties::extract(with_properties.clone(), true)?;
28    if let ConnectorProperties::AdbcSnowflake(properties) = props {
29        // Get the Arrow schema from Snowflake
30        let arrow_schema = properties.get_arrow_schema()?;
31
32        // Convert Arrow schema to RisingWave columns
33        let converter = AdbcSnowflakeArrowConvert;
34        let columns: Vec<ColumnCatalog> = arrow_schema
35            .fields()
36            .iter()
37            .enumerate()
38            .map(|(i, field)| {
39                let column_desc = ColumnDesc::named(
40                    field.name(),
41                    ColumnId::new((i + 1).try_into().unwrap()),
42                    converter.type_from_field(field).unwrap(),
43                );
44                ColumnCatalog {
45                    column_desc,
46                    is_hidden: false,
47                }
48            })
49            .collect();
50
51        tracing::info!(
52            "ADBC Snowflake inferred {} columns from table {}",
53            columns.len(),
54            properties.table
55        );
56
57        Ok(columns)
58    } else {
59        Err(anyhow!(format!(
60            "Invalid properties for ADBC Snowflake source: {:?}",
61            props
62        )))
63    }
64}