1use std::collections::HashMap;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::util::epoch::{Epoch, EpochPair};
28use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
29use risingwave_connector::source::cdc::split::extract_postgres_lsn_from_offset_str;
30use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
31use risingwave_connector::source::reader::reader::SourceReader;
32use risingwave_connector::source::{
33 ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
34 StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
35};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::id::SourceId;
38use risingwave_storage::store::TryWaitEpochOptions;
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{mpsc, oneshot};
42use tokio::time::Instant;
43
44use super::executor_core::StreamSourceCore;
45use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
46use crate::common::rate_limit::limited_chunk_size;
47use crate::executor::UpdateMutation;
48use crate::executor::prelude::*;
49use crate::executor::source::reader_stream::StreamReaderBuilder;
50use crate::executor::stream_reader::StreamReaderWithPause;
51use crate::task::LocalBarrierManager;
52
/// Multiplier applied to the `barrier_interval_ms` system parameter to obtain the longest time
/// (in milliseconds) the source keeps emitting chunks without seeing a barrier before it pauses
/// its reader until the next barrier arrives.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
56
/// Streaming executor that reads data from an external source connector, interleaves it with
/// the barriers it receives, and persists per-split reader state into a state table.
pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Source identity, the in-memory split states and the split state table handler.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for streaming.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of barriers. Set to `Some` in `new` and taken once when the executor starts.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameters; read for `barrier_interval_ms`, which bounds how long the source may
    /// keep emitting chunks without receiving a barrier before pausing itself.
    system_params: SystemParamsReaderRef,

    /// Optional rate limit handed to the stream reader and used to size chunks.
    /// Updated at runtime by `Throttle` mutations.
    rate_limit_rps: Option<u32>,

    /// Passed to `fetch_latest_splits` and `into_retry_stream` when the actor starts
    /// uninitialized.
    is_shared_non_cdc: bool,

    /// NOTE(review): not read by any code in this section; presumably kept alive for its
    /// lifetime only — confirm.
    _barrier_manager: LocalBarrierManager,
}
80
81impl<S: StateStore> SourceExecutor<S> {
82 #[expect(clippy::too_many_arguments)]
83 pub fn new(
84 actor_ctx: ActorContextRef,
85 stream_source_core: StreamSourceCore<S>,
86 metrics: Arc<StreamingMetrics>,
87 barrier_receiver: UnboundedReceiver<Barrier>,
88 system_params: SystemParamsReaderRef,
89 rate_limit_rps: Option<u32>,
90 is_shared_non_cdc: bool,
91 barrier_manager: LocalBarrierManager,
92 ) -> Self {
93 Self {
94 actor_ctx,
95 stream_source_core,
96 metrics,
97 barrier_receiver: Some(barrier_receiver),
98 system_params,
99 rate_limit_rps,
100 is_shared_non_cdc,
101 _barrier_manager: barrier_manager,
102 }
103 }
104
105 fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
106 StreamReaderBuilder {
107 source_desc,
108 rate_limit: self.rate_limit_rps,
109 source_id: self.stream_source_core.source_id,
110 source_name: self.stream_source_core.source_name.clone(),
111 is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
112 actor_ctx: self.actor_ctx.clone(),
113 reader_stream: None,
114 }
115 }
116
117 async fn spawn_wait_checkpoint_worker(
118 core: &StreamSourceCore<S>,
119 source_reader: SourceReader,
120 metrics: Arc<StreamingMetrics>,
121 ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
122 let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
123 return Ok(None);
124 };
125 let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
126 let wait_checkpoint_worker = WaitCheckpointWorker {
127 wait_checkpoint_rx,
128 state_store: core.split_state_store.state_table().state_store().clone(),
129 table_id: core.split_state_store.state_table().table_id(),
130 metrics,
131 };
132 tokio::spawn(wait_checkpoint_worker.run());
133 Ok(Some(WaitCheckpointTaskBuilder {
134 wait_checkpoint_tx,
135 source_reader,
136 building_task: initial_task,
137 }))
138 }
139
140 pub fn prepare_source_stream_build(
142 &self,
143 source_desc: &SourceDesc,
144 ) -> (Vec<ColumnId>, SourceContext) {
145 let column_ids = source_desc
146 .columns
147 .iter()
148 .map(|column_desc| column_desc.column_id)
149 .collect_vec();
150
151 let (schema_change_tx, mut schema_change_rx) =
152 mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
153 let schema_change_tx = if self.is_auto_schema_change_enable() {
154 let meta_client = self.actor_ctx.meta_client.clone();
155 let _join_handle = tokio::task::spawn(async move {
157 while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
158 let table_ids = schema_change.table_ids();
159 tracing::info!(
160 target: "auto_schema_change",
161 "recv a schema change event for tables: {:?}", table_ids);
162 if let Some(ref meta_client) = meta_client {
164 match meta_client
165 .auto_schema_change(schema_change.to_protobuf())
166 .await
167 {
168 Ok(_) => {
169 tracing::info!(
170 target: "auto_schema_change",
171 "schema change success for tables: {:?}", table_ids);
172 finish_tx.send(()).unwrap();
173 }
174 Err(e) => {
175 tracing::error!(
176 target: "auto_schema_change",
177 error = ?e.as_report(), "schema change error");
178 finish_tx.send(()).unwrap();
179 }
180 }
181 }
182 }
183 });
184 Some(schema_change_tx)
185 } else {
186 info!("auto schema change is disabled in config");
187 None
188 };
189 let source_ctx = SourceContext::new(
190 self.actor_ctx.id,
191 self.stream_source_core.source_id,
192 self.actor_ctx.fragment_id,
193 self.stream_source_core.source_name.clone(),
194 source_desc.metrics.clone(),
195 SourceCtrlOpts {
196 chunk_size: limited_chunk_size(self.rate_limit_rps),
197 split_txn: self.rate_limit_rps.is_some(), },
199 source_desc.source.config.clone(),
200 schema_change_tx,
201 );
202
203 (column_ids, source_ctx)
204 }
205
206 fn is_auto_schema_change_enable(&self) -> bool {
207 self.actor_ctx.config.developer.enable_auto_schema_change
208 }
209
210 #[inline]
212 fn get_metric_labels(&self) -> [String; 4] {
213 [
214 self.stream_source_core.source_id.to_string(),
215 self.stream_source_core.source_name.clone(),
216 self.actor_ctx.id.to_string(),
217 self.actor_ctx.fragment_id.to_string(),
218 ]
219 }
220
221 async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
230 &mut self,
231 barrier_epoch: EpochPair,
232 source_desc: &SourceDesc,
233 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
234 apply_mutation: ApplyMutationAfterBarrier<'_>,
235 ) -> StreamExecutorResult<()> {
236 {
237 let mut should_rebuild_stream = false;
238 match apply_mutation {
239 ApplyMutationAfterBarrier::SplitChange {
240 target_splits,
241 should_trim_state,
242 split_change_count,
243 } => {
244 split_change_count.inc();
245 if self
246 .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
247 .await?
248 {
249 should_rebuild_stream = true;
250 }
251 }
252 ApplyMutationAfterBarrier::ConnectorPropsChange => {
253 should_rebuild_stream = true;
254 }
255 }
256
257 if should_rebuild_stream {
258 self.rebuild_stream_reader(source_desc, stream)?;
259 }
260 }
261
262 Ok(())
263 }
264
265 async fn update_state_if_changed(
267 &mut self,
268 barrier_epoch: EpochPair,
269 target_splits: Vec<SplitImpl>,
270 should_trim_state: bool,
271 ) -> StreamExecutorResult<bool> {
272 let core = &mut self.stream_source_core;
273
274 let target_splits: HashMap<_, _> = target_splits
275 .into_iter()
276 .map(|split| (split.id(), split))
277 .collect();
278
279 let mut target_state: HashMap<SplitId, SplitImpl> =
280 HashMap::with_capacity(target_splits.len());
281
282 let mut split_changed = false;
283
284 let committed_reader = core
285 .split_state_store
286 .new_committed_reader(barrier_epoch)
287 .await?;
288
289 for (split_id, split) in target_splits {
291 if let Some(s) = core.latest_split_info.get(&split_id) {
292 target_state.insert(split_id, s.clone());
295 } else {
296 split_changed = true;
297 let initial_state = if let Some(recover_state) = committed_reader
300 .try_recover_from_state_store(&split)
301 .await?
302 {
303 recover_state
304 } else {
305 split
306 };
307
308 core.updated_splits_in_epoch
309 .entry(split_id.clone())
310 .or_insert_with(|| initial_state.clone());
311
312 target_state.insert(split_id, initial_state);
313 }
314 }
315
316 for existing_split_id in core.latest_split_info.keys() {
318 if !target_state.contains_key(existing_split_id) {
319 tracing::info!("split dropping detected: {}", existing_split_id);
320 split_changed = true;
321 }
322 }
323
324 if split_changed {
325 tracing::info!(
326 actor_id = %self.actor_ctx.id,
327 state = ?target_state,
328 "apply split change"
329 );
330
331 core.updated_splits_in_epoch
332 .retain(|split_id, _| target_state.contains_key(split_id));
333
334 let dropped_splits = core
335 .latest_split_info
336 .extract_if(|split_id, _| !target_state.contains_key(split_id))
337 .map(|(_, split)| split)
338 .collect_vec();
339
340 if should_trim_state && !dropped_splits.is_empty() {
341 core.split_state_store.trim_state(&dropped_splits).await?;
343 }
344
345 core.latest_split_info = target_state;
346 }
347
348 Ok(split_changed)
349 }
350
351 fn rebuild_stream_reader_from_error<const BIASED: bool>(
353 &mut self,
354 source_desc: &SourceDesc,
355 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
356 e: StreamExecutorError,
357 ) -> StreamExecutorResult<()> {
358 let core = &mut self.stream_source_core;
359 tracing::error!(
360 error = ?e.as_report(),
361 actor_id = %self.actor_ctx.id,
362 source_id = %core.source_id,
363 "stream source reader error",
364 );
365 GLOBAL_ERROR_METRICS.user_source_error.report([
366 e.variant_name().to_owned(),
367 core.source_id.to_string(),
368 core.source_name.clone(),
369 self.actor_ctx.fragment_id.to_string(),
370 ]);
371
372 self.rebuild_stream_reader(source_desc, stream)
373 }
374
375 fn rebuild_stream_reader<const BIASED: bool>(
376 &mut self,
377 source_desc: &SourceDesc,
378 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
379 ) -> StreamExecutorResult<()> {
380 let core = &mut self.stream_source_core;
381 let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
382
383 tracing::info!(
384 "actor {:?} apply source split change to {:?}",
385 self.actor_ctx.id,
386 target_state
387 );
388
389 let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
391 let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);
392
393 stream.replace_data_stream(reader_stream);
394
395 Ok(())
396 }
397
398 async fn persist_state_and_clear_cache(
399 &mut self,
400 epoch: EpochPair,
401 ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
402 let core = &mut self.stream_source_core;
403
404 let cache = core
405 .updated_splits_in_epoch
406 .values()
407 .map(|split_impl| split_impl.to_owned())
408 .collect_vec();
409
410 if !cache.is_empty() {
411 tracing::debug!(state = ?cache, "take snapshot");
412
413 let source_id = core.source_id.to_string();
415 for split_impl in &cache {
416 match split_impl {
418 SplitImpl::PostgresCdc(pg_split) => {
419 if let Some(lsn_value) = pg_split.pg_lsn() {
420 self.metrics
421 .pg_cdc_state_table_lsn
422 .with_guarded_label_values(&[&source_id])
423 .set(lsn_value as i64);
424 }
425 }
426 SplitImpl::MysqlCdc(mysql_split) => {
427 if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
428 self.metrics
429 .mysql_cdc_state_binlog_file_seq
430 .with_guarded_label_values(&[&source_id])
431 .set(file_seq as i64);
432
433 self.metrics
434 .mysql_cdc_state_binlog_position
435 .with_guarded_label_values(&[&source_id])
436 .set(position as i64);
437 }
438 }
439 _ => {}
440 }
441 }
442
443 core.split_state_store.set_states(cache).await?;
444 }
445
446 core.split_state_store.commit(epoch).await?;
448
449 let updated_splits = core.updated_splits_in_epoch.clone();
450
451 core.updated_splits_in_epoch.clear();
452
453 Ok(updated_splits)
454 }
455
456 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
458 let core = &mut self.stream_source_core;
459 core.split_state_store.try_flush().await?;
460
461 Ok(())
462 }
463
/// Main loop of the source executor, yielding barriers and data chunks.
///
/// Startup: receives and yields the first barrier, builds the [`SourceDesc`], spawns the
/// wait-checkpoint worker if the connector needs one, restores the initially assigned splits
/// from committed state, and builds a reader stream merged with the barrier stream.
///
/// Per barrier:
/// - clear any self-pause and handle the barrier's mutation;
/// - persist split states;
/// - on checkpoint barriers, hand the accumulated ack/offset task to the worker;
/// - yield the barrier, and only then apply split or connector-property changes.
///
/// Per chunk:
/// - record ack information;
/// - pause the reader if no barrier arrived for too long;
/// - update split states;
/// - strip the split/offset (and Pulsar message id) columns, then yield the chunk.
///
/// Reader errors are reported, and the reader is rebuilt after a one-second delay.
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
    // Set to `Some` in `new`.
    let mut barrier_receiver = self.barrier_receiver.take().unwrap();
    let first_barrier = barrier_receiver
        .recv()
        .instrument_await("source_recv_first_barrier")
        .await
        .ok_or_else(|| {
            anyhow!(
                "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                self.actor_ctx.id,
                self.stream_source_core.source_id
            )
        })?;
    let first_epoch = first_barrier.epoch;
    let mut boot_state =
        if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
            tracing::debug!(?splits, "boot with splits");
            splits.to_vec()
        } else {
            Vec::default()
        };
    let is_pause_on_startup = first_barrier.is_pause_on_startup();
    // Cleared below if any initially assigned split has committed state.
    let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

    yield Message::Barrier(first_barrier);

    // Temporarily move the core out of `self`; it is put back once state is restored.
    let mut core = self.stream_source_core;
    let source_id = core.source_id;

    let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
    let mut source_desc = source_desc_builder
        .build()
        .map_err(StreamExecutorError::connector_error)?;

    let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
        &core,
        source_desc.source.clone(),
        self.metrics.clone(),
    )
    .await?;

    let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
        get_split_offset_col_idx(&source_desc.columns)
    else {
        unreachable!("Partition and offset columns must be set.");
    };

    core.split_state_store.init_epoch(first_epoch).await?;
    {
        let committed_reader = core
            .split_state_store
            .new_committed_reader(first_epoch)
            .await?;
        for ele in &mut boot_state {
            // Prefer committed state. Splits without it are queued in
            // `updated_splits_in_epoch` so they get persisted at the next barrier.
            if let Some(recover_state) =
                committed_reader.try_recover_from_state_store(ele).await?
            {
                *ele = recover_state;
                is_uninitialized = false;
            } else {
                core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
            }
        }
    }

    core.init_split_state(boot_state.clone());

    self.stream_source_core = core;

    let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
    tracing::debug!(state = ?recover_state, "start with state");

    let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
    let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
    let mut latest_splits = None;
    // A newly added actor with no recovered split state fetches the latest splits from the
    // connector; they are queued for persistence below.
    if is_uninitialized {
        let create_split_reader_result = reader_stream_builder
            .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
            .await?;
        latest_splits = create_split_reader_result.latest_splits;
    }

    if let Some(latest_splits) = latest_splits {
        self.stream_source_core
            .updated_splits_in_epoch
            .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
    }
    // NOTE(review): `true` is the `BIASED` parameter; presumably it gives the barrier stream
    // priority over the data stream — confirm in `StreamReaderWithPause`.
    let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
        barrier_stream,
        reader_stream_builder
            .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
    );
    // `command_paused` tracks Pause/Resume mutations. `self_paused` (below) tracks pauses the
    // executor triggers itself after waiting too long for a barrier. A barrier only resumes
    // the reader if it is not command-paused.
    let mut command_paused = false;

    if is_pause_on_startup {
        tracing::info!("source paused on startup");
        stream.pause_stream();
        command_paused = true;
    }

    // Longest time chunks may be emitted without a barrier before self-pausing.
    let mut max_wait_barrier_time_ms =
        self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
    let mut last_barrier_time = Instant::now();
    let mut self_paused = false;

    let source_output_row_count = self
        .metrics
        .source_output_row_count
        .with_guarded_label_values(&self.get_metric_labels());

    let source_split_change_count = self
        .metrics
        .source_split_change_count
        .with_guarded_label_values(&self.get_metric_labels());

    while let Some(msg) = stream.next().await {
        // On a reader error: back off for one second, then report it and rebuild the reader.
        let Ok(msg) = msg else {
            tokio::time::sleep(Duration::from_millis(1000)).await;
            self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
            continue;
        };

        match msg {
            Either::Left(Message::Barrier(barrier)) => {
                last_barrier_time = Instant::now();

                if self_paused {
                    self_paused = false;
                    if !command_paused {
                        stream.resume_stream();
                    }
                }

                let epoch = barrier.epoch;
                // Split and connector-property changes are deferred until after the barrier
                // has been yielded (see the end of this arm).
                let mut split_change = None;

                if let Some(mutation) = barrier.mutation.as_deref() {
                    match mutation {
                        Mutation::Pause => {
                            command_paused = true;
                            stream.pause_stream()
                        }
                        Mutation::Resume => {
                            command_paused = false;
                            stream.resume_stream()
                        }
                        Mutation::SourceChangeSplit(actor_splits) => {
                            tracing::info!(
                                actor_id = %self.actor_ctx.id,
                                actor_splits = ?actor_splits,
                                "source change split received"
                            );

                            split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                |target_splits| {
                                    (
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::SplitChange {
                                            target_splits,
                                            should_trim_state: true,
                                            split_change_count: &source_split_change_count,
                                        },
                                    )
                                },
                            );
                        }

                        Mutation::ConnectorPropsChange(maybe_mutation) => {
                            if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
                            {
                                tracing::info!(
                                    actor_id = %self.actor_ctx.id,
                                    source_id = %source_id,
                                    "updating source connector properties",
                                );
                                source_desc.update_reader(new_props.clone())?;
                                // The reader itself is rebuilt after the barrier is yielded.
                                split_change = Some((
                                    &source_desc,
                                    &mut stream,
                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
                                ));
                            }
                        }

                        Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                            // Unlike `SourceChangeSplit`, state of dropped splits is kept.
                            split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                |target_splits| {
                                    (
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::SplitChange {
                                            target_splits,
                                            should_trim_state: false,
                                            split_change_count: &source_split_change_count,
                                        },
                                    )
                                },
                            );
                        }
                        Mutation::Throttle(fragment_to_apply) => {
                            // Rate-limit changes take effect immediately, before the barrier
                            // is yielded, by rebuilding the reader.
                            if let Some(new_rate_limit) =
                                fragment_to_apply.get(&self.actor_ctx.fragment_id)
                                && *new_rate_limit != self.rate_limit_rps
                            {
                                tracing::info!(
                                    "updating rate limit from {:?} to {:?}",
                                    self.rate_limit_rps,
                                    *new_rate_limit
                                );
                                self.rate_limit_rps = *new_rate_limit;
                                self.rebuild_stream_reader(&source_desc, &mut stream)?;
                            }
                        }
                        Mutation::ResetSource { source_id } => {
                            // `source_id` here shadows the outer one; it is the mutation's
                            // target source.
                            if *source_id == self.stream_source_core.source_id {
                                tracing::info!(
                                    actor_id = %self.actor_ctx.id,
                                    source_id = source_id.as_raw_id(),
                                    "Resetting CDC source: clearing offset (set to None)"
                                );

                                // Copy every tracked split with its CDC start offset cleared.
                                // Non-CDC splits are copied unchanged, with a warning.
                                let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
                                    .latest_split_info
                                    .values()
                                    .map(|split| {
                                        let mut new_split = split.clone();
                                        match &mut new_split {
                                            SplitImpl::MysqlCdc(debezium_split) => {
                                                if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
                                                    tracing::info!(
                                                        split_id = ?mysql_split.inner.split_id,
                                                        old_offset = ?mysql_split.inner.start_offset,
                                                        "Clearing MySQL CDC offset"
                                                    );
                                                    mysql_split.inner.start_offset = None;
                                                }
                                            }
                                            SplitImpl::PostgresCdc(debezium_split) => {
                                                if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
                                                    tracing::info!(
                                                        split_id = ?pg_split.inner.split_id,
                                                        old_offset = ?pg_split.inner.start_offset,
                                                        "Clearing PostgreSQL CDC offset"
                                                    );
                                                    pg_split.inner.start_offset = None;
                                                }
                                            }
                                            SplitImpl::MongodbCdc(debezium_split) => {
                                                if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
                                                    tracing::info!(
                                                        split_id = ?mongo_split.inner.split_id,
                                                        old_offset = ?mongo_split.inner.start_offset,
                                                        "Clearing MongoDB CDC offset"
                                                    );
                                                    mongo_split.inner.start_offset = None;
                                                }
                                            }
                                            SplitImpl::CitusCdc(debezium_split) => {
                                                if let Some(citus_split) = debezium_split.citus_split.as_mut() {
                                                    tracing::info!(
                                                        split_id = ?citus_split.inner.split_id,
                                                        old_offset = ?citus_split.inner.start_offset,
                                                        "Clearing Citus CDC offset"
                                                    );
                                                    citus_split.inner.start_offset = None;
                                                }
                                            }
                                            SplitImpl::SqlServerCdc(debezium_split) => {
                                                if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
                                                    tracing::info!(
                                                        split_id = ?sqlserver_split.inner.split_id,
                                                        old_offset = ?sqlserver_split.inner.start_offset,
                                                        "Clearing SQL Server CDC offset"
                                                    );
                                                    sqlserver_split.inner.start_offset = None;
                                                }
                                            }
                                            _ => {
                                                tracing::warn!(
                                                    "RESET SOURCE called on non-CDC split type"
                                                );
                                            }
                                        }
                                        new_split
                                    })
                                    .collect();

                                if !splits_with_cleared_offset.is_empty() {
                                    tracing::info!(
                                        actor_id = %self.actor_ctx.id,
                                        split_count = splits_with_cleared_offset.len(),
                                        "Updating state table with cleared offsets"
                                    );

                                    // Write the cleared states straight to the state table.
                                    self.stream_source_core
                                        .split_state_store
                                        .set_states(splits_with_cleared_offset.clone())
                                        .await?;

                                    // Also update the in-memory state and queue the splits so
                                    // they are persisted again when this barrier is handled.
                                    for split in splits_with_cleared_offset {
                                        self.stream_source_core
                                            .latest_split_info
                                            .insert(split.id(), split.clone());
                                        self.stream_source_core
                                            .updated_splits_in_epoch
                                            .insert(split.id(), split);
                                    }

                                    tracing::info!(
                                        actor_id = %self.actor_ctx.id,
                                        source_id = source_id.as_raw_id(),
                                        "RESET SOURCE completed: offset cleared (set to None). \
                                         Trigger recovery/restart to fetch latest offset from upstream."
                                    );
                                } else {
                                    tracing::warn!(
                                        actor_id = %self.actor_ctx.id,
                                        "No splits found to reset - source may not be initialized yet"
                                    );
                                }
                            } else {
                                tracing::debug!(
                                    actor_id = %self.actor_ctx.id,
                                    target_source_id = source_id.as_raw_id(),
                                    current_source_id = self.stream_source_core.source_id.as_raw_id(),
                                    "ResetSource mutation for different source, ignoring"
                                );
                            }
                        }
                        _ => {}
                    }
                }

                // Persist split states and commit the state table for this epoch.
                let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                // On checkpoints, hand the accumulated ack/offset task to the worker, which
                // waits for `epoch.prev` to be committed before running it.
                if barrier.kind.is_checkpoint()
                    && let Some(task_builder) = &mut wait_checkpoint_task_builder
                {
                    task_builder.update_task_on_checkpoint(updated_splits);

                    tracing::debug!("epoch to wait {:?}", epoch);
                    task_builder.send(Epoch(epoch.prev)).await?
                }

                let barrier_epoch = barrier.epoch;
                yield Message::Barrier(barrier);

                // Deferred split / connector-property change, applied after the yield.
                if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                    self.apply_split_change_after_yield_barrier(
                        barrier_epoch,
                        source_desc,
                        stream,
                        to_apply_mutation,
                    )
                    .await?;
                }
            }
            Either::Left(_) => {
                // NOTE(review): presumably the barrier stream only yields `Message::Barrier`
                // — confirm in `barrier_to_message_stream`.
                unreachable!();
            }

            Either::Right((chunk, latest_state)) => {
                // Record what must be acked after the next checkpoint: Pulsar message ids when
                // that column exists, otherwise the offset column.
                if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                    if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                        let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
                        task_builder.update_task_on_chunk(
                            source_id,
                            &latest_state,
                            pulsar_message_id_col.clone(),
                        );
                    } else {
                        let offset_col = chunk.column_at(offset_idx);
                        task_builder.update_task_on_chunk(
                            source_id,
                            &latest_state,
                            offset_col.clone(),
                        );
                    }
                }
                // No barrier for too long: pause the reader until the next barrier arrives,
                // and refresh the limit from the current system parameters.
                if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                    self_paused = true;
                    tracing::warn!(
                        "source paused, wait barrier for {:?}",
                        last_barrier_time.elapsed()
                    );
                    stream.pause_stream();

                    max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                        as u128
                        * WAIT_BARRIER_MULTIPLE_TIMES;
                }

                // Only splits already tracked get their in-memory state replaced, but every
                // reported state is queued for persistence at the next barrier.
                latest_state.iter().for_each(|(split_id, new_split_impl)| {
                    if let Some(split_impl) =
                        self.stream_source_core.latest_split_info.get_mut(split_id)
                    {
                        *split_impl = new_split_impl.clone();
                    }
                });

                self.stream_source_core
                    .updated_splits_in_epoch
                    .extend(latest_state);

                // Empty chunks only update state; they are not emitted.
                let card = chunk.cardinality();
                if card == 0 {
                    continue;
                }
                source_output_row_count.inc_by(card as u64);
                // Strip the internal split/offset (and Pulsar message id) columns.
                let to_remove_col_indices =
                    if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                        vec![split_idx, offset_idx, pulsar_message_id_idx]
                    } else {
                        vec![split_idx, offset_idx]
                    };
                let chunk =
                    prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
                yield Message::Chunk(chunk);
                self.try_flush_data().await?;
            }
        }
    }

    // The merged stream ended.
    tracing::error!(
        actor_id = %self.actor_ctx.id,
        "source executor exited unexpectedly"
    )
}
937}
938
/// A mutation whose effect on the reader is applied only after the barrier carrying it has been
/// yielded downstream.
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    /// Reassign splits to this actor.
    SplitChange {
        /// The full set of splits the actor should own afterwards. Tracked splits missing from
        /// it are dropped.
        target_splits: Vec<SplitImpl>,
        /// Whether the state of dropped splits is also trimmed from the state table.
        should_trim_state: bool,
        /// Counter incremented each time a split change is applied.
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    /// Connector properties were updated; the reader must be rebuilt.
    ConnectorPropsChange,
}
948
949impl<S: StateStore> Execute for SourceExecutor<S> {
950 fn execute(self: Box<Self>) -> BoxedMessageStream {
951 self.execute_inner().boxed()
952 }
953}
954
955impl<S: StateStore> Debug for SourceExecutor<S> {
956 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
957 f.debug_struct("SourceExecutor")
958 .field("source_id", &self.stream_source_core.source_id)
959 .field("column_ids", &self.stream_source_core.column_ids)
960 .finish()
961 }
962}
963
/// Accumulates post-checkpoint work for the current checkpoint and hands it to the
/// [`WaitCheckpointWorker`] when a checkpoint barrier arrives.
///
/// The work is either message acks (Pub/Sub, NATS JetStream, Pulsar) or a CDC offset to commit.
struct WaitCheckpointTaskBuilder {
    /// Sends `(epoch, task)` pairs to the worker.
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    /// Used to create a new task to replace the one that was just sent.
    source_reader: SourceReader,
    /// Task being filled for the current checkpoint.
    building_task: WaitCheckpointTask,
}
969
970impl WaitCheckpointTaskBuilder {
971 fn update_task_on_chunk(
972 &mut self,
973 source_id: SourceId,
974 latest_state: &HashMap<SplitId, SplitImpl>,
975 offset_col: ArrayRef,
976 ) {
977 match &mut self.building_task {
978 WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
979 arrays.push(offset_col);
980 }
981 WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
982 arrays.push(offset_col);
983 }
984 WaitCheckpointTask::AckPulsarMessage(arrays) => {
985 let split_id = latest_state.keys().next().unwrap();
987 let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
988 arrays.push((pulsar_ack_channel_id, offset_col));
989 }
990 WaitCheckpointTask::CommitCdcOffset(_) => {}
991 }
992 }
993
994 fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
995 #[expect(clippy::single_match)]
996 match &mut self.building_task {
997 WaitCheckpointTask::CommitCdcOffset(offsets) => {
998 if !updated_splits.is_empty() {
999 assert_eq!(1, updated_splits.len());
1001 for (split_id, split_impl) in updated_splits {
1002 if split_impl.is_cdc_split() {
1003 *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
1004 } else {
1005 unreachable!()
1006 }
1007 }
1008 }
1009 }
1010 _ => {}
1011 }
1012 }
1013
1014 async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
1016 let new_task = self
1017 .source_reader
1018 .create_wait_checkpoint_task()
1019 .await?
1020 .expect("wait checkpoint task should be created");
1021 self.wait_checkpoint_tx
1022 .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
1023 .expect("wait_checkpoint_tx send should succeed");
1024 Ok(())
1025 }
1026}
1027
/// Background worker that waits for checkpoint epochs to be committed on the split state table
/// before running the corresponding [`WaitCheckpointTask`].
struct WaitCheckpointWorker<S: StateStore> {
    /// Receives `(epoch, task)` pairs from the [`WaitCheckpointTaskBuilder`].
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    /// State store used to wait for epochs to be committed.
    state_store: S,
    /// Table whose committed epoch is waited on (the split state table).
    table_id: TableId,
    metrics: Arc<StreamingMetrics>,
}
1059
1060impl<S: StateStore> WaitCheckpointWorker<S> {
1061 pub async fn run(mut self) {
1062 tracing::debug!("wait epoch worker start success");
1063 loop {
1064 match self.wait_checkpoint_rx.recv().await {
1066 Some((epoch, task)) => {
1067 tracing::debug!("start to wait epoch {}", epoch.0);
1068 let ret = self
1069 .state_store
1070 .try_wait_epoch(
1071 HummockReadEpoch::Committed(epoch.0),
1072 TryWaitEpochOptions {
1073 table_id: self.table_id,
1074 },
1075 )
1076 .await;
1077
1078 match ret {
1079 Ok(()) => {
1080 tracing::debug!(epoch = epoch.0, "wait epoch success");
1081
1082 task.run_with_on_commit_success(|source_id: u64, offset| {
1084 if let Some(lsn_value) =
1085 extract_postgres_lsn_from_offset_str(offset)
1086 {
1087 self.metrics
1088 .pg_cdc_jni_commit_offset_lsn
1089 .with_guarded_label_values(&[&source_id.to_string()])
1090 .set(lsn_value as i64);
1091 }
1092 })
1093 .await;
1094 }
1095 Err(e) => {
1096 tracing::error!(
1097 error = %e.as_report(),
1098 "wait epoch {} failed", epoch.0
1099 );
1100 }
1101 }
1102 }
1103 None => {
1104 tracing::error!("wait epoch rx closed");
1105 break;
1106 }
1107 }
1108 }
1109 }
1110}
1111
1112#[cfg(test)]
1113mod tests {
1114 use maplit::{btreemap, convert_args, hashmap};
1115 use risingwave_common::catalog::{ColumnId, Field};
1116 use risingwave_common::id::SourceId;
1117 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
1118 use risingwave_common::test_prelude::StreamChunkTestExt;
1119 use risingwave_common::util::epoch::{EpochExt, test_epoch};
1120 use risingwave_connector::source::datagen::DatagenSplit;
1121 use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
1122 use risingwave_pb::catalog::StreamSourceInfo;
1123 use risingwave_pb::plan_common::PbRowFormatType;
1124 use risingwave_storage::memory::MemoryStateStore;
1125 use tokio::sync::mpsc::unbounded_channel;
1126 use tracing_test::traced_test;
1127
1128 use super::*;
1129 use crate::executor::AddMutation;
1130 use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
1131 use crate::task::LocalBarrierManager;
1132
1133 const MOCK_SOURCE_NAME: &str = "mock_source";
1134
1135 #[tokio::test]
1136 async fn test_source_executor() {
1137 let source_id = 0.into();
1138 let schema = Schema {
1139 fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
1140 };
1141 let row_id_index = None;
1142 let source_info = StreamSourceInfo {
1143 row_format: PbRowFormatType::Native as i32,
1144 ..Default::default()
1145 };
1146 let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1147 let column_ids = vec![0].into_iter().map(ColumnId::from).collect();
1148
1149 let properties = convert_args!(btreemap!(
1151 "connector" => "datagen",
1152 "datagen.rows.per.second" => "3",
1153 "fields.sequence_int.kind" => "sequence",
1154 "fields.sequence_int.start" => "11",
1155 "fields.sequence_int.end" => "11111",
1156 ));
1157 let source_desc_builder =
1158 create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1159 let split_state_store = SourceStateTableHandler::from_table_catalog(
1160 &default_source_internal_table(0x2333),
1161 MemoryStateStore::new(),
1162 )
1163 .await;
1164 let core = StreamSourceCore::<MemoryStateStore> {
1165 source_id,
1166 column_ids,
1167 source_desc_builder: Some(source_desc_builder),
1168 latest_split_info: HashMap::new(),
1169 split_state_store,
1170 updated_splits_in_epoch: HashMap::new(),
1171 source_name: MOCK_SOURCE_NAME.to_owned(),
1172 };
1173
1174 let system_params_manager = LocalSystemParamsManager::for_test();
1175
1176 let executor = SourceExecutor::new(
1177 ActorContext::for_test(0),
1178 core,
1179 Arc::new(StreamingMetrics::unused()),
1180 barrier_rx,
1181 system_params_manager.get_params(),
1182 None,
1183 false,
1184 LocalBarrierManager::for_test(),
1185 );
1186 let mut executor = executor.boxed().execute();
1187
1188 let init_barrier =
1189 Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
1190 splits: hashmap! {
1191 ActorId::default() => vec![
1192 SplitImpl::Datagen(DatagenSplit {
1193 split_index: 0,
1194 split_num: 1,
1195 start_offset: None,
1196 }),
1197 ],
1198 },
1199 ..Default::default()
1200 }));
1201 barrier_tx.send(init_barrier).unwrap();
1202
1203 executor.next().await.unwrap().unwrap();
1205
1206 let msg = executor.next().await.unwrap().unwrap();
1208
1209 assert_eq!(
1211 msg.into_chunk().unwrap(),
1212 StreamChunk::from_pretty(
1213 " i
1214 + 11
1215 + 12
1216 + 13"
1217 )
1218 );
1219 }
1220
1221 #[traced_test]
1222 #[tokio::test]
1223 async fn test_split_change_mutation() {
1224 let source_id = SourceId::new(0);
1225 let schema = Schema {
1226 fields: vec![Field::with_name(DataType::Int32, "v1")],
1227 };
1228 let row_id_index = None;
1229 let source_info = StreamSourceInfo {
1230 row_format: PbRowFormatType::Native as i32,
1231 ..Default::default()
1232 };
1233 let properties = convert_args!(btreemap!(
1234 "connector" => "datagen",
1235 "fields.v1.kind" => "sequence",
1236 "fields.v1.start" => "11",
1237 "fields.v1.end" => "11111",
1238 ));
1239
1240 let source_desc_builder =
1241 create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1242 let mem_state_store = MemoryStateStore::new();
1243
1244 let column_ids = vec![ColumnId::from(0)];
1245 let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1246 let split_state_store = SourceStateTableHandler::from_table_catalog(
1247 &default_source_internal_table(0x2333),
1248 mem_state_store.clone(),
1249 )
1250 .await;
1251
1252 let core = StreamSourceCore::<MemoryStateStore> {
1253 source_id,
1254 column_ids: column_ids.clone(),
1255 source_desc_builder: Some(source_desc_builder),
1256 latest_split_info: HashMap::new(),
1257 split_state_store,
1258 updated_splits_in_epoch: HashMap::new(),
1259 source_name: MOCK_SOURCE_NAME.to_owned(),
1260 };
1261
1262 let system_params_manager = LocalSystemParamsManager::for_test();
1263
1264 let executor = SourceExecutor::new(
1265 ActorContext::for_test(0),
1266 core,
1267 Arc::new(StreamingMetrics::unused()),
1268 barrier_rx,
1269 system_params_manager.get_params(),
1270 None,
1271 false,
1272 LocalBarrierManager::for_test(),
1273 );
1274 let mut handler = executor.boxed().execute();
1275
1276 let mut epoch = test_epoch(1);
1277 let init_barrier =
1278 Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
1279 splits: hashmap! {
1280 ActorId::default() => vec![
1281 SplitImpl::Datagen(DatagenSplit {
1282 split_index: 0,
1283 split_num: 3,
1284 start_offset: None,
1285 }),
1286 ],
1287 },
1288 ..Default::default()
1289 }));
1290 barrier_tx.send(init_barrier).unwrap();
1291
1292 handler
1294 .next()
1295 .await
1296 .unwrap()
1297 .unwrap()
1298 .into_barrier()
1299 .unwrap();
1300
1301 let mut ready_chunks = handler.ready_chunks(10);
1302
1303 let _ = ready_chunks.next().await.unwrap();
1304
1305 let new_assignment = vec![
1306 SplitImpl::Datagen(DatagenSplit {
1307 split_index: 0,
1308 split_num: 3,
1309 start_offset: None,
1310 }),
1311 SplitImpl::Datagen(DatagenSplit {
1312 split_index: 1,
1313 split_num: 3,
1314 start_offset: None,
1315 }),
1316 SplitImpl::Datagen(DatagenSplit {
1317 split_index: 2,
1318 split_num: 3,
1319 start_offset: None,
1320 }),
1321 ];
1322
1323 epoch.inc_epoch();
1324 let change_split_mutation =
1325 Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
1326 ActorId::default() => new_assignment.clone()
1327 }));
1328
1329 barrier_tx.send(change_split_mutation).unwrap();
1330
1331 let _ = ready_chunks.next().await.unwrap(); epoch.inc_epoch();
1334 let barrier = Barrier::new_test_barrier(epoch);
1335 barrier_tx.send(barrier).unwrap();
1336
1337 ready_chunks.next().await.unwrap(); let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
1340 &default_source_internal_table(0x2333),
1341 mem_state_store.clone(),
1342 )
1343 .await;
1344
1345 source_state_handler
1347 .init_epoch(EpochPair::new_test_epoch(epoch))
1348 .await
1349 .unwrap();
1350 source_state_handler
1351 .get(&new_assignment[1].id())
1352 .await
1353 .unwrap()
1354 .unwrap();
1355
1356 tokio::time::sleep(Duration::from_millis(100)).await;
1357
1358 let _ = ready_chunks.next().await.unwrap();
1359
1360 epoch.inc_epoch();
1361 let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
1362 barrier_tx.send(barrier).unwrap();
1363
1364 epoch.inc_epoch();
1365 let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
1366 barrier_tx.send(barrier).unwrap();
1367
1368 ready_chunks.next().await.unwrap();
1370
1371 let prev_assignment = new_assignment;
1372 let new_assignment = vec![prev_assignment[2].clone()];
1373
1374 epoch.inc_epoch();
1375 let drop_split_mutation =
1376 Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
1377 ActorId::default() => new_assignment.clone()
1378 }));
1379
1380 barrier_tx.send(drop_split_mutation).unwrap();
1381
1382 ready_chunks.next().await.unwrap(); epoch.inc_epoch();
1385 let barrier = Barrier::new_test_barrier(epoch);
1386 barrier_tx.send(barrier).unwrap();
1387
1388 ready_chunks.next().await.unwrap(); let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
1391 &default_source_internal_table(0x2333),
1392 mem_state_store.clone(),
1393 )
1394 .await;
1395
1396 let new_epoch = EpochPair::new_test_epoch(epoch);
1397 source_state_handler.init_epoch(new_epoch).await.unwrap();
1398
1399 let committed_reader = source_state_handler
1400 .new_committed_reader(new_epoch)
1401 .await
1402 .unwrap();
1403 assert!(
1404 committed_reader
1405 .try_recover_from_state_store(&prev_assignment[0])
1406 .await
1407 .unwrap()
1408 .is_none()
1409 );
1410
1411 assert!(
1412 committed_reader
1413 .try_recover_from_state_store(&prev_assignment[1])
1414 .await
1415 .unwrap()
1416 .is_none()
1417 );
1418
1419 assert!(
1420 committed_reader
1421 .try_recover_from_state_store(&prev_assignment[2])
1422 .await
1423 .unwrap()
1424 .is_some()
1425 );
1426 }
1427}