risingwave_connector/sink/
jdbc_jni_client.rs1use anyhow::Context;
16use jni::objects::JObject;
17use risingwave_common::global_jvm::Jvm;
18use risingwave_jni_core::call_static_method;
19use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
20
21use crate::sink::Result;
22
23#[derive(Debug)]
24pub struct JdbcJniClient {
25 jvm: Jvm,
26 jdbc_url: String,
27}
28
29impl Clone for JdbcJniClient {
30 fn clone(&self) -> Self {
31 Self {
32 jvm: self.jvm,
33 jdbc_url: self.jdbc_url.clone(),
34 }
35 }
36}
37
38impl JdbcJniClient {
39 pub fn new(jdbc_url: String) -> Result<Self> {
40 let jvm = Jvm::get_or_init()?;
41 Ok(Self { jvm, jdbc_url })
42 }
43
44 pub async fn execute_sql_sync(&self, sql: Vec<String>) -> anyhow::Result<()> {
45 let jvm = self.jvm;
46 let jdbc_url = self.jdbc_url.clone();
47 tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
48 execute_with_jni_env(jvm, |env| {
49 let full_url = env.new_string(&jdbc_url).with_context(|| {
51 format!(
52 "Failed to create jni string from source offset: {}.",
53 jdbc_url
54 )
55 })?;
56
57 let props =
58 env.new_object_array((sql.len()) as i32, "java/lang/String", JObject::null())?;
59
60 for (i, sql) in sql.iter().enumerate() {
61 let sql_j_str = env.new_string(sql)?;
62 env.set_object_array_element(&props, i as i32, sql_j_str)?;
63 }
64
65 call_static_method!(
66 env,
67 { com.risingwave.runner.JDBCSqlRunner },
68 { void executeSql(String, String[]) },
69 &full_url,
70 &props
71 )?;
72 Ok(())
73 })?;
74 Ok(())
75 })
76 .await
77 .context("Failed to execute SQL via JDBC JNI client")?
78 }
79}
80
81pub fn build_alter_add_column_sql(
82 full_table_name: &str,
83 columns: &Vec<(String, String)>,
84 if_not_exists: bool,
85) -> String {
86 let column_definitions: Vec<String> = columns
87 .iter()
88 .map(|(name, typ)| format!(r#""{}" {}"#, name, typ))
89 .collect();
90 let column_definitions_str = column_definitions.join(", ");
91 let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
92 format!(
93 "ALTER TABLE {} ADD COLUMN {}{} ",
94 full_table_name, if_not_exists_str, column_definitions_str
95 )
96}
97
98pub fn normalize_sql(s: &str) -> String {
99 s.split_whitespace().collect::<Vec<_>>().join(" ")
100}