use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::TryWaitEpochOptions;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

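/// If the source has not received a barrier for `barrier_interval_ms * WAIT_BARRIER_MULTIPLE_TIMES`,
/// it pauses its own data stream until the next barrier arrives (see the self-pause logic in
/// `execute_inner`).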
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

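/// [`SourceExecutor`] streams data from an external connector source. It reacts to barriers and
/// their mutations (pause/resume, split changes, connector property changes, throttling), tracks
/// per-split offsets, and persists them to the source state table on checkpoints.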
pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

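    /// Core state of the stream source: source id/name, the latest split assignment, and the
    /// split state table used to persist per-split offsets.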
    stream_source_core: StreamSourceCore<S>,

    metrics: Arc<StreamingMetrics>,

    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    system_params: SystemParamsReaderRef,

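    /// Rate limit for this source, in rows per second (`rps`); `None` means unlimited.
    /// Updated by `Mutation::Throttle`, which also rebuilds the connector reader.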
    rate_limit_rps: Option<u32>,

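    /// Whether this is a shared, non-CDC source; affects how the initial reader stream is built
    /// for a newly created executor (see `execute_inner`).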
    is_shared_non_cdc: bool,

    barrier_manager: LocalBarrierManager,
}

impl<S: StateStore> SourceExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            barrier_manager,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

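    /// Spawn a background [`WaitCheckpointWorker`] for connectors that need to ack or commit
    /// offsets only after an epoch is committed. Returns `None` if the connector does not create
    /// a wait-checkpoint task.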
    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id().into(),
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

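    /// Collect the column ids to read and build the [`SourceContext`] used to create the
    /// connector reader. When auto schema change is enabled, a background task is spawned that
    /// forwards schema change events to the meta client.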
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };

        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    fn is_batch_source(&self) -> bool {
        self.stream_source_core.is_batch_source
    }

    fn refresh_batch_splits(&mut self) -> StreamExecutorResult<Vec<SplitImpl>> {
        debug_assert!(self.is_batch_source());
        let core = &self.stream_source_core;
        let mut split = core.get_batch_split();
        split.refresh();
        Ok(vec![split.into()])
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx
            .streaming_config
            .developer
            .enable_auto_schema_change
    }

    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

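    /// Apply a mutation that arrived with a barrier, after the barrier itself has been yielded
    /// downstream. The connector reader stream is rebuilt if the split assignment, the batch
    /// splits, or the connector properties changed.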
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::RefreshBatchSplits(splits) => {
                    self.stream_source_core.latest_split_info =
                        splits.into_iter().map(|s| (s.id(), s)).collect();
                    should_rebuild_stream = true;
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

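    /// Compare `target_splits` with the current assignment. Newly added splits are initialized
    /// from the committed state store when possible; dropped splits are detected and, if
    /// `should_trim_state` is set, trimmed from the state table. Returns whether anything changed.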
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

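    /// Log and report a reader error, then rebuild the connector reader stream from the latest
    /// split state.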
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::warn!(
            error = ?e.as_report(),
            actor_id = self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.to_owned(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream =
            reader_stream_builder.into_retry_stream(Some(target_state.clone()), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

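    /// Persist the splits updated in this epoch, commit the state table for `epoch`, and return
    /// (then clear) the set of updated splits.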
    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");
            core.split_state_store.set_states(cache).await?;
        }

        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        core.split_state_store.try_flush().await?;

        Ok(())
    }

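    /// The main loop: receive the first barrier, recover split state, build the connector reader,
    /// and then interleave barriers (with their mutations) and data chunks while tracking
    /// per-split offsets.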
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder =
            Self::spawn_wait_checkpoint_worker(&core, source_desc.source.clone()).await?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

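        // Initialize the split state table and try to recover per-split offsets that were
        // committed before, overriding the boot splits where a committed state exists.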
        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    is_uninitialized = false;
                } else {
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        core.init_split_state(boot_state.clone());

        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }

        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

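        // Start in the paused state if the first barrier requested pause on startup.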
        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

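        // If no barrier is received within this window, the source pauses itself to avoid
        // buffering an unbounded amount of data between checkpoints; it resumes on the next
        // barrier.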
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        let mut is_refreshing = false;

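        // Main loop: the merged stream yields barriers (left) and data chunks (right). On a
        // reader error, back off briefly and rebuild the connector reader.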
        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;

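                    // For batch (refreshable) sources: once the batch split has been fully
                    // consumed, report load-finish to the barrier manager on a checkpoint barrier.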
                    if barrier.is_checkpoint() && self.is_batch_source() && is_refreshing {
                        let batch_split = self.stream_source_core.get_batch_split();
                        if batch_split.finished() {
                            tracing::info!(?epoch, "emitting load finish");
                            self.barrier_manager.report_source_load_finished(
                                epoch,
                                self.actor_ctx.id,
                                source_id.table_id(),
                                source_id.table_id(),
                            );
                            is_refreshing = false;
                        }
                    }

                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.table_id()) {
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            Mutation::RefreshStart {
                                table_id: _,
                                associated_source_id,
                            } if *associated_source_id == source_id => {
                                debug_assert!(self.is_batch_source());
                                is_refreshing = true;

                                if let Ok(new_splits) = self.refresh_batch_splits() {
                                    tracing::info!(
                                        actor_id = self.actor_ctx.id,
                                        %associated_source_id,
                                        new_splits_count = new_splits.len(),
                                        "RefreshStart triggered split re-enumeration"
                                    );
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::RefreshBatchSplits(new_splits),
                                    ));
                                } else {
                                    tracing::warn!(
                                        actor_id = self.actor_ctx.id,
                                        %associated_source_id,
                                        "Failed to refresh splits during RefreshStart"
                                    );
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

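                    // On checkpoint barriers, hand the updated offsets to the wait-checkpoint
                    // task builder so they are acked/committed only after the epoch is committed.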
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    unreachable!();
                }

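                // Data chunks from the connector reader, paired with the latest per-split offsets.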
                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        let offset_col = chunk.column_at(offset_idx);
                        task_builder.update_task_on_chunk(offset_col.clone());
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    source_output_row_count.inc_by(card as u64);
                    let chunk =
                        prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        tracing::error!(
            actor_id = self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
}

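/// Mutations recorded while handling a barrier that must only take effect after the barrier has
/// been yielded downstream.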
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    RefreshBatchSplits(Vec<SplitImpl>),
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}

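/// Accumulates a [`WaitCheckpointTask`] over an epoch and sends it to the
/// [`WaitCheckpointWorker`] on each checkpoint.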
struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

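    /// Send the task built for this epoch to the worker and start building a fresh task for the
    /// next epoch.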
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

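/// Background worker that waits for an epoch to be committed in the state store and then runs the
/// corresponding [`WaitCheckpointTask`] (e.g. acking messages or committing CDC offsets).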
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker start success");
        loop {
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");
                            task.run().await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field, TableId};
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
            is_batch_source: false,
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
                backfill_nodes_to_pause: Default::default(),
                actor_cdc_table_snapshot_splits: Default::default(),
            }));
        barrier_tx.send(init_barrier).unwrap();

        executor.next().await.unwrap().unwrap();

        let msg = executor.next().await.unwrap().unwrap();

        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
            is_batch_source: false,
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
                backfill_nodes_to_pause: Default::default(),
                actor_cdc_table_snapshot_splits: Default::default(),
            }));
        barrier_tx.send(init_barrier).unwrap();

        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}