1use std::collections::HashMap;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use risingwave_common::array::ArrayRef;
22use risingwave_common::catalog::{ColumnId, TableId};
23use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
24use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
25use risingwave_common::system_param::reader::SystemParamsRead;
26use risingwave_common::util::epoch::{Epoch, EpochPair};
27use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
28use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
29use risingwave_connector::source::reader::reader::SourceReader;
30use risingwave_connector::source::{
31 ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
32 StreamChunkWithState, WaitCheckpointTask,
33};
34use risingwave_hummock_sdk::HummockReadEpoch;
35use risingwave_storage::store::TryWaitEpochOptions;
36use thiserror_ext::AsReport;
37use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
38use tokio::sync::{mpsc, oneshot};
39use tokio::time::Instant;
40
41use super::executor_core::StreamSourceCore;
42use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
43use crate::common::rate_limit::limited_chunk_size;
44use crate::executor::UpdateMutation;
45use crate::executor::prelude::*;
46use crate::executor::source::reader_stream::StreamReaderBuilder;
47use crate::executor::stream_reader::StreamReaderWithPause;
48
/// Factor multiplied with the `barrier_interval_ms` system parameter to obtain the longest time
/// (in milliseconds) the executor keeps emitting chunks without receiving a barrier before it
/// pauses its own data stream.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
52
/// Stream executor that reads from an external source and emits chunks interleaved with the
/// barriers it receives.
///
/// When `stream_source_core` is `None` the executor only forwards barriers.
pub struct SourceExecutor<S: StateStore> {
    /// Context of the owning actor (actor id, fragment id, streaming config, meta client).
    actor_ctx: ActorContextRef,

    /// Source state and split bookkeeping; `None` when there is no stream source to read.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Streaming metrics registry used for the row-count and split-change counters.
    metrics: Arc<StreamingMetrics>,

    /// Receiver for barriers; taken out (left as `None`) once execution starts.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// Reader for system parameters; `barrier_interval_ms` is read from it.
    system_params: SystemParamsReaderRef,

    /// Rate limit handed to the reader builder; replaced when a `Throttle` mutation targets
    /// this actor.
    rate_limit_rps: Option<u32>,

