1use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::TableId;
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::types::JsonbVal;
28use risingwave_common::util::epoch::{Epoch, EpochPair};
29use risingwave_connector::source::cdc::split::{
30 extract_postgres_lsn_from_offset_str, extract_sql_server_commit_lsn_from_offset_str,
31};
32use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
33use risingwave_connector::source::reader::reader::SourceReader;
34use risingwave_connector::source::{
35 ConnectorState, SplitId, SplitImpl, SplitMetaData, WaitCheckpointTask,
36 build_pulsar_ack_channel_id,
37};
38use risingwave_hummock_sdk::HummockReadEpoch;
39use risingwave_pb::common::ThrottleType;
40use risingwave_pb::id::SourceId;
41use risingwave_storage::store::TryWaitEpochOptions;
42use thiserror_ext::AsReport;
43use tokio::sync::mpsc;
44use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
45use tokio::time::Instant;
46
47use super::executor_core::StreamSourceCore;
48use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
49use crate::executor::UpdateMutation;
50use crate::executor::prelude::*;
51use crate::executor::source::reader_stream::{SourceReaderEventWithState, StreamReaderBuilder};
52use crate::executor::stream_reader::StreamReaderWithPause;
53use crate::task::LocalBarrierManager;
54
/// The source self-pauses if no barrier arrives within
/// `barrier_interval_ms * WAIT_BARRIER_MULTIPLE_TIMES` (see `execute_inner`).
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
58
/// Saturating conversion of a `u128` LSN to `i64` for gauge metrics:
/// values above `i64::MAX` are clamped instead of wrapping.
fn lsn_u128_to_i64(lsn: u128) -> i64 {
    i64::try_from(lsn).unwrap_or(i64::MAX)
}
62
/// Streaming executor that reads from an external source, emits stream chunks,
/// and manages split assignment, offset state persistence and rate limiting.
pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core state: source id/name, split assignment and the split state table.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Barrier channel receiver; taken once when `execute_inner` starts.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader, used to read `barrier_interval_ms`.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s; `None` means unlimited.
    rate_limit_rps: Option<u32>,

    // NOTE(review): presumably marks a shared (non-CDC) source, which affects
    // whether the initial reader seeks to the latest splits — confirm.
    is_shared_non_cdc: bool,

    barrier_manager: LocalBarrierManager,
}
86
87impl<S: StateStore> SourceExecutor<S> {
    /// Creates a new [`SourceExecutor`]. The `barrier_receiver` is stashed in
    /// an `Option` so that `execute_inner` can take ownership of it later.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            barrier_manager,
        }
    }
110
    /// Builds a [`StreamReaderBuilder`] for `source_desc`, carrying over this
    /// executor's rate limit and identity; the builder starts with no
    /// underlying `reader_stream`.
    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }
122
    /// If the connector needs work on checkpoint (i.e.
    /// `create_wait_checkpoint_task` returns a task), spawns a background
    /// [`WaitCheckpointWorker`] and returns the [`WaitCheckpointTaskBuilder`]
    /// used to accumulate per-epoch tasks; returns `Ok(None)` otherwise.
    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id(),
            source_id: core.source_id,
            source_name: core.source_name.clone(),
            metrics,
        };
        // The worker runs until its channel is closed (see `WaitCheckpointWorker::run`).
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            building_task: initial_task,
        }))
    }
146
    /// Whether automatic schema change is enabled in the actor's developer config.
    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx.config.developer.enable_auto_schema_change
    }
150
    /// Label values `[source_id, source_name, actor_id, fragment_id]` shared
    /// by the source metrics used in `execute_inner`.
    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }
