risingwave_connector/source/cdc/enumerator/
mod.rs1use std::marker::PhantomData;
16use std::ops::Deref;
17use std::str::FromStr;
18
19use anyhow::{Context, anyhow};
20use async_trait::async_trait;
21use itertools::Itertools;
22use prost::Message;
23use risingwave_common::global_jvm::JVM;
24use risingwave_common::util::addr::HostAddr;
25use risingwave_jni_core::call_static_method;
26use risingwave_jni_core::jvm_runtime::execute_with_jni_env;
27use risingwave_pb::connector_service::{SourceType, ValidateSourceRequest, ValidateSourceResponse};
28
29use crate::error::ConnectorResult;
30use crate::source::cdc::{
31 CdcProperties, CdcSourceTypeTrait, Citus, DebeziumCdcSplit, Mongodb, Mysql, Postgres,
32 SqlServer, table_schema_exclude_additional_columns,
33};
34use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
35
/// Property key whose value is read by the enumerator as a comma-separated list of
/// database server addresses.
pub const DATABASE_SERVERS_KEY: &str = "database.servers";
37
38#[derive(Debug)]
39pub struct DebeziumSplitEnumerator<T: CdcSourceTypeTrait> {
40 source_id: u32,
42 worker_node_addrs: Vec<HostAddr>,
43 _phantom: PhantomData<T>,
44}
45
46#[async_trait]
47impl<T: CdcSourceTypeTrait> SplitEnumerator for DebeziumSplitEnumerator<T>
48where
49 Self: ListCdcSplits<CdcSourceType = T>,
50{
51 type Properties = CdcProperties<T>;
52 type Split = DebeziumCdcSplit<T>;
53
54 async fn new(
55 props: CdcProperties<T>,
56 context: SourceEnumeratorContextRef,
57 ) -> ConnectorResult<Self> {
58 let server_addrs = props
59 .properties
60 .get(DATABASE_SERVERS_KEY)
61 .map(|s| {
62 s.split(',')
63 .map(HostAddr::from_str)
64 .collect::<Result<Vec<_>, _>>()
65 })
66 .transpose()?
67 .unwrap_or_default();
68
69 assert_eq!(
70 props.get_source_type_pb(),
71 SourceType::from(T::source_type())
72 );
73
74 let jvm = JVM.get_or_init()?;
75 let source_id = context.info.source_id;
76 tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
77 execute_with_jni_env(jvm, |env| {
78 let validate_source_request = ValidateSourceRequest {
79 source_id: source_id as u64,
80 source_type: props.get_source_type_pb() as _,
81 properties: props.properties,
82 table_schema: Some(table_schema_exclude_additional_columns(
83 &props.table_schema,
84 )),
85 is_source_job: props.is_cdc_source_job,
86 is_backfill_table: props.is_backfill_table,
87 };
88
89 let validate_source_request_bytes =
90 env.byte_array_from_slice(&Message::encode_to_vec(&validate_source_request))?;
91
92 let validate_source_response_bytes = call_static_method!(
93 env,
94 {com.risingwave.connector.source.JniSourceValidateHandler},
95 {byte[] validate(byte[] validateSourceRequestBytes)},
96 &validate_source_request_bytes
97 )?;
98
99 let validate_source_response: ValidateSourceResponse = Message::decode(
100 risingwave_jni_core::to_guarded_slice(&validate_source_response_bytes, env)?
101 .deref(),
102 )?;
103
104 if let Some(error) = validate_source_response.error {
105 return Err(
106 anyhow!(error.error_message).context("source cannot pass validation")
107 );
108 }
109
110 Ok(())
111 })
112 })
113 .await
114 .context("failed to validate source")??;
115
116 tracing::debug!("validate cdc source properties success");
117 Ok(Self {
118 source_id,
119 worker_node_addrs: server_addrs,
120 _phantom: PhantomData,
121 })
122 }
123
124 async fn list_splits(&mut self) -> ConnectorResult<Vec<DebeziumCdcSplit<T>>> {
125 Ok(self.list_cdc_splits())
126 }
127}
128
129pub trait ListCdcSplits {
130 type CdcSourceType: CdcSourceTypeTrait;
131 fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>>;
133}
134
135impl ListCdcSplits for DebeziumSplitEnumerator<Mysql> {
136 type CdcSourceType = Mysql;
137
138 fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
139 vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
141 self.source_id,
142 None,
143 None,
144 )]
145 }
146}
147
148impl ListCdcSplits for DebeziumSplitEnumerator<Postgres> {
149 type CdcSourceType = Postgres;
150
151 fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
152 vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
154 self.source_id,
155 None,
156 None,
157 )]
158 }
159}
160
161impl ListCdcSplits for DebeziumSplitEnumerator<Citus> {
162 type CdcSourceType = Citus;
163
164 fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
165 self.worker_node_addrs
166 .iter()
167 .enumerate()
168 .map(|(id, addr)| {
169 DebeziumCdcSplit::<Self::CdcSourceType>::new(
170 id as u32,
171 None,
172 Some(addr.to_string()),
173 )
174 })
175 .collect_vec()
176 }
177}
178impl ListCdcSplits for DebeziumSplitEnumerator<Mongodb> {
179 type CdcSourceType = Mongodb;
180
181 fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
182 vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
184 self.source_id,
185 None,
186 None,
187 )]
188 }
189}
190
191impl ListCdcSplits for DebeziumSplitEnumerator<SqlServer> {
192 type CdcSourceType = SqlServer;
193
194 fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
195 vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
196 self.source_id,
197 None,
198 None,
199 )]
200 }
201}