1use std::collections::HashMap;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::util::epoch::{Epoch, EpochPair};
28use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
29use risingwave_connector::source::cdc::split::extract_postgres_lsn_from_offset_str;
30use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
31use risingwave_connector::source::reader::reader::SourceReader;
32use risingwave_connector::source::{
33 ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
34 StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
35};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::common::ThrottleType;
38use risingwave_pb::id::SourceId;
39use risingwave_storage::store::TryWaitEpochOptions;
40use thiserror_ext::AsReport;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
42use tokio::sync::{mpsc, oneshot};
43use tokio::time::Instant;
44
45use super::executor_core::StreamSourceCore;
46use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
47use crate::common::rate_limit::limited_chunk_size;
48use crate::executor::UpdateMutation;
49use crate::executor::prelude::*;
50use crate::executor::source::reader_stream::StreamReaderBuilder;
51use crate::executor::stream_reader::StreamReaderWithPause;
52use crate::task::LocalBarrierManager;
53
/// Multiplier applied to the `barrier_interval_ms` system parameter to obtain the longest time
/// (in milliseconds) the source executor keeps emitting chunks without having seen a barrier.
/// Once that window is exceeded the executor pauses its data stream until a barrier arrives.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
57
/// Stream executor that reads data from an external source and emits it, interleaved with
/// barriers, as a message stream.
pub struct SourceExecutor<S: StateStore> {
    /// Context of the actor this executor runs in (actor id, fragment id, config, meta client).
    actor_ctx: ActorContextRef,

    /// Source-specific state: source id and name, the split assignment, the per-epoch cache of
    /// updated splits and the state store handler used to persist them.
    stream_source_core: StreamSourceCore<S>,

    /// Streaming metrics used to report output row counts, split changes and CDC offsets.
    metrics: Arc<StreamingMetrics>,

    /// Receiving half of the barrier channel. Wrapped in `Option` so that it can be taken out
    /// when the executor starts running.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// Reader for system parameters; used to look up `barrier_interval_ms`.
    system_params: SystemParamsReaderRef,

    /// Optional rate limit applied to the reader; replaced when a matching throttle mutation
    /// arrives. NOTE(review): the name suggests rows per second — confirm the unit.
    rate_limit_rps: Option<u32>,

    /// Flag forwarded to the reader builder when fetching the latest splits and when building
    /// the initial stream of a newly added actor.
    is_shared_non_cdc: bool,

