risingwave_connector/source/adbc_snowflake/schema.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use adbc_core::Statement as _;
16use anyhow::Context;
17use risingwave_common::array::arrow::arrow_array_56::RecordBatchReader;
18use risingwave_common::array::arrow::arrow_schema_56 as arrow_schema;
19
20use super::AdbcSnowflakeProperties;
21use crate::error::ConnectorResult;
22
23impl AdbcSnowflakeProperties {
24 /// Get the Arrow schema from the Snowflake table.
25 /// This is used for schema inference when creating tables.
26 ///
27 /// **Important**: We use a `LIMIT 0` query instead of ADBC's `get_table_schema` API
28 /// because `get_table_schema` may return different types than the actual query results.
29 /// For example, Snowflake NUMBER columns may be reported as Int64 by `get_table_schema`
30 /// but returned as Decimal128 in actual query results. Using a real query ensures
31 /// consistency between schema inference (used by frontend) and data fetching (used by executor).
32 ///
33 /// The column order in the returned schema matches the column order in the Snowflake table.
34 pub fn get_arrow_schema(&self) -> ConnectorResult<arrow_schema::Schema> {
35 let database = self.create_database()?;
36 let mut connection = self.create_connection(&database)?;
37
38 // Use LIMIT 0 query to get the actual schema that will be returned by queries.
39 // This ensures schema inference matches actual data types returned by Snowflake.
40 let query = format!("SELECT * FROM {} LIMIT 0", self.table_ref());
41 let mut statement = self.create_statement(&mut connection, &query)?;
42
43 let reader = statement
44 .execute()
45 .context("Failed to execute schema query")?;
46
47 let schema = reader.schema();
48
49 Ok((*schema).clone())
50 }
51}