161
162 async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
171 &mut self,
172 barrier_epoch: EpochPair,
173 source_desc: &SourceDesc,
174 stream: &mut StreamReaderWithPause<BIASED, SourceReaderEventWithState>,
175 apply_mutation: ApplyMutationAfterBarrier<'_>,
176 ) -> StreamExecutorResult<()> {
177 {
178 let mut should_rebuild_stream = false;
179 match apply_mutation {
180 ApplyMutationAfterBarrier::SplitChange {
181 target_splits,
182 should_trim_state,
183 split_change_count,
184 } => {
185 split_change_count.inc();
186 if self
187 .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
188 .await?
189 {
190 should_rebuild_stream = true;
191 }
192 }
193 ApplyMutationAfterBarrier::ConnectorPropsChange => {
194 should_rebuild_stream = true;
195 }
196 }
197
198 if should_rebuild_stream {
199 self.rebuild_stream_reader(source_desc, stream)?;
200 }
201 }
202
203 Ok(())
204 }
205
    /// Reconciles the in-memory split state with a newly assigned
    /// `target_splits` set. Returns `true` iff the assignment changed
    /// (splits added or dropped), in which case the caller should rebuild
    /// the reader stream.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        // Reader over the state committed as of `barrier_epoch`, used to
        // recover offsets for splits newly assigned to this actor.
        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Check added splits.
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For splits we already own, keep the in-memory (latest)
                // state rather than the incoming assignment's offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // A split newly assigned to this actor: prefer any offset
                // persisted in the state store over the assignment's.
                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Check dropped splits.
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            // Forget pending updates for splits we no longer own.
            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // Delete the persisted state of dropped splits.
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }
291
    /// Logs a source reader error, reports it to the global user-source error
    /// metric, then rebuilds the reader stream from `latest_split_info`.
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, SourceReaderEventWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::error!(
            error = ?e.as_report(),
            actor_id = %self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.clone(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }
315
    /// Replaces the data half of `stream` with a fresh retry stream built
    /// from the current `latest_split_info` (used after split changes, props
    /// changes, rate-limit changes, or reader errors).
    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, SourceReaderEventWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        // NOTE(review): `false` mirrors the non-initial build path in
        // `execute_inner` (initial build passes `is_uninitialized && shared`)
        // — presumably "don't seek to latest"; confirm in `into_retry_stream`.
        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }
338
    /// Handles the `InjectSourceOffsets` mutation: overwrites the offsets of
    /// the splits this actor owns, persists them as JSON state, and returns
    /// `Ok(true)` when at least one offset was injected (the caller must then
    /// rebuild the reader stream). Errors if any owned split rejects its
    /// injected offset.
    ///
    /// This is an unsafe escape hatch: injected offsets can cause data
    /// duplication or loss.
    async fn handle_inject_source_offsets(
        &mut self,
        split_offsets: &HashMap<String, String>,
    ) -> StreamExecutorResult<bool> {
        tracing::warn!(
            actor_id = %self.actor_ctx.id,
            source_id = %self.stream_source_core.source_id,
            num_offsets = split_offsets.len(),
            "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
        );

        // Only consider offsets for splits currently owned by this actor.
        let offsets_to_inject: Vec<(String, String)> = {
            let owned_splits: HashSet<&str> = self
                .stream_source_core
                .latest_split_info
                .keys()
                .map(|s| s.as_ref())
                .collect();
            split_offsets
                .iter()
                .filter(|(split_id, _)| owned_splits.contains(split_id.as_str()))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect()
        };

        let mut json_states: Vec<(String, JsonbVal)> = Vec::new();
        let mut failed_splits: Vec<String> = Vec::new();

        for (split_id, offset) in &offsets_to_inject {
            if let Some(split) = self
                .stream_source_core
                .latest_split_info
                .get_mut(split_id.as_str())
            {
                tracing::info!(
                    actor_id = %self.actor_ctx.id,
                    split_id = %split_id,
                    offset = %offset,
                    "Injecting offset for owned split"
                );
                if let Err(e) = split.update_in_place(offset.clone()) {
                    tracing::error!(
                        actor_id = %self.actor_ctx.id,
                        split_id = %split_id,
                        error = ?e.as_report(),
                        "Failed to update split offset"
                    );
                    failed_splits.push(split_id.clone());
                    continue;
                }
                // Record the change so it is persisted on the next barrier too.
                self.stream_source_core
                    .updated_splits_in_epoch
                    .insert(split_id.clone().into(), split.clone());
                // Non-JSON offsets are wrapped as `{"offset": <raw string>}`.
                let json_value: serde_json::Value = serde_json::from_str(offset)
                    .unwrap_or_else(|_| serde_json::json!({ "offset": offset }));
                json_states.push((split_id.clone(), JsonbVal::from(json_value)));
            }
        }

        // All-or-nothing error reporting: any failed split aborts the command.
        if !failed_splits.is_empty() {
            return Err(StreamExecutorError::connector_error(anyhow!(
                "failed to inject offsets for splits: {:?}",
                failed_splits
            )));
        }

        let num_injected = json_states.len();
        if num_injected > 0 {
            // Persist the injected offsets to the split state table.
            self.stream_source_core
                .split_state_store
                .set_states_json(json_states)
                .await?;

            tracing::info!(
                actor_id = %self.actor_ctx.id,
                source_id = %self.stream_source_core.source_id,
                num_injected = num_injected,
                "Offset injection completed for owned splits, triggering rebuild"
            );
            Ok(true)
        } else {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                source_id = %self.stream_source_core.source_id,
                "No owned splits to inject offsets for"
            );
            Ok(false)
        }
    }
