risingwave_meta/stream/source_manager/
worker.rs1use risingwave_connector::WithPropertiesExt;
16#[cfg(not(debug_assertions))]
17use risingwave_connector::error::ConnectorError;
18use risingwave_connector::source::AnySplitEnumerator;
19
20use super::*;
21
/// Failure threshold for the periodic loop of a source worker: once the worker's failure counter
/// exceeds this value, the split enumerator is rebuilt before the next tick.
const MAX_FAIL_CNT: u32 = 10;
/// Fallback timeout applied to the initial tick of a newly created source when the source's
/// properties do not provide a sync call timeout.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);

/// Key in a source's `with_properties` that carries a JSON description of splits to use instead
/// of the ones listed by the enumerator. It is rejected in builds without `debug_assertions`.
const DEBUG_SPLITS_KEY: &str = "debug_splits";
30
31pub struct SharedSplitMap {
32 pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
33}
34
35type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;
36
37pub struct ConnectorSourceWorker {
40 source_id: SourceId,
41 source_name: String,
42 current_splits: SharedSplitMapRef,
43 enumerator: Box<dyn AnySplitEnumerator>,
45 period: Duration,
46 metrics: Arc<MetaMetrics>,
47 connector_properties: ConnectorProperties,
48 fail_cnt: u32,
49 source_is_up: LabelGuardedIntGauge<2>,
50
51 debug_splits: Option<Vec<SplitImpl>>,
52}
53
54fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
55 let options_with_secret =
56 WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
57 let mut properties = ConnectorProperties::extract(options_with_secret, false)?;
58 properties.init_from_pb_source(source);
59 Ok(properties)
60}
61fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
62 let options_with_secret = WithOptionsSecResolved::new(
63 {
64 let mut with_properties = source.with_properties.clone();
65 let _removed = with_properties.remove(DEBUG_SPLITS_KEY);
66
67 #[cfg(not(debug_assertions))]
68 {
69 if _removed.is_some() {
70 return Err(ConnectorError::from(anyhow::anyhow!(
71 "`debug_splits` is not allowed in release mode"
72 )));
73 }
74 }
75
76 with_properties
77 },
78 source.secret_refs.clone(),
79 );
80 let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
81 properties.init_from_pb_source(source);
82 Ok(properties)
83}
84
85pub async fn create_source_worker(
89 source: &Source,
90 metrics: Arc<MetaMetrics>,
91) -> MetaResult<ConnectorSourceWorkerHandle> {
92 tracing::info!("spawning new watcher for source {}", source.id);
93
94 let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
95 let current_splits_ref = splits.clone();
96
97 let connector_properties = extract_prop_from_new_source(source)?;
98 let enable_scale_in = connector_properties.enable_drop_split();
99 let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
100 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
101 let sync_call_timeout = source
102 .with_properties
103 .get_sync_call_timeout()
104 .unwrap_or(DEFAULT_SOURCE_TICK_TIMEOUT);
105 let handle = {
106 let mut worker = ConnectorSourceWorker::create(
107 source,
108 connector_properties,
109 DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
110 current_splits_ref.clone(),
111 metrics,
112 )
113 .await?;
114
115 tokio::time::timeout(sync_call_timeout, worker.tick())
117 .await
118 .with_context(|| {
119 format!(
120 "failed to fetch meta info for source {}, timeout {:?}",
121 source.id, DEFAULT_SOURCE_TICK_TIMEOUT
122 )
123 })??;
124
125 tokio::spawn(async move { worker.run(command_rx).await })
126 };
127 Ok(ConnectorSourceWorkerHandle {
128 handle,
129 command_tx,
130 splits,
131 enable_drop_split: enable_scale_in,
132 enable_adaptive_splits,
133 })
134}
135
136pub fn create_source_worker_async(
138 source: Source,
139 managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
140 metrics: Arc<MetaMetrics>,
141) -> MetaResult<()> {
142 tracing::info!("spawning new watcher for source {}", source.id);
143
144 let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
145 let current_splits_ref = splits.clone();
146 let source_id = source.id;
147
148 let connector_properties = extract_prop_from_existing_source(&source)?;
149
150 let enable_drop_split = connector_properties.enable_drop_split();
151 let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
152 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
153 let handle = tokio::spawn(async move {
154 let mut ticker = time::interval(DEFAULT_SOURCE_WORKER_TICK_INTERVAL);
155 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
156
157 let mut worker = loop {
158 ticker.tick().await;
159
160 match ConnectorSourceWorker::create(
161 &source,
162 connector_properties.clone(),
163 DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
164 current_splits_ref.clone(),
165 metrics.clone(),
166 )
167 .await
168 {
169 Ok(worker) => {
170 break worker;
171 }
172 Err(e) => {
173 tracing::warn!(error = %e.as_report(), "failed to create source worker");
174 }
175 }
176 };
177
178 worker.run(command_rx).await
179 });
180
181 managed_sources.insert(
182 source_id as SourceId,
183 ConnectorSourceWorkerHandle {
184 handle,
185 command_tx,
186 splits,
187 enable_drop_split,
188 enable_adaptive_splits,
189 },
190 );
191 Ok(())
192}
193
/// Interval between periodic ticks of a source worker; also used to pace the creation retries
/// in [`create_source_worker_async`].
const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
195
196impl ConnectorSourceWorker {
197 async fn refresh(&mut self) -> MetaResult<()> {
199 let enumerator = self
200 .connector_properties
201 .clone()
202 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
203 metrics: self.metrics.source_enumerator_metrics.clone(),
204 info: SourceEnumeratorInfo {
205 source_id: self.source_id as u32,
206 },
207 }))
208 .await
209 .context("failed to create SplitEnumerator")?;
210 self.enumerator = enumerator;
211 self.fail_cnt = 0;
212 tracing::info!("refreshed source enumerator: {}", self.source_name);
213 Ok(())
214 }
215
216 pub async fn create(
219 source: &Source,
220 connector_properties: ConnectorProperties,
221 period: Duration,
222 splits: Arc<Mutex<SharedSplitMap>>,
223 metrics: Arc<MetaMetrics>,
224 ) -> MetaResult<Self> {
225 let enumerator = connector_properties
226 .clone()
227 .create_split_enumerator(Arc::new(SourceEnumeratorContext {
228 metrics: metrics.source_enumerator_metrics.clone(),
229 info: SourceEnumeratorInfo {
230 source_id: source.id,
231 },
232 }))
233 .await
234 .context("failed to create SplitEnumerator")?;
235
236 let source_is_up = metrics
237 .source_is_up
238 .with_guarded_label_values(&[source.id.to_string().as_str(), &source.name]);
239
240 Ok(Self {
241 source_id: source.id as SourceId,
242 source_name: source.name.clone(),
243 current_splits: splits,
244 enumerator,
245 period,
246 metrics,
247 connector_properties,
248 fail_cnt: 0,
249 source_is_up,
250 debug_splits: {
251 let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
252 #[cfg(not(debug_assertions))]
253 {
254 if debug_splits.is_some() {
255 return Err(ConnectorError::from(anyhow::anyhow!(
256 "`debug_splits` is not allowed in release mode"
257 ))
258 .into());
259 }
260 None
261 }
262
263 #[cfg(debug_assertions)]
264 {
265 use risingwave_common::types::JsonbVal;
266 if let Some(debug_splits) = debug_splits {
267 let mut splits = Vec::new();
268 let debug_splits_value =
269 jsonbb::serde_json::from_str::<serde_json::Value>(debug_splits)
270 .context("failed to parse split impl")?;
271 for split_impl_value in debug_splits_value.as_array().unwrap() {
272 splits.push(SplitImpl::restore_from_json(JsonbVal::from(
273 split_impl_value.clone(),
274 ))?);
275 }
276 Some(splits)
277 } else {
278 None
279 }
280 }
281 },
282 })
283 }
284
285 pub async fn run(&mut self, mut command_rx: UnboundedReceiver<SourceWorkerCommand>) {
286 let mut interval = time::interval(self.period);
287 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
288 loop {
289 select! {
290 biased;
291 cmd = command_rx.borrow_mut().recv() => {
292 if let Some(cmd) = cmd {
293 match cmd {
294 SourceWorkerCommand::Tick(tx) => {
295 let _ = tx.send(self.tick().await);
296 }
297 SourceWorkerCommand::DropFragments(fragment_ids) => {
298 if let Err(e) = self.drop_fragments(fragment_ids).await {
299 tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
301 }
302 }
303 SourceWorkerCommand::FinishBackfill(fragment_ids) => {
304 if let Err(e) = self.finish_backfill(fragment_ids).await {
305 tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
307 }
308 }
309 SourceWorkerCommand::Terminate => {
310 return;
311 }
312 }
313 }
314 }
315 _ = interval.tick() => {
316 if self.fail_cnt > MAX_FAIL_CNT {
317 if let Err(e) = self.refresh().await {
318 tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
319 }
320 }
321 if let Err(e) = self.tick().await {
322 tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
323 }
324 }
325 }
326 }
327 }
328
329 async fn tick(&mut self) -> MetaResult<()> {
331 let source_is_up = |res: i64| {
332 self.source_is_up.set(res);
333 };
334
335 let splits = {
336 if let Some(debug_splits) = &self.debug_splits {
337 debug_splits.clone()
338 } else {
339 self.enumerator.list_splits().await.inspect_err(|_| {
340 source_is_up(0);
341 self.fail_cnt += 1;
342 })?
343 }
344 };
345
346 source_is_up(1);
347 self.fail_cnt = 0;
348 let mut current_splits = self.current_splits.lock().await;
349 current_splits.splits.replace(
350 splits
351 .into_iter()
352 .map(|split| (split.id(), split))
353 .collect(),
354 );
355
356 Ok(())
357 }
358
359 async fn drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
360 self.enumerator.on_drop_fragments(fragment_ids).await?;
361 Ok(())
362 }
363
364 async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
365 self.enumerator.on_finish_backfill(fragment_ids).await?;
366 Ok(())
367 }
368}
369
370pub struct ConnectorSourceWorkerHandle {
372 #[expect(dead_code)]
373 handle: JoinHandle<()>,
374 command_tx: UnboundedSender<SourceWorkerCommand>,
375 pub splits: SharedSplitMapRef,
376 pub enable_drop_split: bool,
377 pub enable_adaptive_splits: bool,
378}
379
380impl ConnectorSourceWorkerHandle {
381 pub fn get_enable_adaptive_splits(&self) -> bool {
382 self.enable_adaptive_splits
383 }
384
385 pub async fn discovered_splits(
386 &self,
387 source_id: SourceId,
388 actors: &HashSet<ActorId>,
389 ) -> MetaResult<BTreeMap<Arc<str>, SplitImpl>> {
390 let Some(mut discovered_splits) = self.splits.lock().await.splits.clone() else {
392 tracing::info!(
393 "The discover loop for source {} is not ready yet; we'll wait for the next run",
394 source_id
395 );
396 return Ok(BTreeMap::new());
397 };
398 if discovered_splits.is_empty() {
399 tracing::warn!("No splits discovered for source {}", source_id);
400 }
401
402 if self.enable_adaptive_splits {
403 debug_assert!(self.enable_drop_split);
408 debug_assert!(discovered_splits.len() == 1);
409 discovered_splits =
410 fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
411 }
412
413 Ok(discovered_splits)
414 }
415
416 fn send_command(&self, command: SourceWorkerCommand) -> MetaResult<()> {
417 let cmd_str = format!("{:?}", command);
418 self.command_tx
419 .send(command)
420 .with_context(|| format!("failed to send {cmd_str} command to source worker"))?;
421 Ok(())
422 }
423
424 pub async fn force_tick(&self) -> MetaResult<()> {
426 let (tx, rx) = oneshot::channel();
427 self.send_command(SourceWorkerCommand::Tick(tx))?;
428 rx.await
429 .context("failed to receive tick command response from source worker")?
430 .context("source worker tick failed")?;
431 Ok(())
432 }
433
434 pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
435 tracing::debug!("drop_fragments: {:?}", fragment_ids);
436 if let Err(e) = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids)) {
437 tracing::warn!(error = %e.as_report(), "failed to drop fragments");
439 }
440 }
441
442 pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
443 tracing::debug!("finish_backfill: {:?}", fragment_ids);
444 if let Err(e) = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids)) {
445 tracing::warn!(error = %e.as_report(), "failed to finish backfill");
447 }
448 }
449
450 pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
451 tracing::debug!("terminate: {:?}", dropped_fragments);
452 if let Some(dropped_fragments) = dropped_fragments {
453 self.drop_fragments(dropped_fragments.into_iter().collect());
454 }
455 if let Err(e) = self.send_command(SourceWorkerCommand::Terminate) {
456 tracing::warn!(error = %e.as_report(), "failed to terminate source worker");
458 }
459 }
460}
461
462#[derive(educe::Educe)]
463#[educe(Debug)]
464pub enum SourceWorkerCommand {
465 Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
467 DropFragments(Vec<FragmentId>),
469 FinishBackfill(Vec<FragmentId>),
471 Terminate,
473}