risingwave_connector/sink/
jdbc_jni_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16
17use anyhow::Context;
18use jni::objects::JObject;
19use risingwave_common::global_jvm::Jvm;
20use risingwave_jni_core::call_static_method;
21use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
22
23use crate::sink::Result;
24
/// Client that executes SQL statements against a JDBC endpoint by calling
/// into the JVM through JNI.
pub struct JdbcJniClient {
    /// JVM handle used to obtain a JNI environment for each call.
    jvm: Jvm,
    /// JDBC connection URL, passed to the Java side as a string.
    jdbc_url: String,
    /// Optional driver properties as `(key, value)` pairs. `None` is treated
    /// as an empty list when executing SQL. The `Debug` implementation prints
    /// this field as `<redacted>`.
    driver_props: Option<Vec<(String, String)>>,
}
30
31impl Clone for JdbcJniClient {
32    fn clone(&self) -> Self {
33        Self {
34            jvm: self.jvm,
35            jdbc_url: self.jdbc_url.clone(),
36            driver_props: self.driver_props.clone(),
37        }
38    }
39}
40
41impl fmt::Debug for JdbcJniClient {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        f.debug_struct("JdbcJniClient")
44            .field("jdbc_url", &self.jdbc_url)
45            .field("driver_props", &"<redacted>")
46            .finish()
47    }
48}
49
50impl JdbcJniClient {
51    pub fn new(jdbc_url: String) -> Result<Self> {
52        let jvm = Jvm::get_or_init()?;
53        Ok(Self {
54            jvm,
55            jdbc_url,
56            driver_props: None,
57        })
58    }
59
60    pub fn new_with_props(jdbc_url: String, driver_props: Vec<(String, String)>) -> Result<Self> {
61        let jvm = Jvm::get_or_init()?;
62        Ok(Self {
63            jvm,
64            jdbc_url,
65            driver_props: Some(driver_props),
66        })
67    }
68
69    pub async fn execute_sql_sync(&self, sql: Vec<String>) -> anyhow::Result<()> {
70        self.execute_sql_sync_with_props(sql).await
71    }
72
73    pub async fn execute_sql_sync_with_props(&self, sql: Vec<String>) -> anyhow::Result<()> {
74        let jvm = self.jvm;
75        let jdbc_url = self.jdbc_url.clone();
76        let driver_props = self.driver_props.clone().unwrap_or_default();
77        tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
78            execute_with_jni_env(jvm, |env| {
79                let j_url = env.new_string(&jdbc_url).with_context(|| {
80                    format!("Failed to create jni string from jdbc url: {}.", jdbc_url)
81                })?;
82
83                // SQL array
84                let sql_arr =
85                    env.new_object_array(sql.len() as i32, "java/lang/String", JObject::null())?;
86                for (i, s) in sql.iter().enumerate() {
87                    let s_j = env.new_string(s)?;
88                    env.set_object_array_element(&sql_arr, i as i32, s_j)?;
89                }
90
91                // Driver properties as separate key and value arrays
92                let keys_arr = env.new_object_array(
93                    driver_props.len() as i32,
94                    "java/lang/String",
95                    JObject::null(),
96                )?;
97                let values_arr = env.new_object_array(
98                    driver_props.len() as i32,
99                    "java/lang/String",
100                    JObject::null(),
101                )?;
102                for (i, (k, v)) in driver_props.iter().enumerate() {
103                    let k_j = env.new_string(k)?;
104                    let v_j = env.new_string(v)?;
105                    env.set_object_array_element(&keys_arr, i as i32, k_j)?;
106                    env.set_object_array_element(&values_arr, i as i32, v_j)?;
107                }
108
109                call_static_method!(
110                    env,
111                    { com.risingwave.runner.JDBCSqlRunner },
112                    { void executeSqlWithProps(String, String[], String[], String[]) },
113                    &j_url,
114                    &sql_arr,
115                    &keys_arr,
116                    &values_arr
117                )?;
118                Ok(())
119            })?;
120            Ok(())
121        })
122        .await
123        .context("Failed to execute SQL via JDBC JNI client with properties")?
124    }
125}
126
/// Builds an `ALTER TABLE ... ADD COLUMN` statement that adds every column in
/// `columns` to `full_table_name`.
///
/// * `full_table_name` - inserted verbatim; the caller is responsible for any
///   quoting or qualification.
/// * `columns` - `(name, type)` pairs. Each name is emitted as a double-quoted
///   identifier, with embedded `"` doubled so a name cannot terminate the
///   quoted identifier early. The type is inserted verbatim.
/// * `if_not_exists` - when `true`, `IF NOT EXISTS ` is emitted once, right
///   after `ADD COLUMN`.
///
/// Column definitions are joined with `", "` and the returned string ends with
/// a single trailing space. An empty `columns` slice yields a statement with
/// no column definitions, so callers should not pass one.
pub fn build_alter_add_column_sql(
    full_table_name: &str,
    columns: &[(String, String)],
    if_not_exists: bool,
) -> String {
    let column_definitions_str = columns
        .iter()
        .map(|(name, typ)| {
            // Standard SQL escaping for a quoted identifier: `"` becomes `""`.
            let quoted_name = name.replace('"', "\"\"");
            format!(r#""{}" {}"#, quoted_name, typ)
        })
        .collect::<Vec<_>>()
        .join(", ");
    let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
    format!(
        "ALTER TABLE {} ADD COLUMN {}{} ",
        full_table_name, if_not_exists_str, column_definitions_str
    )
}
143
/// Collapses every run of whitespace in `s` into a single space and strips
/// leading and trailing whitespace.
pub fn normalize_sql(s: &str) -> String {
    let mut normalized = String::with_capacity(s.len());
    for token in s.split_whitespace() {
        // Tokens are never empty, so a non-empty buffer means at least one
        // token has already been written and a separator is needed.
        if !normalized.is_empty() {
            normalized.push(' ');
        }
        normalized.push_str(token);
    }
    normalized
}