    /// Forwarded to the reader builder when the actor is newly added.
    // NOTE(review): presumably marks a shared source that is not a CDC source — confirm.
    is_shared_non_cdc: bool,
}
73
74impl<S: StateStore> SourceExecutor<S> {
75 pub fn new(
76 actor_ctx: ActorContextRef,
77 stream_source_core: Option<StreamSourceCore<S>>,
78 metrics: Arc<StreamingMetrics>,
79 barrier_receiver: UnboundedReceiver<Barrier>,
80 system_params: SystemParamsReaderRef,
81 rate_limit_rps: Option<u32>,
82 is_shared_non_cdc: bool,
83 ) -> Self {
84 Self {
85 actor_ctx,
86 stream_source_core,
87 metrics,
88 barrier_receiver: Some(barrier_receiver),
89 system_params,
90 rate_limit_rps,
91 is_shared_non_cdc,
92 }
93 }
94
95 fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
96 StreamReaderBuilder {
97 source_desc,
98 rate_limit: self.rate_limit_rps,
99 source_id: self.stream_source_core.as_ref().unwrap().source_id,
100 source_name: self
101 .stream_source_core
102 .as_ref()
103 .unwrap()
104 .source_name
105 .clone(),
106 is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
107 actor_ctx: self.actor_ctx.clone(),
108 reader_stream: None,
109 }
110 }
111
112 async fn spawn_wait_checkpoint_worker(
113 core: &StreamSourceCore<S>,
114 source_reader: SourceReader,
115 ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
116 let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
117 return Ok(None);
118 };
119 let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
120 let wait_checkpoint_worker = WaitCheckpointWorker {
121 wait_checkpoint_rx,
122 state_store: core.split_state_store.state_table().state_store().clone(),
123 table_id: core.split_state_store.state_table().table_id().into(),
124 };
125 tokio::spawn(wait_checkpoint_worker.run());
126 Ok(Some(WaitCheckpointTaskBuilder {
127 wait_checkpoint_tx,
128 source_reader,
129 building_task: initial_task,
130 }))
131 }
132
133 pub fn prepare_source_stream_build(
135 &self,
136 source_desc: &SourceDesc,
137 ) -> (Vec<ColumnId>, SourceContext) {
138 let column_ids = source_desc
139 .columns
140 .iter()
141 .map(|column_desc| column_desc.column_id)
142 .collect_vec();
143
144 let (schema_change_tx, mut schema_change_rx) =
145 mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
146 let schema_change_tx = if self.is_auto_schema_change_enable() {
147 let meta_client = self.actor_ctx.meta_client.clone();
148 let _join_handle = tokio::task::spawn(async move {
150 while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
151 let table_ids = schema_change.table_ids();
152 tracing::info!(
153 target: "auto_schema_change",
154 "recv a schema change event for tables: {:?}", table_ids);
155 if let Some(ref meta_client) = meta_client {
157 match meta_client
158 .auto_schema_change(schema_change.to_protobuf())
159 .await
160 {
161 Ok(_) => {
162 tracing::info!(
163 target: "auto_schema_change",
164 "schema change success for tables: {:?}", table_ids);
165 finish_tx.send(()).unwrap();
166 }
167 Err(e) => {
168 tracing::error!(
169 target: "auto_schema_change",
170 error = ?e.as_report(), "schema change error");
171 finish_tx.send(()).unwrap();
172 }
173 }
174 }
175 }
176 });
177 Some(schema_change_tx)
178 } else {
179 info!("auto schema change is disabled in config");
180 None
181 };
182
183 let source_ctx = SourceContext::new(
184 self.actor_ctx.id,
185 self.stream_source_core.as_ref().unwrap().source_id,
186 self.actor_ctx.fragment_id,
187 self.stream_source_core
188 .as_ref()
189 .unwrap()
190 .source_name
191 .clone(),
192 source_desc.metrics.clone(),
193 SourceCtrlOpts {
194 chunk_size: limited_chunk_size(self.rate_limit_rps),
195 split_txn: self.rate_limit_rps.is_some(), },
197 source_desc.source.config.clone(),
198 schema_change_tx,
199 );
200
201 (column_ids, source_ctx)
202 }
203
204 fn is_auto_schema_change_enable(&self) -> bool {
205 self.actor_ctx
206 .streaming_config
207 .developer
208 .enable_auto_schema_change
209 }
210
211 #[inline]
213 fn get_metric_labels(&self) -> [String; 4] {
214 [
215 self.stream_source_core
216 .as_ref()
217 .unwrap()
218 .source_id
219 .to_string(),
220 self.stream_source_core
221 .as_ref()
222 .unwrap()
223 .source_name
224 .clone(),
225 self.actor_ctx.id.to_string(),
226 self.actor_ctx.fragment_id.to_string(),
227 ]
228 }
229
230 async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
239 &mut self,
240 barrier_epoch: EpochPair,
241 source_desc: &SourceDesc,
242 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
243 target_splits: Vec<SplitImpl>,
244 should_trim_state: bool,
245 source_split_change_count_metrics: &LabelGuardedIntCounter<4>,
246 ) -> StreamExecutorResult<()> {
247 {
248 source_split_change_count_metrics.inc();
249 if self
250 .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
251 .await?
252 {
253 self.rebuild_stream_reader(source_desc, stream)?;
254 }
255 }
256
257 Ok(())
258 }
259
260 async fn update_state_if_changed(
262 &mut self,
263 barrier_epoch: EpochPair,
264 target_splits: Vec<SplitImpl>,
265 should_trim_state: bool,
266 ) -> StreamExecutorResult<bool> {
267 let core = self.stream_source_core.as_mut().unwrap();
268
269 let target_splits: HashMap<_, _> = target_splits
270 .into_iter()
271 .map(|split| (split.id(), split))
272 .collect();
273
274 let mut target_state: HashMap<SplitId, SplitImpl> =
275 HashMap::with_capacity(target_splits.len());
276
277 let mut split_changed = false;
278
279 let committed_reader = core
280 .split_state_store
281 .new_committed_reader(barrier_epoch)
282 .await?;
283
284 for (split_id, split) in target_splits {
286 if let Some(s) = core.latest_split_info.get(&split_id) {
287 target_state.insert(split_id, s.clone());
290 } else {
291 split_changed = true;
292 let initial_state = if let Some(recover_state) = committed_reader
295 .try_recover_from_state_store(&split)
296 .await?
297 {
298 recover_state
299 } else {
300 split
301 };
302
303 core.updated_splits_in_epoch
304 .entry(split_id.clone())
305 .or_insert_with(|| initial_state.clone());
306
307 target_state.insert(split_id, initial_state);
308 }
309 }
310
311 for existing_split_id in core.latest_split_info.keys() {
313 if !target_state.contains_key(existing_split_id) {
314 tracing::info!("split dropping detected: {}", existing_split_id);
315 split_changed = true;
316 }
317 }
318
319 if split_changed {
320 tracing::info!(
321 actor_id = self.actor_ctx.id,
322 state = ?target_state,
323 "apply split change"
324 );
325
326 core.updated_splits_in_epoch
327 .retain(|split_id, _| target_state.contains_key(split_id));
328
329 let dropped_splits = core
330 .latest_split_info
331 .extract_if(|split_id, _| !target_state.contains_key(split_id))
332 .map(|(_, split)| split)
333 .collect_vec();
334
335 if should_trim_state && !dropped_splits.is_empty() {
336 core.split_state_store.trim_state(&dropped_splits).await?;
338 }
339
340 core.latest_split_info = target_state;
341 }
342
343 Ok(split_changed)
344 }
345
346 fn rebuild_stream_reader_from_error<const BIASED: bool>(
348 &mut self,
349 source_desc: &SourceDesc,
350 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
351 e: StreamExecutorError,
352 ) -> StreamExecutorResult<()> {
353 let core = self.stream_source_core.as_mut().unwrap();
354 tracing::warn!(
355 error = ?e.as_report(),
356 actor_id = self.actor_ctx.id,
357 source_id = %core.source_id,
358 "stream source reader error",
359 );
360 GLOBAL_ERROR_METRICS.user_source_error.report([
361 e.variant_name().to_owned(),
362 core.source_id.to_string(),
363 core.source_name.to_owned(),
364 self.actor_ctx.fragment_id.to_string(),
365 ]);
366
367 self.rebuild_stream_reader(source_desc, stream)
368 }
369
370 fn rebuild_stream_reader<const BIASED: bool>(
371 &mut self,
372 source_desc: &SourceDesc,
373 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
374 ) -> StreamExecutorResult<()> {
375 let core = self.stream_source_core.as_mut().unwrap();
376 let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
377
378 tracing::info!(
379 "actor {:?} apply source split change to {:?}",
380 self.actor_ctx.id,
381 target_state
382 );
383
384 let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
386 let reader_stream =
387 reader_stream_builder.into_retry_stream(Some(target_state.clone()), false);
388
389 stream.replace_data_stream(reader_stream);
390
391 Ok(())
392 }
393
394 async fn persist_state_and_clear_cache(
395 &mut self,
396 epoch: EpochPair,
397 ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
398 let core = self.stream_source_core.as_mut().unwrap();
399
400 let cache = core
401 .updated_splits_in_epoch
402 .values()
403 .map(|split_impl| split_impl.to_owned())
404 .collect_vec();
405
406 if !cache.is_empty() {
407 tracing::debug!(state = ?cache, "take snapshot");
408 core.split_state_store.set_states(cache).await?;
409 }
410
411 core.split_state_store.commit(epoch).await?;
413
414 let updated_splits = core.updated_splits_in_epoch.clone();
415
416 core.updated_splits_in_epoch.clear();
417
418 Ok(updated_splits)
419 }
420
421 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
423 let core = self.stream_source_core.as_mut().unwrap();
424 core.split_state_store.try_flush().await?;
425
426 Ok(())
427 }
428
    /// Main loop of an executor that has a stream source.
    ///
    /// Waits for the first barrier, restores split state from the state store, builds the
    /// source reader and then merges barriers with data chunks. For each barrier it applies
    /// the mutation, persists split state and forwards the barrier. For each chunk it records
    /// the new split offsets, prunes the additional columns and forwards the chunk.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_with_stream_source(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.as_ref().unwrap().source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        // Splits assigned to this actor by the first barrier, if any.
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        // Temporarily move the core out of `self`; it is put back after initialization.
        let mut core = self.stream_source_core.unwrap();

        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // `None` when the source reader does not create a wait-checkpoint task.
        let mut wait_checkpoint_task_builder =
            Self::spawn_wait_checkpoint_worker(&core, source_desc.source.clone()).await?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    // Stored state exists, so this actor has run before.
                    *ele = recover_state;
                    is_uninitialized = false;
                } else {
                    // Nothing stored yet: record the split so it is persisted with this epoch.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        core.init_split_state(boot_state.clone());

        self.stream_source_core = Some(core);

        // An empty assignment is represented as `None`.
        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        if is_uninitialized {
            // Only a newly added actor without stored state fetches the latest splits.
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Record the fetched splits so they are persisted at the next barrier.
            self.stream_source_core
                .as_mut()
                .unwrap()
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        // Set by `Pause`/`Resume` mutations (or by pause-on-startup).
        let mut command_paused = false;

        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        // Set when the executor paused itself because no barrier arrived for too long.
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels().each_ref().map(AsRef::as_ref));

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels().each_ref().map(AsRef::as_ref));

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                // Reader error: wait one second, then rebuild the reader and keep going.
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // Lift the self-imposed pause unless a `Pause` command is in effect.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;

                    // Split changes are only recorded here; they are applied after the
                    // barrier has been yielded.
                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                // `true`: state of dropped splits is trimmed.
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            target_splits,
                                            true,
                                            &source_split_change_count,
                                        )
                                    },
                                );
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                // `false`: state of dropped splits is kept.
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            target_splits,
                                            false,
                                            &source_split_change_count,
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // The reader is rebuilt so the new limit takes effect.
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        // Hand the accumulated task to the worker, keyed by the previous epoch.
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((
                        source_desc,
                        stream,
                        target_splits,
                        should_trim_state,
                        source_split_change_count,
                    )) = split_change
                    {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            target_splits,
                            should_trim_state,
                            source_split_change_count,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // Only barriers are expected on the left side of the merged stream.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        let offset_col = chunk.column_at(offset_idx);
                        task_builder.update_task_on_chunk(offset_col.clone());
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // No barrier for too long: stop reading until the next barrier.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Re-read the system parameter in case the interval has changed.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    // Update the state of splits that are still tracked.
                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) = self
                            .stream_source_core
                            .as_mut()
                            .unwrap()
                            .latest_split_info
                            .get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .as_mut()
                        .unwrap()
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    source_output_row_count.inc_by(chunk.cardinality() as u64);
                    let chunk =
                        prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The merged stream ended.
        tracing::error!(
            actor_id = self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
744
745 #[try_stream(ok = Message, error = StreamExecutorError)]
748 async fn execute_without_stream_source(mut self) {
749 let mut barrier_receiver = self.barrier_receiver.take().unwrap();
750 let barrier = barrier_receiver
751 .recv()
752 .instrument_await("source_recv_first_barrier")
753 .await
754 .ok_or_else(|| {
755 anyhow!(
756 "failed to receive the first barrier, actor_id: {:?} with no stream source",
757 self.actor_ctx.id
758 )
759 })?;
760 yield Message::Barrier(barrier);
761
762 while let Some(barrier) = barrier_receiver.recv().await {
763 yield Message::Barrier(barrier);
764 }
765 }
766}
767
768impl<S: StateStore> Execute for SourceExecutor<S> {
769 fn execute(self: Box<Self>) -> BoxedMessageStream {
770 if self.stream_source_core.is_some() {
771 self.execute_with_stream_source().boxed()
772 } else {
773 self.execute_without_stream_source().boxed()
774 }
775 }
776}
777
778impl<S: StateStore> Debug for SourceExecutor<S> {
779 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
780 if let Some(core) = &self.stream_source_core {
781 f.debug_struct("SourceExecutor")
782 .field("source_id", &core.source_id)
783 .field("column_ids", &core.column_ids)
784 .finish()
785 } else {
786 f.debug_struct("SourceExecutor").finish()
787 }
788 }
789}
790
/// Accumulates the wait-checkpoint task currently being built and hands finished tasks to the
/// `WaitCheckpointWorker` through a channel.
struct WaitCheckpointTaskBuilder {
    /// Sends `(epoch, task)` pairs to the worker.
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    /// Used to create a fresh task each time the current one is sent.
    source_reader: SourceReader,
    /// The task being filled in until the next `send`.
    building_task: WaitCheckpointTask,
}
796
797impl WaitCheckpointTaskBuilder {
798 fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
799 match &mut self.building_task {
800 WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
801 arrays.push(offset_col);
802 }
803 WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
804 arrays.push(offset_col);
805 }
806 WaitCheckpointTask::CommitCdcOffset(_) => {}
807 }
808 }
809
810 fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
811 #[expect(clippy::single_match)]
812 match &mut self.building_task {
813 WaitCheckpointTask::CommitCdcOffset(offsets) => {
814 if !updated_splits.is_empty() {
815 assert_eq!(1, updated_splits.len());
817 for (split_id, split_impl) in updated_splits {
818 if split_impl.is_cdc_split() {
819 *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
820 } else {
821 unreachable!()
822 }
823 }
824 }
825 }
826 _ => {}
827 }
828 }
829
830 async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
832 let new_task = self
833 .source_reader
834 .create_wait_checkpoint_task()
835 .await?
836 .expect("wait checkpoint task should be created");
837 self.wait_checkpoint_tx
838 .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
839 .expect("wait_checkpoint_tx send should succeed");
840 Ok(())
841 }
842}
843
/// Background worker that, for each received `(epoch, task)` pair, waits until the epoch is
/// committed in the state store and then runs the task.
struct WaitCheckpointWorker<S: StateStore> {
    /// Receives the tasks together with the epoch each one must wait for.
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    /// State store used to wait for the epoch.
    state_store: S,
    /// Table id passed in the wait options.
    table_id: TableId,
}
874
875impl<S: StateStore> WaitCheckpointWorker<S> {
876 pub async fn run(mut self) {
877 tracing::debug!("wait epoch worker start success");
878 loop {
879 match self.wait_checkpoint_rx.recv().await {
881 Some((epoch, task)) => {
882 tracing::debug!("start to wait epoch {}", epoch.0);
883 let ret = self
884 .state_store
885 .try_wait_epoch(
886 HummockReadEpoch::Committed(epoch.0),
887 TryWaitEpochOptions {
888 table_id: self.table_id,
889 },
890 )
891 .await;
892
893 match ret {
894 Ok(()) => {
895 tracing::debug!(epoch = epoch.0, "wait epoch success");
896 task.run().await;
897 }
898 Err(e) => {
899 tracing::error!(
900 error = %e.as_report(),
901 "wait epoch {} failed", epoch.0
902 );
903 }
904 }
905 }
906 None => {
907 tracing::error!("wait epoch rx closed");
908 break;
909 }
910 }
911 }
912 }
913}
914
915#[cfg(test)]
916mod tests {
917 use std::collections::HashSet;
918
919 use maplit::{btreemap, convert_args, hashmap};
920 use risingwave_common::catalog::{ColumnId, Field, TableId};
921 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
922 use risingwave_common::test_prelude::StreamChunkTestExt;
923 use risingwave_common::util::epoch::{EpochExt, test_epoch};
924 use risingwave_connector::source::datagen::DatagenSplit;
925 use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
926 use risingwave_pb::catalog::StreamSourceInfo;
927 use risingwave_pb::plan_common::PbRowFormatType;
928 use risingwave_storage::memory::MemoryStateStore;
929 use tokio::sync::mpsc::unbounded_channel;
930 use tracing_test::traced_test;
931
932 use super::*;
933 use crate::executor::AddMutation;
934 use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
935
    /// Source name given to the `StreamSourceCore` built in the tests.
    const MOCK_SOURCE_NAME: &str = "mock_source";
937
    /// Boots a datagen-backed executor with a single split and checks the first data chunk.
    #[tokio::test]
    async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // Datagen source producing a sequence that starts at 11, three rows per second.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            Some(core),
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
        );
        let mut executor = executor.boxed().execute();

        // The first barrier assigns one datagen split to the actor.
        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
            }));
        barrier_tx.send(init_barrier).unwrap();

        // First message: the executor yields the first barrier before any chunk.
        executor.next().await.unwrap().unwrap();

        // Second message: expected to be the first data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
 + 11
 + 12
 + 13"
            )
        );
    }
1025
    /// Exercises split changes: starts with one split, grows the assignment to three splits,
    /// pauses and resumes, then shrinks it to a single split and checks which split states
    /// remain in the state store.
    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        // Shared with the handlers created below to inspect the persisted split state.
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            Some(core),
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
        );
        let mut handler = executor.boxed().execute();

        // The first barrier assigns split 0 of 3.
        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
            }));
        barrier_tx.send(init_barrier).unwrap();

        // The first message must be the first barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        // Drain the next batch of ready messages.
        let _ = ready_chunks.next().await.unwrap();

        // Grow the assignment to all three splits.
        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap();

        // A plain barrier so that the state recorded after the split change gets persisted.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        // Open a second handler on the same store to inspect the persisted state.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        // The newly added split 1 must have state in the store.
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        // Pause, then resume the source.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        // Shrink the assignment to split 2 only.
        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        // State of the dropped splits 0 and 1 must be gone.
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        // State of the remaining split 2 must still be there.
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
1234}