439
    /// Persists the split states updated during this epoch, commits the state
    /// table at `epoch`, and reports per-connector CDC offset gauges for the
    /// persisted splits. Returns the updated splits and clears the in-memory
    /// per-epoch cache.
    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");

            // Export state-table offset gauges before persisting.
            let source_id = core.source_id.to_string();
            for split_impl in &cache {
                match split_impl {
                    SplitImpl::PostgresCdc(pg_split) => {
                        if let Some(lsn_value) = pg_split.pg_lsn() {
                            self.metrics
                                .pg_cdc_state_table_lsn
                                .with_guarded_label_values(&[&source_id])
                                .set(lsn_value as i64);
                        }
                    }
                    SplitImpl::MysqlCdc(mysql_split) => {
                        if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
                            self.metrics
                                .mysql_cdc_state_binlog_file_seq
                                .with_guarded_label_values(&[&source_id])
                                .set(file_seq as i64);

                            self.metrics
                                .mysql_cdc_state_binlog_position
                                .with_guarded_label_values(&[&source_id])
                                .set(position as i64);
                        }
                    }
                    SplitImpl::SqlServerCdc(sqlserver_split) => {
                        // SQL Server LSNs are u128; clamp when exporting as i64.
                        if let Some(lsn) = sqlserver_split.sql_server_change_lsn() {
                            self.metrics
                                .sqlserver_cdc_state_change_lsn
                                .with_guarded_label_values(&[&source_id])
                                .set(lsn_u128_to_i64(lsn));
                        }
                        if let Some(lsn) = sqlserver_split.sql_server_commit_lsn() {
                            self.metrics
                                .sqlserver_cdc_state_commit_lsn
                                .with_guarded_label_values(&[&source_id])
                                .set(lsn_u128_to_i64(lsn));
                        }
                    }
                    _ => {}
                }
            }

            core.split_state_store.set_states(cache).await?;
        }

        // Commit the epoch even if no split state changed.
        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }
