risingwave_connector/source/adbc_snowflake/
schema.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use adbc_core::Statement as _;
16use anyhow::Context;
17use risingwave_common::array::arrow::arrow_array_56::RecordBatchReader;
18use risingwave_common::array::arrow::arrow_schema_56 as arrow_schema;
19
20use super::AdbcSnowflakeProperties;
21use crate::error::ConnectorResult;
22
23impl AdbcSnowflakeProperties {
24    /// Get the Arrow schema from the Snowflake table.
25    /// This is used for schema inference when creating tables.
26    ///
27    /// **Important**: We use a `LIMIT 0` query instead of ADBC's `get_table_schema` API
28    /// because `get_table_schema` may return different types than the actual query results.
29    /// For example, Snowflake NUMBER columns may be reported as Int64 by `get_table_schema`
30    /// but returned as Decimal128 in actual query results. Using a real query ensures
31    /// consistency between schema inference (used by frontend) and data fetching (used by executor).
32    ///
33    /// The column order in the returned schema matches the column order in the Snowflake table.
34    pub fn get_arrow_schema(&self) -> ConnectorResult<arrow_schema::Schema> {
35        let database = self.create_database()?;
36        let mut connection = self.create_connection(&database)?;
37
38        // Use LIMIT 0 query to get the actual schema that will be returned by queries.
39        // This ensures schema inference matches actual data types returned by Snowflake.
40        let query = format!("SELECT * FROM {} LIMIT 0", self.table_ref());
41        let mut statement = self.create_statement(&mut connection, &query)?;
42
43        let reader = statement
44            .execute()
45            .context("Failed to execute schema query")?;
46
47        let schema = reader.schema();
48
49        Ok((*schema).clone())
50    }
51}