risingwave_connector/sink/
jdbc_jni_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use jni::objects::JObject;
17use risingwave_common::global_jvm::Jvm;
18use risingwave_jni_core::call_static_method;
19use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
20
21use crate::sink::Result;
22
23#[derive(Debug)]
24pub struct JdbcJniClient {
25    jvm: Jvm,
26    jdbc_url: String,
27}
28
29impl Clone for JdbcJniClient {
30    fn clone(&self) -> Self {
31        Self {
32            jvm: self.jvm,
33            jdbc_url: self.jdbc_url.clone(),
34        }
35    }
36}
37
38impl JdbcJniClient {
39    pub fn new(jdbc_url: String) -> Result<Self> {
40        let jvm = Jvm::get_or_init()?;
41        Ok(Self { jvm, jdbc_url })
42    }
43
44    pub async fn execute_sql_sync(&self, sql: Vec<String>) -> anyhow::Result<()> {
45        let jvm = self.jvm;
46        let jdbc_url = self.jdbc_url.clone();
47        tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
48            execute_with_jni_env(jvm, |env| {
49                // get source handler by source id
50                let full_url = env.new_string(&jdbc_url).with_context(|| {
51                    format!(
52                        "Failed to create jni string from source offset: {}.",
53                        jdbc_url
54                    )
55                })?;
56
57                let props =
58                    env.new_object_array((sql.len()) as i32, "java/lang/String", JObject::null())?;
59
60                for (i, sql) in sql.iter().enumerate() {
61                    let sql_j_str = env.new_string(sql)?;
62                    env.set_object_array_element(&props, i as i32, sql_j_str)?;
63                }
64
65                call_static_method!(
66                    env,
67                    { com.risingwave.runner.JDBCSqlRunner },
68                    { void executeSql(String, String[]) },
69                    &full_url,
70                    &props
71                )?;
72                Ok(())
73            })?;
74            Ok(())
75        })
76        .await
77        .context("Failed to execute SQL via JDBC JNI client")?
78    }
79}
80
81pub fn build_alter_add_column_sql(
82    full_table_name: &str,
83    columns: &Vec<(String, String)>,
84    if_not_exists: bool,
85) -> String {
86    let column_definitions: Vec<String> = columns
87        .iter()
88        .map(|(name, typ)| format!(r#""{}" {}"#, name, typ))
89        .collect();
90    let column_definitions_str = column_definitions.join(", ");
91    let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
92    format!(
93        "ALTER TABLE {} ADD COLUMN {}{} ",
94        full_table_name, if_not_exists_str, column_definitions_str
95    )
96}
97
98pub fn normalize_sql(s: &str) -> String {
99    s.split_whitespace().collect::<Vec<_>>().join(" ")
100}