511
512 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
514 let core = &mut self.stream_source_core;
515 core.split_state_store.try_flush().await?;
516
517 Ok(())
518 }
519
520 fn apply_latest_split_state(&mut self, latest_state: HashMap<SplitId, SplitImpl>) {
521 for (split_id, new_split_impl) in &latest_state {
522 if let Some(split_impl) = self.stream_source_core.latest_split_info.get_mut(split_id) {
523 *split_impl = new_split_impl.clone();
524 }
525 }
526 self.stream_source_core
527 .updated_splits_in_epoch
528 .extend(latest_state);
529 }
530
    /// Reports to the barrier manager (at most once overall) that this CDC
    /// source has an offset. When `must_wait_cdc_offset_before_report` is set
    /// (MySQL / SQL Server CDC, see `execute_inner`), the report is deferred
    /// until some updated CDC split carries a non-empty offset.
    fn maybe_report_cdc_source_offset(
        &self,
        updated_splits: &HashMap<SplitId, SplitImpl>,
        epoch: EpochPair,
        source_id: SourceId,
        must_report_cdc_offset_once: &mut bool,
        must_wait_cdc_offset_before_report: bool,
    ) {
        if *must_report_cdc_offset_once
            && (!must_wait_cdc_offset_before_report
                || updated_splits
                    .values()
                    .any(|split| split.is_cdc_split() && !split.get_cdc_split_offset().is_empty()))
        {
            self.barrier_manager.report_cdc_source_offset_updated(
                epoch,
                self.actor_ctx.id,
                source_id,
            );
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                source_id = %source_id,
                epoch = ?epoch,
                "Reported CDC source offset updated to meta (first time only)"
            );
            // Flip the flag so this report happens only once.
            *must_report_cdc_offset_once = false;
        }
    }
