risingwave_connector/sink/
jdbc_jni_client.rs1use std::fmt;
16
17use anyhow::Context;
18use jni::objects::JObject;
19use risingwave_common::global_jvm::Jvm;
20use risingwave_jni_core::call_static_method;
21use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
22
23use crate::sink::Result;
24
25pub struct JdbcJniClient {
26 jvm: Jvm,
27 jdbc_url: String,
28 driver_props: Option<Vec<(String, String)>>,
29}
30
31impl Clone for JdbcJniClient {
32 fn clone(&self) -> Self {
33 Self {
34 jvm: self.jvm,
35 jdbc_url: self.jdbc_url.clone(),
36 driver_props: self.driver_props.clone(),
37 }
38 }
39}
40
41impl fmt::Debug for JdbcJniClient {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 f.debug_struct("JdbcJniClient")
44 .field("jdbc_url", &self.jdbc_url)
45 .field("driver_props", &"<redacted>")
46 .finish()
47 }
48}
49
50impl JdbcJniClient {
51 pub fn new(jdbc_url: String) -> Result<Self> {
52 let jvm = Jvm::get_or_init()?;
53 Ok(Self {
54 jvm,
55 jdbc_url,
56 driver_props: None,
57 })
58 }
59
60 pub fn new_with_props(jdbc_url: String, driver_props: Vec<(String, String)>) -> Result<Self> {
61 let jvm = Jvm::get_or_init()?;
62 Ok(Self {
63 jvm,
64 jdbc_url,
65 driver_props: Some(driver_props),
66 })
67 }
68
69 pub async fn execute_sql_sync(&self, sql: Vec<String>) -> anyhow::Result<()> {
70 self.execute_sql_sync_with_props(sql).await
71 }
72
73 pub async fn execute_sql_sync_with_props(&self, sql: Vec<String>) -> anyhow::Result<()> {
74 let jvm = self.jvm;
75 let jdbc_url = self.jdbc_url.clone();
76 let driver_props = self.driver_props.clone().unwrap_or_default();
77 tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
78 execute_with_jni_env(jvm, |env| {
79 let j_url = env.new_string(&jdbc_url).with_context(|| {
80 format!("Failed to create jni string from jdbc url: {}.", jdbc_url)
81 })?;
82
83 let sql_arr =
85 env.new_object_array(sql.len() as i32, "java/lang/String", JObject::null())?;
86 for (i, s) in sql.iter().enumerate() {
87 let s_j = env.new_string(s)?;
88 env.set_object_array_element(&sql_arr, i as i32, s_j)?;
89 }
90
91 let keys_arr = env.new_object_array(
93 driver_props.len() as i32,
94 "java/lang/String",
95 JObject::null(),
96 )?;
97 let values_arr = env.new_object_array(
98 driver_props.len() as i32,
99 "java/lang/String",
100 JObject::null(),
101 )?;
102 for (i, (k, v)) in driver_props.iter().enumerate() {
103 let k_j = env.new_string(k)?;
104 let v_j = env.new_string(v)?;
105 env.set_object_array_element(&keys_arr, i as i32, k_j)?;
106 env.set_object_array_element(&values_arr, i as i32, v_j)?;
107 }
108
109 call_static_method!(
110 env,
111 { com.risingwave.runner.JDBCSqlRunner },
112 { void executeSqlWithProps(String, String[], String[], String[]) },
113 &j_url,
114 &sql_arr,
115 &keys_arr,
116 &values_arr
117 )?;
118 Ok(())
119 })?;
120 Ok(())
121 })
122 .await
123 .context("Failed to execute SQL via JDBC JNI client with properties")?
124 }
125}
126
127pub fn build_alter_add_column_sql(
128 full_table_name: &str,
129 columns: &Vec<(String, String)>,
130 if_not_exists: bool,
131) -> String {
132 let column_definitions: Vec<String> = columns
133 .iter()
134 .map(|(name, typ)| format!(r#""{}" {}"#, name, typ))
135 .collect();
136 let column_definitions_str = column_definitions.join(", ");
137 let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
138 format!(
139 "ALTER TABLE {} ADD COLUMN {}{} ",
140 full_table_name, if_not_exists_str, column_definitions_str
141 )
142}
143
144pub fn normalize_sql(s: &str) -> String {
145 s.split_whitespace().collect::<Vec<_>>().join(" ")
146}