1use std::collections::HashMap;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::util::epoch::{Epoch, EpochPair};
28use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
29use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
30use risingwave_connector::source::reader::reader::SourceReader;
31use risingwave_connector::source::{
32 ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
33 StreamChunkWithState, WaitCheckpointTask,
34};
35use risingwave_hummock_sdk::HummockReadEpoch;
36use risingwave_storage::store::TryWaitEpochOptions;
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39use tokio::sync::{mpsc, oneshot};
40use tokio::time::Instant;
41
42use super::executor_core::StreamSourceCore;
43use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
44use crate::common::rate_limit::limited_chunk_size;
45use crate::executor::UpdateMutation;
46use crate::executor::prelude::*;
47use crate::executor::source::reader_stream::StreamReaderBuilder;
48use crate::executor::stream_reader::StreamReaderWithPause;
49
/// Multiplier applied to the `barrier_interval_ms` system parameter to obtain the longest time
/// (in milliseconds) the source executor keeps emitting chunks without receiving a barrier
/// before it pauses its own data stream.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
53
/// Streaming executor that emits barriers and, when a source core is configured, data chunks
/// read from an external source.
pub struct SourceExecutor<S: StateStore> {
    /// Context of the owning actor (actor id, fragment id, streaming config, meta client).
    actor_ctx: ActorContextRef,

    /// Split state and source descriptor builder. When `None`, the executor only forwards
    /// barriers (see `execute_without_stream_source`).
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Metrics used for the output-row and split-change counters.
    metrics: Arc<StreamingMetrics>,

    /// Receiver for barriers; wrapped in `Option` so the execution path can `take()` it.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// Reader of system parameters; `barrier_interval_ms` is read from it.
    system_params: SystemParamsReaderRef,

    /// Optional rate limit passed to the stream reader builder; also drives the chunk size and
    /// `split_txn` option of the source context. Presumably rows per second — confirm.
    rate_limit_rps: Option<u32>,