563
    /// Main loop of the source executor: receives the first barrier, recovers
    /// split state, builds the merged barrier/data stream, then processes
    /// barriers (mutations, state persistence) and data chunks until the
    /// stream ends.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        // Determine initial splits and whether/when the first CDC offset
        // update must be reported to meta.
        let (mut boot_state, mut must_report_cdc_offset_once, must_wait_cdc_offset_before_report) =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                let must_report_cdc_offset_once = splits.iter().any(|split| split.is_cdc_split());
                // For MySQL / SQL Server CDC, wait for a non-empty offset
                // before reporting (see `maybe_report_cdc_source_offset`).
                let must_wait_cdc_offset_before_report = must_report_cdc_offset_once
                    && splits.iter().any(|split| {
                        matches!(split, SplitImpl::MysqlCdc(_) | SplitImpl::SqlServerCdc(_))
                    });
                (
                    splits.to_vec(),
                    must_report_cdc_offset_once,
                    must_wait_cdc_offset_before_report,
                )
            } else {
                (Vec::default(), true, true)
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
            &core,
            source_desc.source.clone(),
            self.metrics.clone(),
        )
        .await?;

        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
            get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            // Restore persisted offsets; splits with no persisted state are
            // recorded so they get written in the first epoch.
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    // Any recovered state means the source is not brand new.
                    is_uninitialized = false;
                } else {
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        core.init_split_state(boot_state.clone());

        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        // For a freshly-created source, fetch the newest split offsets.
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Make sure these get written to the state table later.
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the barrier stream with the source data stream.
        let mut stream = StreamReaderWithPause::<true, SourceReaderEventWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // If no barrier arrives within this window, the source pauses itself
        // (see the data branch below).
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                // Reader error: back off briefly, then rebuild the reader.
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // An explicit Pause command takes priority over the
                        // self-pause; only resume if not command-paused.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;
                    // Mutations affecting the reader are deferred until after
                    // the barrier is yielded (see below).
                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = %self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
                                {
                                    tracing::info!(
                                        actor_id = %self.actor_ctx.id,
                                        source_id = %source_id,
                                        "updating source connector properties",
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    // Rebuild the reader with the new props.
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                // Update mutations keep state.
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(fragment_to_apply) => {
                                if let Some(entry) =
                                    fragment_to_apply.get(&self.actor_ctx.fragment_id)
                                    && entry.throttle_type() == ThrottleType::Source
                                    && entry.rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        entry.rate_limit
                                    );
                                    self.rate_limit_rps = entry.rate_limit;
                                    // Rebuild immediately so the new limit applies.
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            Mutation::ResetSource { source_id } => {
                                if *source_id == self.stream_source_core.source_id {
                                    tracing::info!(
                                        actor_id = %self.actor_ctx.id,
                                        source_id = source_id.as_raw_id(),
                                        "Resetting CDC source: clearing offset (set to None)"
                                    );

                                    // Clone each owned split with its start
                                    // offset cleared.
                                    let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
                                        .latest_split_info
                                        .values()
                                        .map(|split| {
                                            let mut new_split = split.clone();
                                            match &mut new_split {
                                                SplitImpl::MysqlCdc(debezium_split) => {
                                                    if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?mysql_split.inner.split_id,
                                                            old_offset = ?mysql_split.inner.start_offset,
                                                            "Clearing MySQL CDC offset"
                                                        );
                                                        mysql_split.inner.start_offset = None;
                                                    }
                                                }
                                                SplitImpl::PostgresCdc(debezium_split) => {
                                                    if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?pg_split.inner.split_id,
                                                            old_offset = ?pg_split.inner.start_offset,
                                                            "Clearing PostgreSQL CDC offset"
                                                        );
                                                        pg_split.inner.start_offset = None;
                                                    }
                                                }
                                                SplitImpl::MongodbCdc(debezium_split) => {
                                                    if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?mongo_split.inner.split_id,
                                                            old_offset = ?mongo_split.inner.start_offset,
                                                            "Clearing MongoDB CDC offset"
                                                        );
                                                        mongo_split.inner.start_offset = None;
                                                    }
                                                }
                                                SplitImpl::CitusCdc(debezium_split) => {
                                                    if let Some(citus_split) = debezium_split.citus_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?citus_split.inner.split_id,
                                                            old_offset = ?citus_split.inner.start_offset,
                                                            "Clearing Citus CDC offset"
                                                        );
                                                        citus_split.inner.start_offset = None;
                                                    }
                                                }
                                                SplitImpl::SqlServerCdc(debezium_split) => {
                                                    if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?sqlserver_split.inner.split_id,
                                                            old_offset = ?sqlserver_split.inner.start_offset,
                                                            "Clearing SQL Server CDC offset"
                                                        );
                                                        sqlserver_split.inner.start_offset = None;
                                                    }
                                                }
                                                _ => {
                                                    tracing::warn!(
                                                        "RESET SOURCE called on non-CDC split type"
                                                    );
                                                }
                                            }
                                            new_split
                                        })
                                        .collect();

                                    if !splits_with_cleared_offset.is_empty() {
                                        tracing::info!(
                                            actor_id = %self.actor_ctx.id,
                                            split_count = splits_with_cleared_offset.len(),
                                            "Updating state table with cleared offsets"
                                        );

                                        // Persist the cleared offsets immediately.
                                        self.stream_source_core
                                            .split_state_store
                                            .set_states(splits_with_cleared_offset.clone())
                                            .await?;

                                        // Also refresh the in-memory views.
                                        for split in splits_with_cleared_offset {
                                            self.stream_source_core
                                                .latest_split_info
                                                .insert(split.id(), split.clone());
                                            self.stream_source_core
                                                .updated_splits_in_epoch
                                                .insert(split.id(), split);
                                        }

                                        tracing::info!(
                                            actor_id = %self.actor_ctx.id,
                                            source_id = source_id.as_raw_id(),
                                            "RESET SOURCE completed: offset cleared (set to None). \
                                             Trigger recovery/restart to fetch latest offset from upstream."
                                        );
                                    } else {
                                        tracing::warn!(
                                            actor_id = %self.actor_ctx.id,
                                            "No splits found to reset - source may not be initialized yet"
                                        );
                                    }
                                } else {
                                    tracing::debug!(
                                        actor_id = %self.actor_ctx.id,
                                        target_source_id = source_id.as_raw_id(),
                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
                                        "ResetSource mutation for different source, ignoring"
                                    );
                                }
                            }

                            Mutation::InjectSourceOffsets {
                                source_id,
                                split_offsets,
                            } => {
                                if *source_id == self.stream_source_core.source_id {
                                    if self.handle_inject_source_offsets(split_offsets).await? {
                                        // Rebuild so the reader picks up the
                                        // injected offsets.
                                        split_change = Some((
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::ConnectorPropsChange,
                                        ));
                                    }
                                } else {
                                    tracing::debug!(
                                        actor_id = %self.actor_ctx.id,
                                        target_source_id = source_id.as_raw_id(),
                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
                                        "InjectSourceOffsets mutation for different source, ignoring"
                                    );
                                }
                            }

                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    self.maybe_report_cdc_source_offset(
                        &updated_splits,
                        epoch,
                        source_id,
                        &mut must_report_cdc_offset_once,
                        must_wait_cdc_offset_before_report,
                    );

                    // On checkpoint barriers, hand the accumulated task to the
                    // wait-checkpoint worker for the previous epoch.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev));
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    // Apply deferred reader mutations only after the barrier
                    // has been yielded downstream.
                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // The left (barrier) stream only yields barrier messages.
                    unreachable!();
                }

                Either::Right(event) => {
                    let (chunk, latest_state) = match event {
                        SourceReaderEventWithState::Progress(latest_state) => {
                            // Progress-only event: update split state, no data.
                            self.apply_latest_split_state(latest_state);
                            continue;
                        }
                        SourceReaderEventWithState::Data((chunk, latest_state)) => {
                            (chunk, latest_state)
                        }
                    };

                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                pulsar_message_id_col.clone(),
                            );
                        } else {
                            let offset_col = chunk.column_at(offset_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                offset_col.clone(),
                            );
                        }
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // No barrier for too long while data keeps flowing:
                        // pause ourselves until the next barrier arrives.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Refresh the wait window here (rather than every
                        // iteration) to avoid frequent system-params loads.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    self.apply_latest_split_state(latest_state);

                    let card = chunk.cardinality();
                    if card == 0 {
                        // Ignore empty chunks.
                        continue;
                    }
                    source_output_row_count.inc_by(card as u64);
                    // Drop the internal split/offset (and pulsar message id)
                    // columns before emitting downstream.
                    let to_remove_col_indices =
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            vec![split_idx, offset_idx, pulsar_message_id_idx]
                        } else {
                            vec![split_idx, offset_idx]
                        };
                    let chunk =
                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The merged stream should not end while the actor is alive.
        tracing::error!(
            actor_id = %self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
1084}
1085
/// Mutation effects deferred until after the triggering barrier has been
/// yielded downstream (see `apply_split_change_after_yield_barrier`).
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    /// Apply a new split assignment; optionally trim state of dropped splits.
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        /// Counter incremented each time a split change is applied.
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    /// Connector properties changed; the reader stream is always rebuilt.
    ConnectorPropsChange,
}
1095
impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // The executor is entirely driven by `execute_inner`.
        self.execute_inner().boxed()
    }
}
1101
impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Only print the identifying fields; the rest is noisy runtime state.
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}
1110
/// Accumulates a per-epoch [`WaitCheckpointTask`] and ships it to the
/// background [`WaitCheckpointWorker`] on checkpoint barriers.
struct WaitCheckpointTaskBuilder {
    /// Channel to the worker; carries `(epoch, task)` pairs.
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    /// The task being built for the current epoch.
    building_task: WaitCheckpointTask,
}
1115
impl WaitCheckpointTaskBuilder {
    /// Accumulates ack info from a data chunk into the building task: pub/sub
    /// style connectors buffer the offset column; Pulsar additionally derives
    /// the ack channel id from the chunk's split; CDC does nothing per chunk.
    fn update_task_on_chunk(
        &mut self,
        source_id: SourceId,
        latest_state: &HashMap<SplitId, SplitImpl>,
        offset_col: ArrayRef,
    ) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckPulsarMessage(arrays) => {
                // NOTE(review): panics if `latest_state` is empty — presumably
                // a Data event always carries at least one split state; confirm.
                let split_id = latest_state.keys().next().unwrap();
                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
                arrays.push((pulsar_ack_channel_id, offset_col));
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    /// On checkpoint, records the CDC offset to commit from the splits
    /// updated in this epoch. Asserts a single split (a CDC source has
    /// exactly one) and that it is indeed a CDC split.
    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Ships the built task to the worker to run once `epoch` is committed,
    /// and swaps in a fresh task for the next epoch.
    fn send(&mut self, epoch: Epoch) {
        let new_task = self.building_task.reset_for_next_epoch();
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
    }
}
1170
/// Background worker that waits until an epoch is committed in the state
/// store before running the corresponding [`WaitCheckpointTask`]
/// (acking messages / committing CDC offsets).
struct WaitCheckpointWorker<S: StateStore> {
    /// Receives `(epoch, task)` pairs from [`WaitCheckpointTaskBuilder`].
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    /// Table id of the source's split state table, used for epoch waiting.
    table_id: TableId,
    source_id: SourceId,
    source_name: String,
    metrics: Arc<StreamingMetrics>,
}
1204
impl<S: StateStore> WaitCheckpointWorker<S> {
    /// Worker loop: for each `(epoch, task)` received, wait until the epoch
    /// is committed in the state store, then run the task; exports committed
    /// LSN gauges parsed from the offset string on success. Exits when the
    /// sending side is dropped.
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker start success");
        loop {
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;
                    if self.wait_checkpoint_rx.is_closed() {
                        // The source executor is gone: the task may be stale,
                        // so drop it instead of running it.
                        tracing::debug!(epoch = epoch.0, "Drop stale wait checkpoint task.");
                        break;
                    }
                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");

                            // Run the task; on commit success, export LSN
                            // gauges parsed from the committed offset string.
                            task.run_with_on_commit_success(
                                self.source_id,
                                &self.source_name,
                                |source_id: u64, offset| {
                                    if let Some(lsn_value) =
                                        extract_postgres_lsn_from_offset_str(offset)
                                    {
                                        self.metrics
                                            .pg_cdc_jni_commit_offset_lsn
                                            .with_guarded_label_values(&[&source_id.to_string()])
                                            .set(lsn_value as i64);
                                    }
                                    if let Some(lsn_value) =
                                        extract_sql_server_commit_lsn_from_offset_str(offset)
                                    {
                                        self.metrics
                                            .sqlserver_cdc_jni_commit_offset_lsn
                                            .with_guarded_label_values(&[&source_id.to_string()])
                                            .set(lsn_u128_to_i64(lsn_value));
                                    }
                                },
                            )
                            .await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}
1274
#[cfg(test)]
mod tests {
    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field};
    use risingwave_common::id::SourceId;
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