    /// Barrier manager used to report that the CDC source offset has been updated.
    barrier_manager: LocalBarrierManager,
}
81
82impl<S: StateStore> SourceExecutor<S> {
83 #[expect(clippy::too_many_arguments)]
84 pub fn new(
85 actor_ctx: ActorContextRef,
86 stream_source_core: StreamSourceCore<S>,
87 metrics: Arc<StreamingMetrics>,
88 barrier_receiver: UnboundedReceiver<Barrier>,
89 system_params: SystemParamsReaderRef,
90 rate_limit_rps: Option<u32>,
91 is_shared_non_cdc: bool,
92 barrier_manager: LocalBarrierManager,
93 ) -> Self {
94 Self {
95 actor_ctx,
96 stream_source_core,
97 metrics,
98 barrier_receiver: Some(barrier_receiver),
99 system_params,
100 rate_limit_rps,
101 is_shared_non_cdc,
102 barrier_manager,
103 }
104 }
105
106 fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
107 StreamReaderBuilder {
108 source_desc,
109 rate_limit: self.rate_limit_rps,
110 source_id: self.stream_source_core.source_id,
111 source_name: self.stream_source_core.source_name.clone(),
112 is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
113 actor_ctx: self.actor_ctx.clone(),
114 reader_stream: None,
115 }
116 }
117
118 async fn spawn_wait_checkpoint_worker(
119 core: &StreamSourceCore<S>,
120 source_reader: SourceReader,
121 metrics: Arc<StreamingMetrics>,
122 ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
123 let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
124 return Ok(None);
125 };
126 let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
127 let wait_checkpoint_worker = WaitCheckpointWorker {
128 wait_checkpoint_rx,
129 state_store: core.split_state_store.state_table().state_store().clone(),
130 table_id: core.split_state_store.state_table().table_id(),
131 metrics,
132 };
133 tokio::spawn(wait_checkpoint_worker.run());
134 Ok(Some(WaitCheckpointTaskBuilder {
135 wait_checkpoint_tx,
136 source_reader,
137 building_task: initial_task,
138 }))
139 }
140
141 pub fn prepare_source_stream_build(
143 &self,
144 source_desc: &SourceDesc,
145 ) -> (Vec<ColumnId>, SourceContext) {
146 let column_ids = source_desc
147 .columns
148 .iter()
149 .map(|column_desc| column_desc.column_id)
150 .collect_vec();
151
152 let (schema_change_tx, mut schema_change_rx) =
153 mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
154 let schema_change_tx = if self.is_auto_schema_change_enable() {
155 let meta_client = self.actor_ctx.meta_client.clone();
156 let _join_handle = tokio::task::spawn(async move {
158 while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
159 let table_ids = schema_change.table_ids();
160 tracing::info!(
161 target: "auto_schema_change",
162 "recv a schema change event for tables: {:?}", table_ids);
163 if let Some(ref meta_client) = meta_client {
165 match meta_client
166 .auto_schema_change(schema_change.to_protobuf())
167 .await
168 {
169 Ok(_) => {
170 tracing::info!(
171 target: "auto_schema_change",
172 "schema change success for tables: {:?}", table_ids);
173 finish_tx.send(()).unwrap();
174 }
175 Err(e) => {
176 tracing::error!(
177 target: "auto_schema_change",
178 error = ?e.as_report(), "schema change error");
179 finish_tx.send(()).unwrap();
180 }
181 }
182 }
183 }
184 });
185 Some(schema_change_tx)
186 } else {
187 info!("auto schema change is disabled in config");
188 None
189 };
190 let source_ctx = SourceContext::new(
191 self.actor_ctx.id,
192 self.stream_source_core.source_id,
193 self.actor_ctx.fragment_id,
194 self.stream_source_core.source_name.clone(),
195 source_desc.metrics.clone(),
196 SourceCtrlOpts {
197 chunk_size: limited_chunk_size(self.rate_limit_rps),
198 split_txn: self.rate_limit_rps.is_some(), },
200 source_desc.source.config.clone(),
201 schema_change_tx,
202 );
203
204 (column_ids, source_ctx)
205 }
206
207 fn is_auto_schema_change_enable(&self) -> bool {
208 self.actor_ctx.config.developer.enable_auto_schema_change
209 }
210
211 #[inline]
213 fn get_metric_labels(&self) -> [String; 4] {
214 [
215 self.stream_source_core.source_id.to_string(),
216 self.stream_source_core.source_name.clone(),
217 self.actor_ctx.id.to_string(),
218 self.actor_ctx.fragment_id.to_string(),
219 ]
220 }
221
222 async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
231 &mut self,
232 barrier_epoch: EpochPair,
233 source_desc: &SourceDesc,
234 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
235 apply_mutation: ApplyMutationAfterBarrier<'_>,
236 ) -> StreamExecutorResult<()> {
237 {
238 let mut should_rebuild_stream = false;
239 match apply_mutation {
240 ApplyMutationAfterBarrier::SplitChange {
241 target_splits,
242 should_trim_state,
243 split_change_count,
244 } => {
245 split_change_count.inc();
246 if self
247 .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
248 .await?
249 {
250 should_rebuild_stream = true;
251 }
252 }
253 ApplyMutationAfterBarrier::ConnectorPropsChange => {
254 should_rebuild_stream = true;
255 }
256 }
257
258 if should_rebuild_stream {
259 self.rebuild_stream_reader(source_desc, stream)?;
260 }
261 }
262
263 Ok(())
264 }
265
266 async fn update_state_if_changed(
268 &mut self,
269 barrier_epoch: EpochPair,
270 target_splits: Vec<SplitImpl>,
271 should_trim_state: bool,
272 ) -> StreamExecutorResult<bool> {
273 let core = &mut self.stream_source_core;
274
275 let target_splits: HashMap<_, _> = target_splits
276 .into_iter()
277 .map(|split| (split.id(), split))
278 .collect();
279
280 let mut target_state: HashMap<SplitId, SplitImpl> =
281 HashMap::with_capacity(target_splits.len());
282
283 let mut split_changed = false;
284
285 let committed_reader = core
286 .split_state_store
287 .new_committed_reader(barrier_epoch)
288 .await?;
289
290 for (split_id, split) in target_splits {
292 if let Some(s) = core.latest_split_info.get(&split_id) {
293 target_state.insert(split_id, s.clone());
296 } else {
297 split_changed = true;
298 let initial_state = if let Some(recover_state) = committed_reader
301 .try_recover_from_state_store(&split)
302 .await?
303 {
304 recover_state
305 } else {
306 split
307 };
308
309 core.updated_splits_in_epoch
310 .entry(split_id.clone())
311 .or_insert_with(|| initial_state.clone());
312
313 target_state.insert(split_id, initial_state);
314 }
315 }
316
317 for existing_split_id in core.latest_split_info.keys() {
319 if !target_state.contains_key(existing_split_id) {
320 tracing::info!("split dropping detected: {}", existing_split_id);
321 split_changed = true;
322 }
323 }
324
325 if split_changed {
326 tracing::info!(
327 actor_id = %self.actor_ctx.id,
328 state = ?target_state,
329 "apply split change"
330 );
331
332 core.updated_splits_in_epoch
333 .retain(|split_id, _| target_state.contains_key(split_id));
334
335 let dropped_splits = core
336 .latest_split_info
337 .extract_if(|split_id, _| !target_state.contains_key(split_id))
338 .map(|(_, split)| split)
339 .collect_vec();
340
341 if should_trim_state && !dropped_splits.is_empty() {
342 core.split_state_store.trim_state(&dropped_splits).await?;
344 }
345
346 core.latest_split_info = target_state;
347 }
348
349 Ok(split_changed)
350 }
351
352 fn rebuild_stream_reader_from_error<const BIASED: bool>(
354 &mut self,
355 source_desc: &SourceDesc,
356 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
357 e: StreamExecutorError,
358 ) -> StreamExecutorResult<()> {
359 let core = &mut self.stream_source_core;
360 tracing::error!(
361 error = ?e.as_report(),
362 actor_id = %self.actor_ctx.id,
363 source_id = %core.source_id,
364 "stream source reader error",
365 );
366 GLOBAL_ERROR_METRICS.user_source_error.report([
367 e.variant_name().to_owned(),
368 core.source_id.to_string(),
369 core.source_name.clone(),
370 self.actor_ctx.fragment_id.to_string(),
371 ]);
372
373 self.rebuild_stream_reader(source_desc, stream)
374 }
375
376 fn rebuild_stream_reader<const BIASED: bool>(
377 &mut self,
378 source_desc: &SourceDesc,
379 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
380 ) -> StreamExecutorResult<()> {
381 let core = &mut self.stream_source_core;
382 let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
383
384 tracing::info!(
385 "actor {:?} apply source split change to {:?}",
386 self.actor_ctx.id,
387 target_state
388 );
389
390 let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
392 let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);
393
394 stream.replace_data_stream(reader_stream);
395
396 Ok(())
397 }
398
399 async fn persist_state_and_clear_cache(
400 &mut self,
401 epoch: EpochPair,
402 ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
403 let core = &mut self.stream_source_core;
404
405 let cache = core
406 .updated_splits_in_epoch
407 .values()
408 .map(|split_impl| split_impl.to_owned())
409 .collect_vec();
410
411 if !cache.is_empty() {
412 tracing::debug!(state = ?cache, "take snapshot");
413
414 let source_id = core.source_id.to_string();
416 for split_impl in &cache {
417 match split_impl {
419 SplitImpl::PostgresCdc(pg_split) => {
420 if let Some(lsn_value) = pg_split.pg_lsn() {
421 self.metrics
422 .pg_cdc_state_table_lsn
423 .with_guarded_label_values(&[&source_id])
424 .set(lsn_value as i64);
425 }
426 }
427 SplitImpl::MysqlCdc(mysql_split) => {
428 if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
429 self.metrics
430 .mysql_cdc_state_binlog_file_seq
431 .with_guarded_label_values(&[&source_id])
432 .set(file_seq as i64);
433
434 self.metrics
435 .mysql_cdc_state_binlog_position
436 .with_guarded_label_values(&[&source_id])
437 .set(position as i64);
438 }
439 }
440 _ => {}
441 }
442 }
443
444 core.split_state_store.set_states(cache).await?;
445 }
446
447 core.split_state_store.commit(epoch).await?;
449
450 let updated_splits = core.updated_splits_in_epoch.clone();
451
452 core.updated_splits_in_epoch.clear();
453
454 Ok(updated_splits)
455 }
456
457 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
459 let core = &mut self.stream_source_core;
460 core.split_state_store.try_flush().await?;
461
462 Ok(())
463 }
464
465 fn maybe_report_cdc_source_offset(
467 &self,
468 updated_splits: &HashMap<SplitId, SplitImpl>,
469 epoch: EpochPair,
470 source_id: SourceId,
471 must_report_cdc_offset_once: &mut bool,
472 must_wait_cdc_offset_before_report: bool,
473 ) {
474 if *must_report_cdc_offset_once
476 && (!must_wait_cdc_offset_before_report
477 || updated_splits
478 .values()
479 .any(|split| split.is_cdc_split() && !split.get_cdc_split_offset().is_empty()))
480 {
481 self.barrier_manager.report_cdc_source_offset_updated(
483 epoch,
484 self.actor_ctx.id,
485 source_id,
486 );
487 tracing::info!(
488 actor_id = %self.actor_ctx.id,
489 source_id = %source_id,
490 epoch = ?epoch,
491 "Reported CDC source offset updated to meta (first time only)"
492 );
493 *must_report_cdc_offset_once = false;
495 }
496 }
497
498 #[try_stream(ok = Message, error = StreamExecutorError)]
503 async fn execute_inner(mut self) {
504 let mut barrier_receiver = self.barrier_receiver.take().unwrap();
505 let first_barrier = barrier_receiver
506 .recv()
507 .instrument_await("source_recv_first_barrier")
508 .await
509 .ok_or_else(|| {
510 anyhow!(
511 "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
512 self.actor_ctx.id,
513 self.stream_source_core.source_id
514 )
515 })?;
516 let first_epoch = first_barrier.epoch;
517 let (mut boot_state, mut must_report_cdc_offset_once, must_wait_cdc_offset_before_report) =
520 if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
521 tracing::debug!(?splits, "boot with splits");
523 let must_report_cdc_offset_once = splits.iter().any(|split| split.is_cdc_split());
525 let must_wait_cdc_offset_before_report = must_report_cdc_offset_once
527 && splits.iter().any(|split| {
528 matches!(split, SplitImpl::MysqlCdc(_) | SplitImpl::SqlServerCdc(_))
529 });
530 (
531 splits.to_vec(),
532 must_report_cdc_offset_once,
533 must_wait_cdc_offset_before_report,
534 )
535 } else {
536 (Vec::default(), true, true)
537 };
538 let is_pause_on_startup = first_barrier.is_pause_on_startup();
539 let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);
540
541 yield Message::Barrier(first_barrier);
542
543 let mut core = self.stream_source_core;
544 let source_id = core.source_id;
545
546 let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
548 let mut source_desc = source_desc_builder
549 .build()
550 .map_err(StreamExecutorError::connector_error)?;
551
552 let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
553 &core,
554 source_desc.source.clone(),
555 self.metrics.clone(),
556 )
557 .await?;
558
559 let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
560 get_split_offset_col_idx(&source_desc.columns)
561 else {
562 unreachable!("Partition and offset columns must be set.");
563 };
564
565 core.split_state_store.init_epoch(first_epoch).await?;
566 {
567 let committed_reader = core
568 .split_state_store
569 .new_committed_reader(first_epoch)
570 .await?;
571 for ele in &mut boot_state {
572 if let Some(recover_state) =
573 committed_reader.try_recover_from_state_store(ele).await?
574 {
575 *ele = recover_state;
576 is_uninitialized = false;
578 } else {
579 core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
583 }
584 }
585 }
586
587 core.init_split_state(boot_state.clone());
589
590 self.stream_source_core = core;
592
593 let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
594 tracing::debug!(state = ?recover_state, "start with state");
595
596 let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
597 let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
598 let mut latest_splits = None;
599 if is_uninitialized {
601 let create_split_reader_result = reader_stream_builder
602 .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
603 .await?;
604 latest_splits = create_split_reader_result.latest_splits;
605 }
606
607 if let Some(latest_splits) = latest_splits {
608 self.stream_source_core
611 .updated_splits_in_epoch
612 .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
613 }
614 let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
617 barrier_stream,
618 reader_stream_builder
619 .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
620 );
621 let mut command_paused = false;
622
623 if is_pause_on_startup {
625 tracing::info!("source paused on startup");
626 stream.pause_stream();
627 command_paused = true;
628 }
629
630 let mut max_wait_barrier_time_ms =
633 self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
634 let mut last_barrier_time = Instant::now();
635 let mut self_paused = false;
636
637 let source_output_row_count = self
638 .metrics
639 .source_output_row_count
640 .with_guarded_label_values(&self.get_metric_labels());
641
642 let source_split_change_count = self
643 .metrics
644 .source_split_change_count
645 .with_guarded_label_values(&self.get_metric_labels());
646
647 while let Some(msg) = stream.next().await {
648 let Ok(msg) = msg else {
649 tokio::time::sleep(Duration::from_millis(1000)).await;
650 self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
651 continue;
652 };
653
654 match msg {
655 Either::Left(Message::Barrier(barrier)) => {
657 last_barrier_time = Instant::now();
658
659 if self_paused {
660 self_paused = false;
661 if !command_paused {
663 stream.resume_stream();
664 }
665 }
666
667 let epoch = barrier.epoch;
668 let mut split_change = None;
669
670 if let Some(mutation) = barrier.mutation.as_deref() {
671 match mutation {
672 Mutation::Pause => {
673 command_paused = true;
674 stream.pause_stream()
675 }
676 Mutation::Resume => {
677 command_paused = false;
678 stream.resume_stream()
679 }
680 Mutation::SourceChangeSplit(actor_splits) => {
681 tracing::info!(
682 actor_id = %self.actor_ctx.id,
683 actor_splits = ?actor_splits,
684 "source change split received"
685 );
686
687 split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
688 |target_splits| {
689 (
690 &source_desc,
691 &mut stream,
692 ApplyMutationAfterBarrier::SplitChange {
693 target_splits,
694 should_trim_state: true,
695 split_change_count: &source_split_change_count,
696 },
697 )
698 },
699 );
700 }
701
702 Mutation::ConnectorPropsChange(maybe_mutation) => {
703 if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
704 {
705 tracing::info!(
707 actor_id = %self.actor_ctx.id,
708 source_id = %source_id,
709 "updating source connector properties",
710 );
711 source_desc.update_reader(new_props.clone())?;
712 split_change = Some((
714 &source_desc,
715 &mut stream,
716 ApplyMutationAfterBarrier::ConnectorPropsChange,
717 ));
718 }
719 }
720
721 Mutation::Update(UpdateMutation { actor_splits, .. }) => {
722 split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
723 |target_splits| {
724 (
725 &source_desc,
726 &mut stream,
727 ApplyMutationAfterBarrier::SplitChange {
728 target_splits,
729 should_trim_state: false,
730 split_change_count: &source_split_change_count,
731 },
732 )
733 },
734 );
735 }
736 Mutation::Throttle(fragment_to_apply) => {
737 if let Some(entry) =
738 fragment_to_apply.get(&self.actor_ctx.fragment_id)
739 && entry.throttle_type() == ThrottleType::Source
740 && entry.rate_limit != self.rate_limit_rps
741 {
742 tracing::info!(
743 "updating rate limit from {:?} to {:?}",
744 self.rate_limit_rps,
745 entry.rate_limit
746 );
747 self.rate_limit_rps = entry.rate_limit;
748 self.rebuild_stream_reader(&source_desc, &mut stream)?;
750 }
751 }
752 Mutation::ResetSource { source_id } => {
753 if *source_id == self.stream_source_core.source_id {
757 tracing::info!(
758 actor_id = %self.actor_ctx.id,
759 source_id = source_id.as_raw_id(),
760 "Resetting CDC source: clearing offset (set to None)"
761 );
762
763 let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
765 .latest_split_info
766 .values()
767 .map(|split| {
768 let mut new_split = split.clone();
770 match &mut new_split {
771 SplitImpl::MysqlCdc(debezium_split) => {
772 if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
773 tracing::info!(
774 split_id = ?mysql_split.inner.split_id,
775 old_offset = ?mysql_split.inner.start_offset,
776 "Clearing MySQL CDC offset"
777 );
778 mysql_split.inner.start_offset = None;
779 }
780 }
781 SplitImpl::PostgresCdc(debezium_split) => {
782 if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
783 tracing::info!(
784 split_id = ?pg_split.inner.split_id,
785 old_offset = ?pg_split.inner.start_offset,
786 "Clearing PostgreSQL CDC offset"
787 );
788 pg_split.inner.start_offset = None;
789 }
790 }
791 SplitImpl::MongodbCdc(debezium_split) => {
792 if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
793 tracing::info!(
794 split_id = ?mongo_split.inner.split_id,
795 old_offset = ?mongo_split.inner.start_offset,
796 "Clearing MongoDB CDC offset"
797 );
798 mongo_split.inner.start_offset = None;
799 }
800 }
801 SplitImpl::CitusCdc(debezium_split) => {
802 if let Some(citus_split) = debezium_split.citus_split.as_mut() {
803 tracing::info!(
804 split_id = ?citus_split.inner.split_id,
805 old_offset = ?citus_split.inner.start_offset,
806 "Clearing Citus CDC offset"
807 );
808 citus_split.inner.start_offset = None;
809 }
810 }
811 SplitImpl::SqlServerCdc(debezium_split) => {
812 if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
813 tracing::info!(
814 split_id = ?sqlserver_split.inner.split_id,
815 old_offset = ?sqlserver_split.inner.start_offset,
816 "Clearing SQL Server CDC offset"
817 );
818 sqlserver_split.inner.start_offset = None;
819 }
820 }
821 _ => {
822 tracing::warn!(
823 "RESET SOURCE called on non-CDC split type"
824 );
825 }
826 }
827 new_split
828 })
829 .collect();
830
831 if !splits_with_cleared_offset.is_empty() {
832 tracing::info!(
833 actor_id = %self.actor_ctx.id,
834 split_count = splits_with_cleared_offset.len(),
835 "Updating state table with cleared offsets"
836 );
837
838 self.stream_source_core
840 .split_state_store
841 .set_states(splits_with_cleared_offset.clone())
842 .await?;
843
844 for split in splits_with_cleared_offset {
846 self.stream_source_core
847 .latest_split_info
848 .insert(split.id(), split.clone());
849 self.stream_source_core
850 .updated_splits_in_epoch
851 .insert(split.id(), split);
852 }
853
854 tracing::info!(
855 actor_id = %self.actor_ctx.id,
856 source_id = source_id.as_raw_id(),
857 "RESET SOURCE completed: offset cleared (set to None). \
858 Trigger recovery/restart to fetch latest offset from upstream."
859 );
860 } else {
861 tracing::warn!(
862 actor_id = %self.actor_ctx.id,
863 "No splits found to reset - source may not be initialized yet"
864 );
865 }
866 } else {
867 tracing::debug!(
868 actor_id = %self.actor_ctx.id,
869 target_source_id = source_id.as_raw_id(),
870 current_source_id = self.stream_source_core.source_id.as_raw_id(),
871 "ResetSource mutation for different source, ignoring"
872 );
873 }
874 }
875 _ => {}
876 }
877 }
878
879 let updated_splits = self.persist_state_and_clear_cache(epoch).await?;
880
881 self.maybe_report_cdc_source_offset(
882 &updated_splits,
883 epoch,
884 source_id,
885 &mut must_report_cdc_offset_once,
886 must_wait_cdc_offset_before_report,
887 );
888
889 if barrier.kind.is_checkpoint()
891 && let Some(task_builder) = &mut wait_checkpoint_task_builder
892 {
893 task_builder.update_task_on_checkpoint(updated_splits);
894
895 tracing::debug!("epoch to wait {:?}", epoch);
896 task_builder.send(Epoch(epoch.prev)).await?
897 }
898
899 let barrier_epoch = barrier.epoch;
900 yield Message::Barrier(barrier);
901
902 if let Some((source_desc, stream, to_apply_mutation)) = split_change {
903 self.apply_split_change_after_yield_barrier(
904 barrier_epoch,
905 source_desc,
906 stream,
907 to_apply_mutation,
908 )
909 .await?;
910 }
911 }
912 Either::Left(_) => {
913 unreachable!();
916 }
917
918 Either::Right((chunk, latest_state)) => {
919 if let Some(task_builder) = &mut wait_checkpoint_task_builder {
920 if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
921 let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
922 task_builder.update_task_on_chunk(
923 source_id,
924 &latest_state,
925 pulsar_message_id_col.clone(),
926 );
927 } else {
928 let offset_col = chunk.column_at(offset_idx);
929 task_builder.update_task_on_chunk(
930 source_id,
931 &latest_state,
932 offset_col.clone(),
933 );
934 }
935 }
936 if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
937 self_paused = true;
942 tracing::warn!(
943 "source paused, wait barrier for {:?}",
944 last_barrier_time.elapsed()
945 );
946 stream.pause_stream();
947
948 max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
953 as u128
954 * WAIT_BARRIER_MULTIPLE_TIMES;
955 }
956
957 latest_state.iter().for_each(|(split_id, new_split_impl)| {
958 if let Some(split_impl) =
959 self.stream_source_core.latest_split_info.get_mut(split_id)
960 {
961 *split_impl = new_split_impl.clone();
962 }
963 });
964
965 self.stream_source_core
966 .updated_splits_in_epoch
967 .extend(latest_state);
968
969 let card = chunk.cardinality();
970 if card == 0 {
971 continue;
972 }
973 source_output_row_count.inc_by(card as u64);
974 let to_remove_col_indices =
975 if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
976 vec![split_idx, offset_idx, pulsar_message_id_idx]
977 } else {
978 vec![split_idx, offset_idx]
979 };
980 let chunk =
981 prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
982 yield Message::Chunk(chunk);
983 self.try_flush_data().await?;
984 }
985 }
986 }
987
988 tracing::error!(
990 actor_id = %self.actor_ctx.id,
991 "source executor exited unexpectedly"
992 )
993 }
994}
995
/// A mutation whose effect on the reader is deferred until the barrier that carried it has
/// been yielded downstream.
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    /// The splits assigned to this actor are replaced by `target_splits`.
    SplitChange {
        /// Splits this actor should own after the change.
        target_splits: Vec<SplitImpl>,
        /// Whether the persisted state of dropped splits is trimmed from the state store.
        should_trim_state: bool,
        /// Counter incremented once when the change is applied.
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    /// Connector properties were updated; the reader stream is rebuilt unconditionally.
    ConnectorPropsChange,
}
1005
1006impl<S: StateStore> Execute for SourceExecutor<S> {
1007 fn execute(self: Box<Self>) -> BoxedMessageStream {
1008 self.execute_inner().boxed()
1009 }
1010}
1011
1012impl<S: StateStore> Debug for SourceExecutor<S> {
1013 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1014 f.debug_struct("SourceExecutor")
1015 .field("source_id", &self.stream_source_core.source_id)
1016 .field("column_ids", &self.stream_source_core.column_ids)
1017 .finish()
1018 }
1019}
1020
/// Accumulates a [`WaitCheckpointTask`] between checkpoints and hands the finished task to the
/// wait-checkpoint worker.
struct WaitCheckpointTaskBuilder {
    /// Sending half of the channel to the worker; carries the epoch to wait for and the task.
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    /// Reader used to create a fresh, empty task after the current one has been sent.
    source_reader: SourceReader,
    /// Task currently being filled from incoming chunks and checkpoint information.
    building_task: WaitCheckpointTask,
}
1026
1027impl WaitCheckpointTaskBuilder {
1028 fn update_task_on_chunk(
1029 &mut self,
1030 source_id: SourceId,
1031 latest_state: &HashMap<SplitId, SplitImpl>,
1032 offset_col: ArrayRef,
1033 ) {
1034 match &mut self.building_task {
1035 WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
1036 arrays.push(offset_col);
1037 }
1038 WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
1039 arrays.push(offset_col);
1040 }
1041 WaitCheckpointTask::AckPulsarMessage(arrays) => {
1042 let split_id = latest_state.keys().next().unwrap();
1044 let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
1045 arrays.push((pulsar_ack_channel_id, offset_col));
1046 }
1047 WaitCheckpointTask::CommitCdcOffset(_) => {}
1048 }
1049 }
1050
1051 fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
1052 #[expect(clippy::single_match)]
1053 match &mut self.building_task {
1054 WaitCheckpointTask::CommitCdcOffset(offsets) => {
1055 if !updated_splits.is_empty() {
1056 assert_eq!(1, updated_splits.len());
1058 for (split_id, split_impl) in updated_splits {
1059 if split_impl.is_cdc_split() {
1060 *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
1061 } else {
1062 unreachable!()
1063 }
1064 }
1065 }
1066 }
1067 _ => {}
1068 }
1069 }
1070
1071 async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
1073 let new_task = self
1074 .source_reader
1075 .create_wait_checkpoint_task()
1076 .await?
1077 .expect("wait checkpoint task should be created");
1078 self.wait_checkpoint_tx
1079 .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
1080 .expect("wait_checkpoint_tx send should succeed");
1081 Ok(())
1082 }
1083}
1084
/// Background worker that waits for an epoch to be committed in the state store and then runs
/// the [`WaitCheckpointTask`] received for that epoch.
struct WaitCheckpointWorker<S: StateStore> {
    /// Receiving half of the channel fed by `WaitCheckpointTaskBuilder`.
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    /// State store on which the worker waits for the epoch.
    state_store: S,
    /// Table id passed along in the wait options.
    table_id: TableId,
    /// Metrics used to export the committed PostgreSQL CDC LSN.
    metrics: Arc<StreamingMetrics>,
}
1116
1117impl<S: StateStore> WaitCheckpointWorker<S> {
1118 pub async fn run(mut self) {
1119 tracing::debug!("wait epoch worker start success");
1120 loop {
1121 match self.wait_checkpoint_rx.recv().await {
1123 Some((epoch, task)) => {
1124 tracing::debug!("start to wait epoch {}", epoch.0);
1125 let ret = self
1126 .state_store
1127 .try_wait_epoch(
1128 HummockReadEpoch::Committed(epoch.0),
1129 TryWaitEpochOptions {
1130 table_id: self.table_id,
1131 },
1132 )
1133 .await;
1134
1135 match ret {
1136 Ok(()) => {
1137 tracing::debug!(epoch = epoch.0, "wait epoch success");
1138
1139 task.run_with_on_commit_success(|source_id: u64, offset| {
1141 if let Some(lsn_value) =
1142 extract_postgres_lsn_from_offset_str(offset)
1143 {
1144 self.metrics
1145 .pg_cdc_jni_commit_offset_lsn
1146 .with_guarded_label_values(&[&source_id.to_string()])
1147 .set(lsn_value as i64);
1148 }
1149 })
1150 .await;
1151 }
1152 Err(e) => {
1153 tracing::error!(
1154 error = %e.as_report(),
1155 "wait epoch {} failed", epoch.0
1156 );
1157 }
1158 }
1159 }
1160 None => {
1161 tracing::error!("wait epoch rx closed");
1162 break;
1163 }
1164 }
1165 }
1166 }
1167}
1168
#[cfg(test)]
mod tests {
    //! Tests that drive a [`SourceExecutor`] backed by the `datagen` connector and an
    //! in-memory state store, feeding it barriers through an unbounded channel.

    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field};
    use risingwave_common::id::SourceId;
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    /// Source name given to every `StreamSourceCore` built in these tests.
    const MOCK_SOURCE_NAME: &str = "mock_source";

    /// Sends an initial `Add` barrier carrying one datagen split, then checks that the
    /// first data chunk produced by the executor holds the sequence values 11, 12 and 13.
    #[tokio::test]
    async fn test_source_executor() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![ColumnId::from(0)];

        // Datagen is configured for 3 rows per second over a sequence starting at 11,
        // which is what the expected chunk at the end of the test relies on.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // The first message is discarded without inspection (presumably the initial
        // barrier passed through -- confirm against the executor if this matters).
        executor.next().await.unwrap().unwrap();

        // The second message must be a data chunk; `into_chunk().unwrap()` panics otherwise.
        let msg = executor.next().await.unwrap().unwrap();

        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    /// Walks the executor through a split reassignment (1 -> 3 splits), a pause/resume
    /// pair, and a second reassignment (3 -> 1 split), checking the split state persisted
    /// in the shared in-memory state store after each reassignment.
    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        // Cloned into every state-table handler below so the test can read back what
        // the executor wrote.
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // The first message must be a barrier; `into_barrier().unwrap()` panics otherwise.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        // From here on messages are pulled in batches of up to 10 already-ready items.
        // The contents of each batch are not inspected; only the stream staying open is
        // checked (via the `unwrap` on `next()`).
        let mut ready_chunks = handler.ready_chunks(10);

        ready_chunks.next().await.unwrap();

        // Reassign the actor to all three datagen splits (indices 0, 1 and 2).
        let new_assignment: Vec<SplitImpl> = (0..3)
            .map(|split_index| {
                SplitImpl::Datagen(DatagenSplit {
                    split_index,
                    split_num: 3,
                    start_offset: None,
                })
            })
            .collect();

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));
        barrier_tx.send(change_split_mutation).unwrap();

        // Presumably yields the split-change barrier -- not asserted here.
        ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        // Presumably yields the plain barrier -- not asserted here.
        ready_chunks.next().await.unwrap();

        // Re-open the state table on the shared store: a row must exist for the newly
        // added split 1, otherwise one of the two `unwrap`s below panics.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        ready_chunks.next().await.unwrap();

        // Pause and immediately resume the source.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        // Shrink the assignment to split 2 only; splits 0 and 1 are dropped.
        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment
            }));
        barrier_tx.send(drop_split_mutation).unwrap();

        // Presumably yields the split-change barrier -- not asserted here.
        ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        // Presumably yields the plain barrier -- not asserted here.
        ready_chunks.next().await.unwrap();

        // Last use of the shared store, so it is moved rather than cloned.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store,
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();

        // State of the two dropped splits must no longer be recoverable ...
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        // ... while the split that stayed assigned still has recoverable state.
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}