    /// Flag forwarded to the reader builder when fetching the latest splits and building the
    /// initial retry stream. NOTE(review): presumably "shared source that is not CDC" — confirm.
    is_shared_non_cdc: bool,
}
74
75impl<S: StateStore> SourceExecutor<S> {
76 pub fn new(
77 actor_ctx: ActorContextRef,
78 stream_source_core: Option<StreamSourceCore<S>>,
79 metrics: Arc<StreamingMetrics>,
80 barrier_receiver: UnboundedReceiver<Barrier>,
81 system_params: SystemParamsReaderRef,
82 rate_limit_rps: Option<u32>,
83 is_shared_non_cdc: bool,
84 ) -> Self {
85 Self {
86 actor_ctx,
87 stream_source_core,
88 metrics,
89 barrier_receiver: Some(barrier_receiver),
90 system_params,
91 rate_limit_rps,
92 is_shared_non_cdc,
93 }
94 }
95
96 fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
97 StreamReaderBuilder {
98 source_desc,
99 rate_limit: self.rate_limit_rps,
100 source_id: self.stream_source_core.as_ref().unwrap().source_id,
101 source_name: self
102 .stream_source_core
103 .as_ref()
104 .unwrap()
105 .source_name
106 .clone(),
107 is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
108 actor_ctx: self.actor_ctx.clone(),
109 reader_stream: None,
110 }
111 }
112
113 async fn spawn_wait_checkpoint_worker(
114 core: &StreamSourceCore<S>,
115 source_reader: SourceReader,
116 ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
117 let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
118 return Ok(None);
119 };
120 let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
121 let wait_checkpoint_worker = WaitCheckpointWorker {
122 wait_checkpoint_rx,
123 state_store: core.split_state_store.state_table().state_store().clone(),
124 table_id: core.split_state_store.state_table().table_id().into(),
125 };
126 tokio::spawn(wait_checkpoint_worker.run());
127 Ok(Some(WaitCheckpointTaskBuilder {
128 wait_checkpoint_tx,
129 source_reader,
130 building_task: initial_task,
131 }))
132 }
133
134 pub fn prepare_source_stream_build(
136 &self,
137 source_desc: &SourceDesc,
138 ) -> (Vec<ColumnId>, SourceContext) {
139 let column_ids = source_desc
140 .columns
141 .iter()
142 .map(|column_desc| column_desc.column_id)
143 .collect_vec();
144
145 let (schema_change_tx, mut schema_change_rx) =
146 mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
147 let schema_change_tx = if self.is_auto_schema_change_enable() {
148 let meta_client = self.actor_ctx.meta_client.clone();
149 let _join_handle = tokio::task::spawn(async move {
151 while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
152 let table_ids = schema_change.table_ids();
153 tracing::info!(
154 target: "auto_schema_change",
155 "recv a schema change event for tables: {:?}", table_ids);
156 if let Some(ref meta_client) = meta_client {
158 match meta_client
159 .auto_schema_change(schema_change.to_protobuf())
160 .await
161 {
162 Ok(_) => {
163 tracing::info!(
164 target: "auto_schema_change",
165 "schema change success for tables: {:?}", table_ids);
166 finish_tx.send(()).unwrap();
167 }
168 Err(e) => {
169 tracing::error!(
170 target: "auto_schema_change",
171 error = ?e.as_report(), "schema change error");
172 finish_tx.send(()).unwrap();
173 }
174 }
175 }
176 }
177 });
178 Some(schema_change_tx)
179 } else {
180 info!("auto schema change is disabled in config");
181 None
182 };
183
184 let source_ctx = SourceContext::new(
185 self.actor_ctx.id,
186 self.stream_source_core.as_ref().unwrap().source_id,
187 self.actor_ctx.fragment_id,
188 self.stream_source_core
189 .as_ref()
190 .unwrap()
191 .source_name
192 .clone(),
193 source_desc.metrics.clone(),
194 SourceCtrlOpts {
195 chunk_size: limited_chunk_size(self.rate_limit_rps),
196 split_txn: self.rate_limit_rps.is_some(), },
198 source_desc.source.config.clone(),
199 schema_change_tx,
200 );
201
202 (column_ids, source_ctx)
203 }
204
205 fn is_auto_schema_change_enable(&self) -> bool {
206 self.actor_ctx
207 .streaming_config
208 .developer
209 .enable_auto_schema_change
210 }
211
212 #[inline]
214 fn get_metric_labels(&self) -> [String; 4] {
215 [
216 self.stream_source_core
217 .as_ref()
218 .unwrap()
219 .source_id
220 .to_string(),
221 self.stream_source_core
222 .as_ref()
223 .unwrap()
224 .source_name
225 .clone(),
226 self.actor_ctx.id.to_string(),
227 self.actor_ctx.fragment_id.to_string(),
228 ]
229 }
230
231 async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
240 &mut self,
241 barrier_epoch: EpochPair,
242 source_desc: &SourceDesc,
243 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
244 apply_mutation: ApplyMutationAfterBarrier<'_>,
245 ) -> StreamExecutorResult<()> {
246 {
247 let mut should_rebuild_stream = false;
248 match apply_mutation {
249 ApplyMutationAfterBarrier::SplitChange {
250 target_splits,
251 should_trim_state,
252 split_change_count,
253 } => {
254 split_change_count.inc();
255 if self
256 .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
257 .await?
258 {
259 should_rebuild_stream = true;
260 }
261 }
262 ApplyMutationAfterBarrier::ConnectorPropsChange => {
263 should_rebuild_stream = true;
264 }
265 }
266
267 if should_rebuild_stream {
268 self.rebuild_stream_reader(source_desc, stream)?;
269 }
270 }
271
272 Ok(())
273 }
274
/// Reconciles the in-memory split assignment (`latest_split_info`) with `target_splits`.
///
/// Returns `Ok(true)` if at least one split was added or dropped, `Ok(false)` otherwise.
/// When `should_trim_state` is set, the state of dropped splits is trimmed from the split
/// state store.
///
/// Panics if `stream_source_core` is `None`.
async fn update_state_if_changed(
    &mut self,
    barrier_epoch: EpochPair,
    target_splits: Vec<SplitImpl>,
    should_trim_state: bool,
) -> StreamExecutorResult<bool> {
    let core = self.stream_source_core.as_mut().unwrap();

    // Index the requested assignment by split id.
    let target_splits: HashMap<_, _> = target_splits
        .into_iter()
        .map(|split| (split.id(), split))
        .collect();

    let mut target_state: HashMap<SplitId, SplitImpl> =
        HashMap::with_capacity(target_splits.len());

    let mut split_changed = false;

    let committed_reader = core
        .split_state_store
        .new_committed_reader(barrier_epoch)
        .await?;

    for (split_id, split) in target_splits {
        if let Some(s) = core.latest_split_info.get(&split_id) {
            // Already assigned: keep the in-memory copy rather than the incoming one.
            target_state.insert(split_id, s.clone());
        } else {
            split_changed = true;
            // Newly assigned: prefer a state recovered from the state store, otherwise start
            // from the split as given.
            let initial_state = if let Some(recover_state) = committed_reader
                .try_recover_from_state_store(&split)
                .await?
            {
                recover_state
            } else {
                split
            };

            // Record it as updated in this epoch unless an entry already exists.
            core.updated_splits_in_epoch
                .entry(split_id.clone())
                .or_insert_with(|| initial_state.clone());

            target_state.insert(split_id, initial_state);
        }
    }

    // Any currently held split missing from the target counts as a change as well.
    for existing_split_id in core.latest_split_info.keys() {
        if !target_state.contains_key(existing_split_id) {
            tracing::info!("split dropping detected: {}", existing_split_id);
            split_changed = true;
        }
    }

    if split_changed {
        tracing::info!(
            actor_id = self.actor_ctx.id,
            state = ?target_state,
            "apply split change"
        );

        // Forget pending updates of splits that are no longer assigned.
        core.updated_splits_in_epoch
            .retain(|split_id, _| target_state.contains_key(split_id));

        let dropped_splits = core
            .latest_split_info
            .extract_if(|split_id, _| !target_state.contains_key(split_id))
            .map(|(_, split)| split)
            .collect_vec();

        if should_trim_state && !dropped_splits.is_empty() {
            core.split_state_store.trim_state(&dropped_splits).await?;
        }

        core.latest_split_info = target_state;
    }

    Ok(split_changed)
}
360
361 fn rebuild_stream_reader_from_error<const BIASED: bool>(
363 &mut self,
364 source_desc: &SourceDesc,
365 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
366 e: StreamExecutorError,
367 ) -> StreamExecutorResult<()> {
368 let core = self.stream_source_core.as_mut().unwrap();
369 tracing::warn!(
370 error = ?e.as_report(),
371 actor_id = self.actor_ctx.id,
372 source_id = %core.source_id,
373 "stream source reader error",
374 );
375 GLOBAL_ERROR_METRICS.user_source_error.report([
376 e.variant_name().to_owned(),
377 core.source_id.to_string(),
378 core.source_name.to_owned(),
379 self.actor_ctx.fragment_id.to_string(),
380 ]);
381
382 self.rebuild_stream_reader(source_desc, stream)
383 }
384
385 fn rebuild_stream_reader<const BIASED: bool>(
386 &mut self,
387 source_desc: &SourceDesc,
388 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
389 ) -> StreamExecutorResult<()> {
390 let core = self.stream_source_core.as_mut().unwrap();
391 let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
392
393 tracing::info!(
394 "actor {:?} apply source split change to {:?}",
395 self.actor_ctx.id,
396 target_state
397 );
398
399 let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
401 let reader_stream =
402 reader_stream_builder.into_retry_stream(Some(target_state.clone()), false);
403
404 stream.replace_data_stream(reader_stream);
405
406 Ok(())
407 }
408
409 async fn persist_state_and_clear_cache(
410 &mut self,
411 epoch: EpochPair,
412 ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
413 let core = self.stream_source_core.as_mut().unwrap();
414
415 let cache = core
416 .updated_splits_in_epoch
417 .values()
418 .map(|split_impl| split_impl.to_owned())
419 .collect_vec();
420
421 if !cache.is_empty() {
422 tracing::debug!(state = ?cache, "take snapshot");
423 core.split_state_store.set_states(cache).await?;
424 }
425
426 core.split_state_store.commit(epoch).await?;
428
429 let updated_splits = core.updated_splits_in_epoch.clone();
430
431 core.updated_splits_in_epoch.clear();
432
433 Ok(updated_splits)
434 }
435
436 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
438 let core = self.stream_source_core.as_mut().unwrap();
439 core.split_state_store.try_flush().await?;
440
441 Ok(())
442 }
443
/// Execution path used when a stream source core is present.
///
/// Waits for the first barrier, initializes split state (recovering from the state store
/// where possible), builds the source reader, and then merges barriers with source chunks:
/// barriers trigger mutation handling and state persistence, chunks update the per-split
/// state and are yielded downstream after the additional columns are pruned.
///
/// Panics if `barrier_receiver` or `stream_source_core` is `None`.
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_with_stream_source(mut self) {
    let mut barrier_receiver = self.barrier_receiver.take().unwrap();
    let first_barrier = barrier_receiver
        .recv()
        .instrument_await("source_recv_first_barrier")
        .await
        .ok_or_else(|| {
            anyhow!(
                "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                self.actor_ctx.id,
                self.stream_source_core.as_ref().unwrap().source_id
            )
        })?;
    let first_epoch = first_barrier.epoch;
    // Splits assigned to this actor by the first barrier, if any.
    let mut boot_state =
        if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
            tracing::debug!(?splits, "boot with splits");
            splits.to_vec()
        } else {
            Vec::default()
        };
    let is_pause_on_startup = first_barrier.is_pause_on_startup();
    // Starts from what the barrier reports; cleared below if any split has persisted state.
    let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

    yield Message::Barrier(first_barrier);

    // Temporarily take the core out of `self`; it is put back after initialization.
    let mut core = self.stream_source_core.unwrap();
    let source_id = core.source_id;

    let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
    let mut source_desc = source_desc_builder
        .build()
        .map_err(StreamExecutorError::connector_error)?;

    let mut wait_checkpoint_task_builder =
        Self::spawn_wait_checkpoint_worker(&core, source_desc.source.clone()).await?;

    let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
    else {
        unreachable!("Partition and offset columns must be set.");
    };

    core.split_state_store.init_epoch(first_epoch).await?;
    {
        let committed_reader = core
            .split_state_store
            .new_committed_reader(first_epoch)
            .await?;
        for ele in &mut boot_state {
            if let Some(recover_state) =
                committed_reader.try_recover_from_state_store(ele).await?
            {
                // Persisted state exists: resume from it.
                *ele = recover_state;
                is_uninitialized = false;
            } else {
                // No persisted state: record the split so it is written at the next barrier.
                core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
            }
        }
    }

    core.init_split_state(boot_state.clone());

    // Hand the core back to `self` so the helper methods can access it.
    self.stream_source_core = Some(core);

    // `None` when no splits are assigned.
    let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
    tracing::debug!(state = ?recover_state, "start with state");

    let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
    let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
    let mut latest_splits = None;
    if is_uninitialized {
        let create_split_reader_result = reader_stream_builder
            .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
            .await?;
        latest_splits = create_split_reader_result.latest_splits;
    }

    if let Some(latest_splits) = latest_splits {
        // Record the fetched splits so they are persisted at the next barrier.
        self.stream_source_core
            .as_mut()
            .unwrap()
            .updated_splits_in_epoch
            .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
    }
    // Merge barriers (left) with source chunks (right); `true` is the `BIASED` parameter.
    let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
        barrier_stream,
        reader_stream_builder
            .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
    );
    // Tracks pauses requested through barriers (startup pause, `Pause`/`Resume` mutations).
    let mut command_paused = false;

    if is_pause_on_startup {
        tracing::info!("source paused on startup");
        stream.pause_stream();
        command_paused = true;
    }

    let mut max_wait_barrier_time_ms =
        self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
    let mut last_barrier_time = Instant::now();
    // Tracks pauses the executor applied itself because no barrier arrived in time.
    let mut self_paused = false;

    let source_output_row_count = self
        .metrics
        .source_output_row_count
        .with_guarded_label_values(&self.get_metric_labels());

    let source_split_change_count = self
        .metrics
        .source_split_change_count
        .with_guarded_label_values(&self.get_metric_labels());

    while let Some(msg) = stream.next().await {
        let Ok(msg) = msg else {
            // Reader error: wait one second, then rebuild the reader and keep going.
            tokio::time::sleep(Duration::from_millis(1000)).await;
            self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
            continue;
        };

        match msg {
            Either::Left(Message::Barrier(barrier)) => {
                last_barrier_time = Instant::now();

                if self_paused {
                    self_paused = false;
                    // Resume only if no command-level pause is in effect.
                    if !command_paused {
                        stream.resume_stream();
                    }
                }

                let epoch = barrier.epoch;

                // Split/property changes are collected here and applied after the barrier
                // has been yielded.
                let mut split_change = None;

                if let Some(mutation) = barrier.mutation.as_deref() {
                    match mutation {
                        Mutation::Pause => {
                            command_paused = true;
                            stream.pause_stream()
                        }
                        Mutation::Resume => {
                            command_paused = false;
                            stream.resume_stream()
                        }
                        Mutation::SourceChangeSplit(actor_splits) => {
                            tracing::info!(
                                actor_id = self.actor_ctx.id,
                                actor_splits = ?actor_splits,
                                "source change split received"
                            );

                            split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                |target_splits| {
                                    (
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::SplitChange {
                                            target_splits,
                                            should_trim_state: true,
                                            split_change_count: &source_split_change_count,
                                        },
                                    )
                                },
                            );
                        }

                        Mutation::ConnectorPropsChange(maybe_mutation) => {
                            if let Some(new_props) = maybe_mutation.get(&source_id.table_id()) {
                                tracing::info!(
                                    "updating source properties from {:?} to {:?}",
                                    source_desc.source.config,
                                    new_props
                                );
                                source_desc.update_reader(new_props.clone())?;
                                split_change = Some((
                                    &source_desc,
                                    &mut stream,
                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
                                ));
                            }
                        }

                        Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                            // Same as `SourceChangeSplit`, but dropped splits keep their state.
                            split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                |target_splits| {
                                    (
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::SplitChange {
                                            target_splits,
                                            should_trim_state: false,
                                            split_change_count: &source_split_change_count,
                                        },
                                    )
                                },
                            );
                        }
                        Mutation::Throttle(actor_to_apply) => {
                            if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                && *new_rate_limit != self.rate_limit_rps
                            {
                                tracing::info!(
                                    "updating rate limit from {:?} to {:?}",
                                    self.rate_limit_rps,
                                    *new_rate_limit
                                );
                                self.rate_limit_rps = *new_rate_limit;
                                // Rebuild immediately so the new limit takes effect.
                                self.rebuild_stream_reader(&source_desc, &mut stream)?;
                            }
                        }
                        _ => {}
                    }
                }

                let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                if barrier.kind.is_checkpoint()
                    && let Some(task_builder) = &mut wait_checkpoint_task_builder
                {
                    task_builder.update_task_on_checkpoint(updated_splits);

                    tracing::debug!("epoch to wait {:?}", epoch);
                    // Hand the task to the worker, keyed by the previous epoch of this barrier.
                    task_builder.send(Epoch(epoch.prev)).await?
                }

                let barrier_epoch = barrier.epoch;
                yield Message::Barrier(barrier);

                if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                    self.apply_split_change_after_yield_barrier(
                        barrier_epoch,
                        source_desc,
                        stream,
                        to_apply_mutation,
                    )
                    .await?;
                }
            }
            Either::Left(_) => {
                // Only barriers are expected on the left side of the merged stream.
                unreachable!();
            }

            Either::Right((chunk, latest_state)) => {
                if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                    let offset_col = chunk.column_at(offset_idx);
                    task_builder.update_task_on_chunk(offset_col.clone());
                }
                if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                    // No barrier for too long: pause the data side until the next barrier.
                    self_paused = true;
                    tracing::warn!(
                        "source paused, wait barrier for {:?}",
                        last_barrier_time.elapsed()
                    );
                    stream.pause_stream();

                    // Re-read the system parameter so a changed barrier interval is picked up.
                    max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                        as u128
                        * WAIT_BARRIER_MULTIPLE_TIMES;
                }

                // Update the state of splits that are still assigned to this actor.
                latest_state.iter().for_each(|(split_id, new_split_impl)| {
                    if let Some(split_impl) = self
                        .stream_source_core
                        .as_mut()
                        .unwrap()
                        .latest_split_info
                        .get_mut(split_id)
                    {
                        *split_impl = new_split_impl.clone();
                    }
                });

                // Everything reported by the reader is queued for persistence.
                self.stream_source_core
                    .as_mut()
                    .unwrap()
                    .updated_splits_in_epoch
                    .extend(latest_state);

                source_output_row_count.inc_by(chunk.cardinality() as u64);
                let chunk =
                    prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                yield Message::Chunk(chunk);
                self.try_flush_data().await?;
            }
        }
    }

    // Reached only when the merged stream ends.
    tracing::error!(
        actor_id = self.actor_ctx.id,
        "source executor exited unexpectedly"
    )
}
773
774 #[try_stream(ok = Message, error = StreamExecutorError)]
777 async fn execute_without_stream_source(mut self) {
778 let mut barrier_receiver = self.barrier_receiver.take().unwrap();
779 let barrier = barrier_receiver
780 .recv()
781 .instrument_await("source_recv_first_barrier")
782 .await
783 .ok_or_else(|| {
784 anyhow!(
785 "failed to receive the first barrier, actor_id: {:?} with no stream source",
786 self.actor_ctx.id
787 )
788 })?;
789 yield Message::Barrier(barrier);
790
791 while let Some(barrier) = barrier_receiver.recv().await {
792 yield Message::Barrier(barrier);
793 }
794 }
795}
796
/// A mutation whose effect on the source reader is deferred until after the barrier that
/// carried it has been yielded.
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    /// The split assignment of this actor changed.
    SplitChange {
        /// The full set of splits the actor should hold afterwards.
        target_splits: Vec<SplitImpl>,
        /// Whether the persisted state of dropped splits should be trimmed.
        should_trim_state: bool,
        /// Counter incremented each time such a change is applied.
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    /// Connector properties changed; the reader is rebuilt unconditionally.
    ConnectorPropsChange,
}
806
807impl<S: StateStore> Execute for SourceExecutor<S> {
808 fn execute(self: Box<Self>) -> BoxedMessageStream {
809 if self.stream_source_core.is_some() {
810 self.execute_with_stream_source().boxed()
811 } else {
812 self.execute_without_stream_source().boxed()
813 }
814 }
815}
816
817impl<S: StateStore> Debug for SourceExecutor<S> {
818 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
819 if let Some(core) = &self.stream_source_core {
820 f.debug_struct("SourceExecutor")
821 .field("source_id", &core.source_id)
822 .field("column_ids", &core.column_ids)
823 .finish()
824 } else {
825 f.debug_struct("SourceExecutor").finish()
826 }
827 }
828}
829
/// Accumulates a `WaitCheckpointTask` between checkpoints and hands finished tasks to the
/// `WaitCheckpointWorker`.
struct WaitCheckpointTaskBuilder {
    /// Sending half of the channel to the worker; carries `(epoch, task)` pairs.
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    /// Reader used to create a fresh task after the current one has been sent.
    source_reader: SourceReader,
    /// The task currently being filled.
    building_task: WaitCheckpointTask,
}
835
836impl WaitCheckpointTaskBuilder {
837 fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
838 match &mut self.building_task {
839 WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
840 arrays.push(offset_col);
841 }
842 WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
843 arrays.push(offset_col);
844 }
845 WaitCheckpointTask::CommitCdcOffset(_) => {}
846 }
847 }
848
849 fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
850 #[expect(clippy::single_match)]
851 match &mut self.building_task {
852 WaitCheckpointTask::CommitCdcOffset(offsets) => {
853 if !updated_splits.is_empty() {
854 assert_eq!(1, updated_splits.len());
856 for (split_id, split_impl) in updated_splits {
857 if split_impl.is_cdc_split() {
858 *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
859 } else {
860 unreachable!()
861 }
862 }
863 }
864 }
865 _ => {}
866 }
867 }
868
869 async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
871 let new_task = self
872 .source_reader
873 .create_wait_checkpoint_task()
874 .await?
875 .expect("wait checkpoint task should be created");
876 self.wait_checkpoint_tx
877 .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
878 .expect("wait_checkpoint_tx send should succeed");
879 Ok(())
880 }
881}
882
/// Background worker that waits for an epoch to be committed in the state store and then
/// runs the `WaitCheckpointTask` associated with it.
struct WaitCheckpointWorker<S: StateStore> {
    /// Receiving half of the channel carrying `(epoch, task)` pairs.
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    /// State store on which `try_wait_epoch` is called.
    state_store: S,
    /// Table id passed in `TryWaitEpochOptions`.
    table_id: TableId,
}
913
914impl<S: StateStore> WaitCheckpointWorker<S> {
915 pub async fn run(mut self) {
916 tracing::debug!("wait epoch worker start success");
917 loop {
918 match self.wait_checkpoint_rx.recv().await {
920 Some((epoch, task)) => {
921 tracing::debug!("start to wait epoch {}", epoch.0);
922 let ret = self
923 .state_store
924 .try_wait_epoch(
925 HummockReadEpoch::Committed(epoch.0),
926 TryWaitEpochOptions {
927 table_id: self.table_id,
928 },
929 )
930 .await;
931
932 match ret {
933 Ok(()) => {
934 tracing::debug!(epoch = epoch.0, "wait epoch success");
935 task.run().await;
936 }
937 Err(e) => {
938 tracing::error!(
939 error = %e.as_report(),
940 "wait epoch {} failed", epoch.0
941 );
942 }
943 }
944 }
945 None => {
946 tracing::error!("wait epoch rx closed");
947 break;
948 }
949 }
950 }
951 }
952}
953
954#[cfg(test)]
955mod tests {
956 use std::collections::HashSet;
957
958 use maplit::{btreemap, convert_args, hashmap};
959 use risingwave_common::catalog::{ColumnId, Field, TableId};
960 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
961 use risingwave_common::test_prelude::StreamChunkTestExt;
962 use risingwave_common::util::epoch::{EpochExt, test_epoch};
963 use risingwave_connector::source::datagen::DatagenSplit;
964 use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
965 use risingwave_pb::catalog::StreamSourceInfo;
966 use risingwave_pb::plan_common::PbRowFormatType;
967 use risingwave_storage::memory::MemoryStateStore;
968 use tokio::sync::mpsc::unbounded_channel;
969 use tracing_test::traced_test;
970
971 use super::*;
972 use crate::executor::AddMutation;
973 use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
974
/// Source name given to the `StreamSourceCore` instances built in these tests.
const MOCK_SOURCE_NAME: &str = "mock_source";
976
/// Boots a source executor over a datagen sequence source with a single split and checks
/// that the first chunk after the initial barrier contains the values 11, 12 and 13.
#[tokio::test]
async fn test_source_executor() {
    let table_id = TableId::default();
    let schema = Schema {
        fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
    };
    let row_id_index = None;
    let source_info = StreamSourceInfo {
        row_format: PbRowFormatType::Native as i32,
        ..Default::default()
    };
    let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
    let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

    // Datagen connector: a sequence starting at 11, three rows per second.
    let properties = convert_args!(btreemap!(
        "connector" => "datagen",
        "datagen.rows.per.second" => "3",
        "fields.sequence_int.kind" => "sequence",
        "fields.sequence_int.start" => "11",
        "fields.sequence_int.end" => "11111",
    ));
    let source_desc_builder =
        create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
    let split_state_store = SourceStateTableHandler::from_table_catalog(
        &default_source_internal_table(0x2333),
        MemoryStateStore::new(),
    )
    .await;
    let core = StreamSourceCore::<MemoryStateStore> {
        source_id: table_id,
        column_ids,
        source_desc_builder: Some(source_desc_builder),
        latest_split_info: HashMap::new(),
        split_state_store,
        updated_splits_in_epoch: HashMap::new(),
        source_name: MOCK_SOURCE_NAME.to_owned(),
    };

    let system_params_manager = LocalSystemParamsManager::for_test();

    let executor = SourceExecutor::new(
        ActorContext::for_test(0),
        Some(core),
        Arc::new(StreamingMetrics::unused()),
        barrier_rx,
        system_params_manager.get_params(),
        None,
        false,
    );
    let mut executor = executor.boxed().execute();

    // The first barrier assigns a single datagen split to the actor.
    let init_barrier =
        Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
            adds: HashMap::new(),
            added_actors: HashSet::new(),
            splits: hashmap! {
                ActorId::default() => vec![
                    SplitImpl::Datagen(DatagenSplit {
                        split_index: 0,
                        split_num: 1,
                        start_offset: None,
                    }),
                ],
            },
            pause: false,
            subscriptions_to_add: vec![],
            backfill_nodes_to_pause: Default::default(),
        }));
    barrier_tx.send(init_barrier).unwrap();

    // First message: the initial barrier being forwarded.
    executor.next().await.unwrap().unwrap();

    // Second message: the first data chunk.
    let msg = executor.next().await.unwrap().unwrap();

    assert_eq!(
        msg.into_chunk().unwrap(),
        StreamChunk::from_pretty(
            " i
            + 11
            + 12
            + 13"
        )
    );
}
1065
/// Exercises split assignment changes: starts with one split, grows to three, pauses and
/// resumes, shrinks to a single split, and checks what ends up in the split state table.
#[traced_test]
#[tokio::test]
async fn test_split_change_mutation() {
    let table_id = TableId::default();
    let schema = Schema {
        fields: vec![Field::with_name(DataType::Int32, "v1")],
    };
    let row_id_index = None;
    let source_info = StreamSourceInfo {
        row_format: PbRowFormatType::Native as i32,
        ..Default::default()
    };
    let properties = convert_args!(btreemap!(
        "connector" => "datagen",
        "fields.v1.kind" => "sequence",
        "fields.v1.start" => "11",
        "fields.v1.end" => "11111",
    ));

    let source_desc_builder =
        create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
    // Shared with the state handlers created later so the test can inspect persisted state.
    let mem_state_store = MemoryStateStore::new();

    let column_ids = vec![ColumnId::from(0)];
    let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
    let split_state_store = SourceStateTableHandler::from_table_catalog(
        &default_source_internal_table(0x2333),
        mem_state_store.clone(),
    )
    .await;

    let core = StreamSourceCore::<MemoryStateStore> {
        source_id: table_id,
        column_ids: column_ids.clone(),
        source_desc_builder: Some(source_desc_builder),
        latest_split_info: HashMap::new(),
        split_state_store,
        updated_splits_in_epoch: HashMap::new(),
        source_name: MOCK_SOURCE_NAME.to_owned(),
    };

    let system_params_manager = LocalSystemParamsManager::for_test();

    let executor = SourceExecutor::new(
        ActorContext::for_test(0),
        Some(core),
        Arc::new(StreamingMetrics::unused()),
        barrier_rx,
        system_params_manager.get_params(),
        None,
        false,
    );
    let mut handler = executor.boxed().execute();

    let mut epoch = test_epoch(1);
    // Initial assignment: split 0 of 3.
    let init_barrier =
        Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
            adds: HashMap::new(),
            added_actors: HashSet::new(),
            splits: hashmap! {
                ActorId::default() => vec![
                    SplitImpl::Datagen(DatagenSplit {
                        split_index: 0,
                        split_num: 3,
                        start_offset: None,
                    }),
                ],
            },
            pause: false,
            subscriptions_to_add: vec![],
            backfill_nodes_to_pause: Default::default(),
        }));
    barrier_tx.send(init_barrier).unwrap();

    // The first message must be a barrier.
    handler
        .next()
        .await
        .unwrap()
        .unwrap()
        .into_barrier()
        .unwrap();

    // From here on, consume whatever messages are ready in batches of up to 10.
    let mut ready_chunks = handler.ready_chunks(10);

    let _ = ready_chunks.next().await.unwrap();

    // New assignment: all three splits.
    let new_assignment = vec![
        SplitImpl::Datagen(DatagenSplit {
            split_index: 0,
            split_num: 3,
            start_offset: None,
        }),
        SplitImpl::Datagen(DatagenSplit {
            split_index: 1,
            split_num: 3,
            start_offset: None,
        }),
        SplitImpl::Datagen(DatagenSplit {
            split_index: 2,
            split_num: 3,
            start_offset: None,
        }),
    ];

    epoch.inc_epoch();
    let change_split_mutation =
        Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
            ActorId::default() => new_assignment.clone()
        }));

    barrier_tx.send(change_split_mutation).unwrap();

    // Presumably drains the batch containing the split-change barrier — not asserted here.
    let _ = ready_chunks.next().await.unwrap();

    epoch.inc_epoch();
    let barrier = Barrier::new_test_barrier(epoch);
    barrier_tx.send(barrier).unwrap();

    // Presumably drains the batch containing the plain barrier — not asserted here.
    ready_chunks.next().await.unwrap();

    // Open a second handler over the same store to inspect the persisted split state.
    let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
        &default_source_internal_table(0x2333),
        mem_state_store.clone(),
    )
    .await;

    source_state_handler
        .init_epoch(EpochPair::new_test_epoch(epoch))
        .await
        .unwrap();
    // The newly added split 1 must have state in the table by now.
    source_state_handler
        .get(&new_assignment[1].id())
        .await
        .unwrap()
        .unwrap();

    tokio::time::sleep(Duration::from_millis(100)).await;

    let _ = ready_chunks.next().await.unwrap();

    // Pause and resume in consecutive barriers.
    epoch.inc_epoch();
    let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
    barrier_tx.send(barrier).unwrap();

    epoch.inc_epoch();
    let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
    barrier_tx.send(barrier).unwrap();

    ready_chunks.next().await.unwrap();

    // Shrink the assignment to the last split only.
    let prev_assignment = new_assignment;
    let new_assignment = vec![prev_assignment[2].clone()];

    epoch.inc_epoch();
    let drop_split_mutation =
        Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
            ActorId::default() => new_assignment.clone()
        }));

    barrier_tx.send(drop_split_mutation).unwrap();

    // Presumably drains the batch containing the drop-split barrier — not asserted here.
    ready_chunks.next().await.unwrap();

    epoch.inc_epoch();
    let barrier = Barrier::new_test_barrier(epoch);
    barrier_tx.send(barrier).unwrap();

    // Presumably drains the batch containing the plain barrier — not asserted here.
    ready_chunks.next().await.unwrap();

    let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
        &default_source_internal_table(0x2333),
        mem_state_store.clone(),
    )
    .await;

    let new_epoch = EpochPair::new_test_epoch(epoch);
    source_state_handler.init_epoch(new_epoch).await.unwrap();

    let committed_reader = source_state_handler
        .new_committed_reader(new_epoch)
        .await
        .unwrap();
    // Dropped splits 0 and 1 must no longer be recoverable from the state store.
    assert!(
        committed_reader
            .try_recover_from_state_store(&prev_assignment[0])
            .await
            .unwrap()
            .is_none()
    );

    assert!(
        committed_reader
            .try_recover_from_state_store(&prev_assignment[1])
            .await
            .unwrap()
            .is_none()
    );

    // The retained split 2 must still be recoverable.
    assert!(
        committed_reader
            .try_recover_from_state_store(&prev_assignment[2])
            .await
            .unwrap()
            .is_some()
    );
}
1275}