    /// Smoke test: a datagen source executor should start reading after the
    /// initial split assignment and emit the configured sequence rows.
    #[tokio::test]
    async fn test_source_executor() {
        let source_id = 0.into();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // Datagen source producing a sequence 11..=11111 at 3 rows/second.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        // The init barrier assigns one datagen split to this actor; the
        // source only starts reading after receiving it.
        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // First message is the barrier itself.
        executor.next().await.unwrap().unwrap();

        // Second message should be the first data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // 3 rows per second, sequence starting at 11.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    /// Exercises split-change mutations end to end: add splits, checkpoint,
    /// pause/resume, then drop splits — verifying the split state table
    /// after each checkpoint.
    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        // Start with 1 of 3 splits assigned to this actor.
        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume the init barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        // Reassign all three splits to this actor.
        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        // Consume the split-change barrier.
        let _ = ready_chunks.next().await.unwrap();

        // Checkpoint barrier so the new splits are persisted.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        // Open a fresh handler over the same state table to inspect what
        // was persisted.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // The newly added split (index 1) must now exist in the state table.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        // Pause, then resume the source.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // Receive messages emitted after resume.
        ready_chunks.next().await.unwrap();

        // Drop all but the last split.
        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        // Consume the drop-split barrier.
        ready_chunks.next().await.unwrap();

        // Checkpoint barrier so the dropped splits are removed from state.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        // Splits 0 and 1 were dropped; only split 2 should remain.
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}