1use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::types::JsonbVal;
28use risingwave_common::util::epoch::{Epoch, EpochPair};
29use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
30use risingwave_connector::source::cdc::split::{
31 extract_postgres_lsn_from_offset_str, extract_sql_server_commit_lsn_from_offset_str,
32};
33use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
34use risingwave_connector::source::reader::reader::SourceReader;
35use risingwave_connector::source::{
36 ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
37 StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
38};
39use risingwave_hummock_sdk::HummockReadEpoch;
40use risingwave_pb::common::ThrottleType;
41use risingwave_pb::id::SourceId;
42use risingwave_storage::store::TryWaitEpochOptions;
43use thiserror_ext::AsReport;
44use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
45use tokio::sync::{mpsc, oneshot};
46use tokio::time::Instant;
47
48use super::executor_core::StreamSourceCore;
49use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
50use crate::common::rate_limit::limited_chunk_size;
51use crate::executor::UpdateMutation;
52use crate::executor::prelude::*;
53use crate::executor::source::reader_stream::StreamReaderBuilder;
54use crate::executor::stream_reader::StreamReaderWithPause;
55use crate::task::LocalBarrierManager;

/// Multiple of the barrier interval that the source waits for a barrier before pausing itself
/// (see `max_wait_barrier_time_ms` in `execute_inner`).
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

/// Clamps a `u128` LSN to `i64::MAX` so it can be exported as an `i64` Prometheus gauge.
fn lsn_u128_to_i64(lsn: u128) -> i64 {
    lsn.min(i64::MAX as u128) as i64
}

pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source state for external systems.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader, used to watch the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows per second.
    rate_limit_rps: Option<u32>,

    /// Whether this is a shared (non-CDC) source, which affects how the initial reader is built.
    is_shared_non_cdc: bool,

    barrier_manager: LocalBarrierManager,
}
88
89impl<S: StateStore> SourceExecutor<S> {
90 #[expect(clippy::too_many_arguments)]
91 pub fn new(
92 actor_ctx: ActorContextRef,
93 stream_source_core: StreamSourceCore<S>,
94 metrics: Arc<StreamingMetrics>,
95 barrier_receiver: UnboundedReceiver<Barrier>,
96 system_params: SystemParamsReaderRef,
97 rate_limit_rps: Option<u32>,
98 is_shared_non_cdc: bool,
99 barrier_manager: LocalBarrierManager,
100 ) -> Self {
101 Self {
102 actor_ctx,
103 stream_source_core,
104 metrics,
105 barrier_receiver: Some(barrier_receiver),
106 system_params,
107 rate_limit_rps,
108 is_shared_non_cdc,
109 barrier_manager,
110 }
111 }
112
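    /// Builds a [`StreamReaderBuilder`] carrying this executor's source descriptor, rate limit,
    /// and actor context, used to (re)create the underlying source reader stream.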
113 fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
114 StreamReaderBuilder {
115 source_desc,
116 rate_limit: self.rate_limit_rps,
117 source_id: self.stream_source_core.source_id,
118 source_name: self.stream_source_core.source_name.clone(),
119 is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
120 actor_ctx: self.actor_ctx.clone(),
121 reader_stream: None,
122 }
123 }
124
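    /// Spawns a background [`WaitCheckpointWorker`] if the connector produces a wait-checkpoint
    /// task (e.g. to ack messages or commit offsets upstream once an epoch is committed).
    /// Returns `None` when the source does not need checkpoint notifications.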
125 async fn spawn_wait_checkpoint_worker(
126 core: &StreamSourceCore<S>,
127 source_reader: SourceReader,
128 metrics: Arc<StreamingMetrics>,
129 ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
130 let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
131 return Ok(None);
132 };
133 let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
134 let wait_checkpoint_worker = WaitCheckpointWorker {
135 wait_checkpoint_rx,
136 state_store: core.split_state_store.state_table().state_store().clone(),
137 table_id: core.split_state_store.state_table().table_id(),
138 metrics,
139 };
140 tokio::spawn(wait_checkpoint_worker.run());
141 Ok(Some(WaitCheckpointTaskBuilder {
142 wait_checkpoint_tx,
143 source_reader,
144 building_task: initial_task,
145 }))
146 }
147
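    /// Collects the source's column ids and builds the [`SourceContext`] used to create source
    /// readers. When auto schema change is enabled, this also spawns a task that forwards
    /// schema-change events from the connector to the meta client.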
148 pub fn prepare_source_stream_build(
150 &self,
151 source_desc: &SourceDesc,
152 ) -> (Vec<ColumnId>, SourceContext) {
153 let column_ids = source_desc
154 .columns
155 .iter()
156 .map(|column_desc| column_desc.column_id)
157 .collect_vec();
158
159 let (schema_change_tx, mut schema_change_rx) =
160 mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
161 let schema_change_tx = if self.is_auto_schema_change_enable() {
162 let meta_client = self.actor_ctx.meta_client.clone();
163 let _join_handle = tokio::task::spawn(async move {
165 while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
166 let table_ids = schema_change.table_ids();
167 tracing::info!(
168 target: "auto_schema_change",
169 "recv a schema change event for tables: {:?}", table_ids);
170 if let Some(ref meta_client) = meta_client {
172 match meta_client
173 .auto_schema_change(schema_change.to_protobuf())
174 .await
175 {
176 Ok(_) => {
177 tracing::info!(
178 target: "auto_schema_change",
179 "schema change success for tables: {:?}", table_ids);
180 finish_tx.send(()).unwrap();
181 }
182 Err(e) => {
183 tracing::error!(
184 target: "auto_schema_change",
185 error = ?e.as_report(), "schema change error");
186 finish_tx.send(()).unwrap();
187 }
188 }
189 }
190 }
191 });
192 Some(schema_change_tx)
193 } else {
194 info!("auto schema change is disabled in config");
195 None
196 };
197 let source_ctx = SourceContext::new(
198 self.actor_ctx.id,
199 self.stream_source_core.source_id,
200 self.actor_ctx.fragment_id,
201 self.stream_source_core.source_name.clone(),
202 source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                // Split the transaction per chunk to force a commit of every chunk when a rate
                // limit is applied.
                split_txn: self.rate_limit_rps.is_some(),
            },
207 source_desc.source.config.clone(),
208 schema_change_tx,
209 );
210
211 (column_ids, source_ctx)
212 }
213
214 fn is_auto_schema_change_enable(&self) -> bool {
215 self.actor_ctx.config.developer.enable_auto_schema_change
216 }
217
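    /// Labels shared by source metrics: `[source_id, source_name, actor_id, fragment_id]`.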
218 #[inline]
220 fn get_metric_labels(&self) -> [String; 4] {
221 [
222 self.stream_source_core.source_id.to_string(),
223 self.stream_source_core.source_name.clone(),
224 self.actor_ctx.id.to_string(),
225 self.actor_ctx.fragment_id.to_string(),
226 ]
227 }
228
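    /// Applies a split change or connector-properties change after the corresponding barrier has
    /// been yielded downstream, rebuilding the source reader stream when necessary.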
229 async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
238 &mut self,
239 barrier_epoch: EpochPair,
240 source_desc: &SourceDesc,
241 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
242 apply_mutation: ApplyMutationAfterBarrier<'_>,
243 ) -> StreamExecutorResult<()> {
244 {
245 let mut should_rebuild_stream = false;
246 match apply_mutation {
247 ApplyMutationAfterBarrier::SplitChange {
248 target_splits,
249 should_trim_state,
250 split_change_count,
251 } => {
252 split_change_count.inc();
253 if self
254 .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
255 .await?
256 {
257 should_rebuild_stream = true;
258 }
259 }
260 ApplyMutationAfterBarrier::ConnectorPropsChange => {
261 should_rebuild_stream = true;
262 }
263 }
264
265 if should_rebuild_stream {
266 self.rebuild_stream_reader(source_desc, stream)?;
267 }
268 }
269
270 Ok(())
271 }
272
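    /// Reconciles `latest_split_info` with `target_splits`. Newly assigned splits are recovered
    /// from the committed state store when possible; dropped splits are trimmed from the state
    /// table if `should_trim_state` is set. Returns `true` if the assignment changed.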
273 async fn update_state_if_changed(
275 &mut self,
276 barrier_epoch: EpochPair,
277 target_splits: Vec<SplitImpl>,
278 should_trim_state: bool,
279 ) -> StreamExecutorResult<bool> {
280 let core = &mut self.stream_source_core;
281
282 let target_splits: HashMap<_, _> = target_splits
283 .into_iter()
284 .map(|split| (split.id(), split))
285 .collect();
286
287 let mut target_state: HashMap<SplitId, SplitImpl> =
288 HashMap::with_capacity(target_splits.len());
289
290 let mut split_changed = false;
291
292 let committed_reader = core
293 .split_state_store
294 .new_committed_reader(barrier_epoch)
295 .await?;
296
297 for (split_id, split) in target_splits {
299 if let Some(s) = core.latest_split_info.get(&split_id) {
300 target_state.insert(split_id, s.clone());
303 } else {
304 split_changed = true;
305 let initial_state = if let Some(recover_state) = committed_reader
308 .try_recover_from_state_store(&split)
309 .await?
310 {
311 recover_state
312 } else {
313 split
314 };
315
316 core.updated_splits_in_epoch
317 .entry(split_id.clone())
318 .or_insert_with(|| initial_state.clone());
319
320 target_state.insert(split_id, initial_state);
321 }
322 }
323
324 for existing_split_id in core.latest_split_info.keys() {
326 if !target_state.contains_key(existing_split_id) {
327 tracing::info!("split dropping detected: {}", existing_split_id);
328 split_changed = true;
329 }
330 }
331
332 if split_changed {
333 tracing::info!(
334 actor_id = %self.actor_ctx.id,
335 state = ?target_state,
336 "apply split change"
337 );
338
339 core.updated_splits_in_epoch
340 .retain(|split_id, _| target_state.contains_key(split_id));
341
342 let dropped_splits = core
343 .latest_split_info
344 .extract_if(|split_id, _| !target_state.contains_key(split_id))
345 .map(|(_, split)| split)
346 .collect_vec();
347
348 if should_trim_state && !dropped_splits.is_empty() {
349 core.split_state_store.trim_state(&dropped_splits).await?;
351 }
352
353 core.latest_split_info = target_state;
354 }
355
356 Ok(split_changed)
357 }
358
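    /// Logs and reports a source reader error to the error metrics, then rebuilds the reader
    /// stream from the latest split states.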
359 fn rebuild_stream_reader_from_error<const BIASED: bool>(
361 &mut self,
362 source_desc: &SourceDesc,
363 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
364 e: StreamExecutorError,
365 ) -> StreamExecutorResult<()> {
366 let core = &mut self.stream_source_core;
367 tracing::error!(
368 error = ?e.as_report(),
369 actor_id = %self.actor_ctx.id,
370 source_id = %core.source_id,
371 "stream source reader error",
372 );
373 GLOBAL_ERROR_METRICS.user_source_error.report([
374 e.variant_name().to_owned(),
375 core.source_id.to_string(),
376 core.source_name.clone(),
377 self.actor_ctx.fragment_id.to_string(),
378 ]);
379
380 self.rebuild_stream_reader(source_desc, stream)
381 }
382
383 fn rebuild_stream_reader<const BIASED: bool>(
384 &mut self,
385 source_desc: &SourceDesc,
386 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
387 ) -> StreamExecutorResult<()> {
388 let core = &mut self.stream_source_core;
389 let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
390
391 tracing::info!(
392 "actor {:?} apply source split change to {:?}",
393 self.actor_ctx.id,
394 target_state
395 );
396
397 let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
399 let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);
400
401 stream.replace_data_stream(reader_stream);
402
403 Ok(())
404 }
405
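    /// Handles an `InjectSourceOffsets` mutation: overwrites the offsets of splits owned by this
    /// actor and persists the new states. Returns `true` if any offset was injected, in which
    /// case the caller should rebuild the reader stream.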
406 async fn handle_inject_source_offsets(
412 &mut self,
413 split_offsets: &HashMap<String, String>,
414 ) -> StreamExecutorResult<bool> {
415 tracing::warn!(
416 actor_id = %self.actor_ctx.id,
417 source_id = %self.stream_source_core.source_id,
418 num_offsets = split_offsets.len(),
419 "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
420 );
421
422 let offsets_to_inject: Vec<(String, String)> = {
424 let owned_splits: HashSet<&str> = self
425 .stream_source_core
426 .latest_split_info
427 .keys()
428 .map(|s| s.as_ref())
429 .collect();
430 split_offsets
431 .iter()
432 .filter(|(split_id, _)| owned_splits.contains(split_id.as_str()))
433 .map(|(k, v)| (k.clone(), v.clone()))
434 .collect()
435 };
436
437 let mut json_states: Vec<(String, JsonbVal)> = Vec::new();
439 let mut failed_splits: Vec<String> = Vec::new();
440
441 for (split_id, offset) in &offsets_to_inject {
442 if let Some(split) = self
443 .stream_source_core
444 .latest_split_info
445 .get_mut(split_id.as_str())
446 {
447 tracing::info!(
448 actor_id = %self.actor_ctx.id,
449 split_id = %split_id,
450 offset = %offset,
451 "Injecting offset for owned split"
452 );
453 if let Err(e) = split.update_in_place(offset.clone()) {
455 tracing::error!(
456 actor_id = %self.actor_ctx.id,
457 split_id = %split_id,
458 error = ?e.as_report(),
459 "Failed to update split offset"
460 );
461 failed_splits.push(split_id.clone());
462 continue;
463 }
464 self.stream_source_core
466 .updated_splits_in_epoch
467 .insert(split_id.clone().into(), split.clone());
468 let json_value: serde_json::Value = serde_json::from_str(offset)
470 .unwrap_or_else(|_| serde_json::json!({ "offset": offset }));
471 json_states.push((split_id.clone(), JsonbVal::from(json_value)));
472 }
473 }
474
475 if !failed_splits.is_empty() {
476 return Err(StreamExecutorError::connector_error(anyhow!(
477 "failed to inject offsets for splits: {:?}",
478 failed_splits
479 )));
480 }
481
482 let num_injected = json_states.len();
483 if num_injected > 0 {
484 self.stream_source_core
486 .split_state_store
487 .set_states_json(json_states)
488 .await?;
489
490 tracing::info!(
491 actor_id = %self.actor_ctx.id,
492 source_id = %self.stream_source_core.source_id,
493 num_injected = num_injected,
494 "Offset injection completed for owned splits, triggering rebuild"
495 );
496 Ok(true)
497 } else {
498 tracing::info!(
499 actor_id = %self.actor_ctx.id,
500 source_id = %self.stream_source_core.source_id,
501 "No owned splits to inject offsets for"
502 );
503 Ok(false)
504 }
505 }
506
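    /// Persists the splits updated in this epoch to the state table, updates CDC state metrics,
    /// commits the epoch, and returns the updated splits before clearing the per-epoch cache.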
507 async fn persist_state_and_clear_cache(
508 &mut self,
509 epoch: EpochPair,
510 ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
511 let core = &mut self.stream_source_core;
512
513 let cache = core
514 .updated_splits_in_epoch
515 .values()
516 .map(|split_impl| split_impl.to_owned())
517 .collect_vec();
518
519 if !cache.is_empty() {
520 tracing::debug!(state = ?cache, "take snapshot");
521
522 let source_id = core.source_id.to_string();
524 for split_impl in &cache {
525 match split_impl {
527 SplitImpl::PostgresCdc(pg_split) => {
528 if let Some(lsn_value) = pg_split.pg_lsn() {
529 self.metrics
530 .pg_cdc_state_table_lsn
531 .with_guarded_label_values(&[&source_id])
532 .set(lsn_value as i64);
533 }
534 }
535 SplitImpl::MysqlCdc(mysql_split) => {
536 if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
537 self.metrics
538 .mysql_cdc_state_binlog_file_seq
539 .with_guarded_label_values(&[&source_id])
540 .set(file_seq as i64);
541
542 self.metrics
543 .mysql_cdc_state_binlog_position
544 .with_guarded_label_values(&[&source_id])
545 .set(position as i64);
546 }
547 }
548 SplitImpl::SqlServerCdc(sqlserver_split) => {
549 if let Some(lsn) = sqlserver_split.sql_server_change_lsn() {
550 self.metrics
551 .sqlserver_cdc_state_change_lsn
552 .with_guarded_label_values(&[&source_id])
553 .set(lsn_u128_to_i64(lsn));
554 }
555 if let Some(lsn) = sqlserver_split.sql_server_commit_lsn() {
556 self.metrics
557 .sqlserver_cdc_state_commit_lsn
558 .with_guarded_label_values(&[&source_id])
559 .set(lsn_u128_to_i64(lsn));
560 }
561 }
562 _ => {}
563 }
564 }
565
566 core.split_state_store.set_states(cache).await?;
567 }
568
569 core.split_state_store.commit(epoch).await?;
571
572 let updated_splits = core.updated_splits_in_epoch.clone();
573
574 core.updated_splits_in_epoch.clear();
575
576 Ok(updated_splits)
577 }
578
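    /// Best-effort flush of buffered split-state writes between barriers.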
579 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
581 let core = &mut self.stream_source_core;
582 core.split_state_store.try_flush().await?;
583
584 Ok(())
585 }
586
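    /// Reports to the barrier manager, at most once per executor, that the CDC source offset has
    /// been persisted. For MySQL / SQL Server CDC (`must_wait_cdc_offset_before_report`), the
    /// report is deferred until a non-empty CDC offset has actually been observed.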
587 fn maybe_report_cdc_source_offset(
589 &self,
590 updated_splits: &HashMap<SplitId, SplitImpl>,
591 epoch: EpochPair,
592 source_id: SourceId,
593 must_report_cdc_offset_once: &mut bool,
594 must_wait_cdc_offset_before_report: bool,
595 ) {
596 if *must_report_cdc_offset_once
598 && (!must_wait_cdc_offset_before_report
599 || updated_splits
600 .values()
601 .any(|split| split.is_cdc_split() && !split.get_cdc_split_offset().is_empty()))
602 {
603 self.barrier_manager.report_cdc_source_offset_updated(
605 epoch,
606 self.actor_ctx.id,
607 source_id,
608 );
609 tracing::info!(
610 actor_id = %self.actor_ctx.id,
611 source_id = %source_id,
612 epoch = ?epoch,
613 "Reported CDC source offset updated to meta (first time only)"
614 );
615 *must_report_cdc_offset_once = false;
617 }
618 }
619
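    /// The main loop of the source executor: receives the first barrier, recovers split states
    /// from the state store, builds the pausable source reader stream, and then handles
    /// barriers, mutations, and data chunks.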
620 #[try_stream(ok = Message, error = StreamExecutorError)]
625 async fn execute_inner(mut self) {
626 let mut barrier_receiver = self.barrier_receiver.take().unwrap();
627 let first_barrier = barrier_receiver
628 .recv()
629 .instrument_await("source_recv_first_barrier")
630 .await
631 .ok_or_else(|| {
632 anyhow!(
633 "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
634 self.actor_ctx.id,
635 self.stream_source_core.source_id
636 )
637 })?;
638 let first_epoch = first_barrier.epoch;
639 let (mut boot_state, mut must_report_cdc_offset_once, must_wait_cdc_offset_before_report) =
642 if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
643 tracing::debug!(?splits, "boot with splits");
645 let must_report_cdc_offset_once = splits.iter().any(|split| split.is_cdc_split());
647 let must_wait_cdc_offset_before_report = must_report_cdc_offset_once
649 && splits.iter().any(|split| {
650 matches!(split, SplitImpl::MysqlCdc(_) | SplitImpl::SqlServerCdc(_))
651 });
652 (
653 splits.to_vec(),
654 must_report_cdc_offset_once,
655 must_wait_cdc_offset_before_report,
656 )
657 } else {
658 (Vec::default(), true, true)
659 };
660 let is_pause_on_startup = first_barrier.is_pause_on_startup();
661 let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);
662
663 yield Message::Barrier(first_barrier);
664
665 let mut core = self.stream_source_core;
666 let source_id = core.source_id;
667
668 let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
670 let mut source_desc = source_desc_builder
671 .build()
672 .map_err(StreamExecutorError::connector_error)?;
673
674 let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
675 &core,
676 source_desc.source.clone(),
677 self.metrics.clone(),
678 )
679 .await?;
680
681 let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
682 get_split_offset_col_idx(&source_desc.columns)
683 else {
684 unreachable!("Partition and offset columns must be set.");
685 };
686
687 core.split_state_store.init_epoch(first_epoch).await?;
688 {
689 let committed_reader = core
690 .split_state_store
691 .new_committed_reader(first_epoch)
692 .await?;
693 for ele in &mut boot_state {
694 if let Some(recover_state) =
695 committed_reader.try_recover_from_state_store(ele).await?
696 {
697 *ele = recover_state;
698 is_uninitialized = false;
700 } else {
701 core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
705 }
706 }
707 }
708
709 core.init_split_state(boot_state.clone());
711
712 self.stream_source_core = core;
714
715 let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
716 tracing::debug!(state = ?recover_state, "start with state");
717
718 let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
719 let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
720 let mut latest_splits = None;
721 if is_uninitialized {
723 let create_split_reader_result = reader_stream_builder
724 .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
725 .await?;
726 latest_splits = create_split_reader_result.latest_splits;
727 }
728
729 if let Some(latest_splits) = latest_splits {
730 self.stream_source_core
733 .updated_splits_in_epoch
734 .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
735 }
736 let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
739 barrier_stream,
740 reader_stream_builder
741 .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
742 );
743 let mut command_paused = false;
744
745 if is_pause_on_startup {
747 tracing::info!("source paused on startup");
748 stream.pause_stream();
749 command_paused = true;
750 }
751
752 let mut max_wait_barrier_time_ms =
755 self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
756 let mut last_barrier_time = Instant::now();
757 let mut self_paused = false;
758
759 let source_output_row_count = self
760 .metrics
761 .source_output_row_count
762 .with_guarded_label_values(&self.get_metric_labels());
763
764 let source_split_change_count = self
765 .metrics
766 .source_split_change_count
767 .with_guarded_label_values(&self.get_metric_labels());
768
769 while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                // Back off briefly, then rebuild the reader stream from the latest split states
                // instead of failing the whole actor.
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };
775
776 match msg {
777 Either::Left(Message::Barrier(barrier)) => {
779 last_barrier_time = Instant::now();
780
781 if self_paused {
782 self_paused = false;
783 if !command_paused {
785 stream.resume_stream();
786 }
787 }
788
789 let epoch = barrier.epoch;
790 let mut split_change = None;
791
792 if let Some(mutation) = barrier.mutation.as_deref() {
793 match mutation {
794 Mutation::Pause => {
795 command_paused = true;
796 stream.pause_stream()
797 }
798 Mutation::Resume => {
799 command_paused = false;
800 stream.resume_stream()
801 }
802 Mutation::SourceChangeSplit(actor_splits) => {
803 tracing::info!(
804 actor_id = %self.actor_ctx.id,
805 actor_splits = ?actor_splits,
806 "source change split received"
807 );
808
809 split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
810 |target_splits| {
811 (
812 &source_desc,
813 &mut stream,
814 ApplyMutationAfterBarrier::SplitChange {
815 target_splits,
816 should_trim_state: true,
817 split_change_count: &source_split_change_count,
818 },
819 )
820 },
821 );
822 }
823
824 Mutation::ConnectorPropsChange(maybe_mutation) => {
825 if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
826 {
827 tracing::info!(
829 actor_id = %self.actor_ctx.id,
830 source_id = %source_id,
831 "updating source connector properties",
832 );
833 source_desc.update_reader(new_props.clone())?;
834 split_change = Some((
836 &source_desc,
837 &mut stream,
838 ApplyMutationAfterBarrier::ConnectorPropsChange,
839 ));
840 }
841 }
842
843 Mutation::Update(UpdateMutation { actor_splits, .. }) => {
844 split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
845 |target_splits| {
846 (
847 &source_desc,
848 &mut stream,
849 ApplyMutationAfterBarrier::SplitChange {
850 target_splits,
851 should_trim_state: false,
852 split_change_count: &source_split_change_count,
853 },
854 )
855 },
856 );
857 }
858 Mutation::Throttle(fragment_to_apply) => {
859 if let Some(entry) =
860 fragment_to_apply.get(&self.actor_ctx.fragment_id)
861 && entry.throttle_type() == ThrottleType::Source
862 && entry.rate_limit != self.rate_limit_rps
863 {
864 tracing::info!(
865 "updating rate limit from {:?} to {:?}",
866 self.rate_limit_rps,
867 entry.rate_limit
868 );
869 self.rate_limit_rps = entry.rate_limit;
870 self.rebuild_stream_reader(&source_desc, &mut stream)?;
872 }
873 }
874 Mutation::ResetSource { source_id } => {
875 if *source_id == self.stream_source_core.source_id {
879 tracing::info!(
880 actor_id = %self.actor_ctx.id,
881 source_id = source_id.as_raw_id(),
882 "Resetting CDC source: clearing offset (set to None)"
883 );
884
885 let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
887 .latest_split_info
888 .values()
889 .map(|split| {
890 let mut new_split = split.clone();
892 match &mut new_split {
893 SplitImpl::MysqlCdc(debezium_split) => {
894 if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
895 tracing::info!(
896 split_id = ?mysql_split.inner.split_id,
897 old_offset = ?mysql_split.inner.start_offset,
898 "Clearing MySQL CDC offset"
899 );
900 mysql_split.inner.start_offset = None;
901 }
902 }
903 SplitImpl::PostgresCdc(debezium_split) => {
904 if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
905 tracing::info!(
906 split_id = ?pg_split.inner.split_id,
907 old_offset = ?pg_split.inner.start_offset,
908 "Clearing PostgreSQL CDC offset"
909 );
910 pg_split.inner.start_offset = None;
911 }
912 }
913 SplitImpl::MongodbCdc(debezium_split) => {
914 if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
915 tracing::info!(
916 split_id = ?mongo_split.inner.split_id,
917 old_offset = ?mongo_split.inner.start_offset,
918 "Clearing MongoDB CDC offset"
919 );
920 mongo_split.inner.start_offset = None;
921 }
922 }
923 SplitImpl::CitusCdc(debezium_split) => {
924 if let Some(citus_split) = debezium_split.citus_split.as_mut() {
925 tracing::info!(
926 split_id = ?citus_split.inner.split_id,
927 old_offset = ?citus_split.inner.start_offset,
928 "Clearing Citus CDC offset"
929 );
930 citus_split.inner.start_offset = None;
931 }
932 }
933 SplitImpl::SqlServerCdc(debezium_split) => {
934 if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
935 tracing::info!(
936 split_id = ?sqlserver_split.inner.split_id,
937 old_offset = ?sqlserver_split.inner.start_offset,
938 "Clearing SQL Server CDC offset"
939 );
940 sqlserver_split.inner.start_offset = None;
941 }
942 }
943 _ => {
944 tracing::warn!(
945 "RESET SOURCE called on non-CDC split type"
946 );
947 }
948 }
949 new_split
950 })
951 .collect();
952
953 if !splits_with_cleared_offset.is_empty() {
954 tracing::info!(
955 actor_id = %self.actor_ctx.id,
956 split_count = splits_with_cleared_offset.len(),
957 "Updating state table with cleared offsets"
958 );
959
960 self.stream_source_core
962 .split_state_store
963 .set_states(splits_with_cleared_offset.clone())
964 .await?;
965
966 for split in splits_with_cleared_offset {
968 self.stream_source_core
969 .latest_split_info
970 .insert(split.id(), split.clone());
971 self.stream_source_core
972 .updated_splits_in_epoch
973 .insert(split.id(), split);
974 }
975
976 tracing::info!(
977 actor_id = %self.actor_ctx.id,
978 source_id = source_id.as_raw_id(),
979 "RESET SOURCE completed: offset cleared (set to None). \
980 Trigger recovery/restart to fetch latest offset from upstream."
981 );
982 } else {
983 tracing::warn!(
984 actor_id = %self.actor_ctx.id,
985 "No splits found to reset - source may not be initialized yet"
986 );
987 }
988 } else {
989 tracing::debug!(
990 actor_id = %self.actor_ctx.id,
991 target_source_id = source_id.as_raw_id(),
992 current_source_id = self.stream_source_core.source_id.as_raw_id(),
993 "ResetSource mutation for different source, ignoring"
994 );
995 }
996 }
997
998 Mutation::InjectSourceOffsets {
999 source_id,
1000 split_offsets,
1001 } => {
1002 if *source_id == self.stream_source_core.source_id {
1003 if self.handle_inject_source_offsets(split_offsets).await? {
1004 split_change = Some((
1006 &source_desc,
1007 &mut stream,
1008 ApplyMutationAfterBarrier::ConnectorPropsChange,
1009 ));
1010 }
1011 } else {
1012 tracing::debug!(
1013 actor_id = %self.actor_ctx.id,
1014 target_source_id = source_id.as_raw_id(),
1015 current_source_id = self.stream_source_core.source_id.as_raw_id(),
1016 "InjectSourceOffsets mutation for different source, ignoring"
1017 );
1018 }
1019 }
1020
1021 _ => {}
1022 }
1023 }
1024
1025 let updated_splits = self.persist_state_and_clear_cache(epoch).await?;
1026
1027 self.maybe_report_cdc_source_offset(
1028 &updated_splits,
1029 epoch,
1030 source_id,
1031 &mut must_report_cdc_offset_once,
1032 must_wait_cdc_offset_before_report,
1033 );
1034
1035 if barrier.kind.is_checkpoint()
1037 && let Some(task_builder) = &mut wait_checkpoint_task_builder
1038 {
1039 task_builder.update_task_on_checkpoint(updated_splits);
1040
1041 tracing::debug!("epoch to wait {:?}", epoch);
1042 task_builder.send(Epoch(epoch.prev)).await?
1043 }
1044
1045 let barrier_epoch = barrier.epoch;
1046 yield Message::Barrier(barrier);
1047
1048 if let Some((source_desc, stream, to_apply_mutation)) = split_change {
1049 self.apply_split_change_after_yield_barrier(
1050 barrier_epoch,
1051 source_desc,
1052 stream,
1053 to_apply_mutation,
1054 )
1055 .await?;
1056 }
1057 }
                Either::Left(_) => {
                    // The left arm only carries barriers (see `barrier_to_message_stream`), so
                    // any other message variant is unreachable here.
                    unreachable!();
                }
1063
1064 Either::Right((chunk, latest_state)) => {
1065 if let Some(task_builder) = &mut wait_checkpoint_task_builder {
1066 if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1067 let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
1068 task_builder.update_task_on_chunk(
1069 source_id,
1070 &latest_state,
1071 pulsar_message_id_col.clone(),
1072 );
1073 } else {
1074 let offset_col = chunk.column_at(offset_idx);
1075 task_builder.update_task_on_chunk(
1076 source_id,
1077 &latest_state,
1078 offset_col.clone(),
1079 );
1080 }
1081 }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // No barrier has arrived for too long: pause the source itself so data
                        // stops piling up, and resume once the next barrier is received.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Refresh `max_wait_barrier_time_ms` only here, rather than on every
                        // chunk, to avoid reading the shared system params too frequently.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }
1102
1103 latest_state.iter().for_each(|(split_id, new_split_impl)| {
1104 if let Some(split_impl) =
1105 self.stream_source_core.latest_split_info.get_mut(split_id)
1106 {
1107 *split_impl = new_split_impl.clone();
1108 }
1109 });
1110
1111 self.stream_source_core
1112 .updated_splits_in_epoch
1113 .extend(latest_state);
1114
1115 let card = chunk.cardinality();
1116 if card == 0 {
1117 continue;
1118 }
1119 source_output_row_count.inc_by(card as u64);
1120 let to_remove_col_indices =
1121 if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1122 vec![split_idx, offset_idx, pulsar_message_id_idx]
1123 } else {
1124 vec![split_idx, offset_idx]
1125 };
1126 let chunk =
1127 prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
1128 yield Message::Chunk(chunk);
1129 self.try_flush_data().await?;
1130 }
1131 }
1132 }
1133
1134 tracing::error!(
1136 actor_id = %self.actor_ctx.id,
1137 "source executor exited unexpectedly"
1138 )
1139 }
1140}
1141
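/// A mutation that must be applied to the reader stream only after the barrier carrying it has
/// been yielded downstream.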
1142#[derive(Debug, Clone)]
1143enum ApplyMutationAfterBarrier<'a> {
1144 SplitChange {
1145 target_splits: Vec<SplitImpl>,
1146 should_trim_state: bool,
1147 split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
1148 },
1149 ConnectorPropsChange,
1150}
1151
1152impl<S: StateStore> Execute for SourceExecutor<S> {
1153 fn execute(self: Box<Self>) -> BoxedMessageStream {
1154 self.execute_inner().boxed()
1155 }
1156}
1157
1158impl<S: StateStore> Debug for SourceExecutor<S> {
1159 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1160 f.debug_struct("SourceExecutor")
1161 .field("source_id", &self.stream_source_core.source_id)
1162 .field("column_ids", &self.stream_source_core.column_ids)
1163 .finish()
1164 }
1165}
1166
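/// Accumulates acknowledgement data (offsets, message ids) for the current epoch and, on each
/// checkpoint barrier, sends the finished [`WaitCheckpointTask`] to the [`WaitCheckpointWorker`].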
1167struct WaitCheckpointTaskBuilder {
1168 wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
1169 source_reader: SourceReader,
1170 building_task: WaitCheckpointTask,
1171}
1172
1173impl WaitCheckpointTaskBuilder {
1174 fn update_task_on_chunk(
1175 &mut self,
1176 source_id: SourceId,
1177 latest_state: &HashMap<SplitId, SplitImpl>,
1178 offset_col: ArrayRef,
1179 ) {
1180 match &mut self.building_task {
1181 WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
1182 arrays.push(offset_col);
1183 }
1184 WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
1185 arrays.push(offset_col);
1186 }
1187 WaitCheckpointTask::AckPulsarMessage(arrays) => {
1188 let split_id = latest_state.keys().next().unwrap();
1190 let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
1191 arrays.push((pulsar_ack_channel_id, offset_col));
1192 }
1193 WaitCheckpointTask::CommitCdcOffset(_) => {}
1194 }
1195 }
1196
1197 fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
1198 #[expect(clippy::single_match)]
1199 match &mut self.building_task {
1200 WaitCheckpointTask::CommitCdcOffset(offsets) => {
1201 if !updated_splits.is_empty() {
1202 assert_eq!(1, updated_splits.len());
1204 for (split_id, split_impl) in updated_splits {
1205 if split_impl.is_cdc_split() {
1206 *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
1207 } else {
1208 unreachable!()
1209 }
1210 }
1211 }
1212 }
1213 _ => {}
1214 }
1215 }
1216
1217 async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
1219 let new_task = self
1220 .source_reader
1221 .create_wait_checkpoint_task()
1222 .await?
1223 .expect("wait checkpoint task should be created");
1224 self.wait_checkpoint_tx
1225 .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
1226 .expect("wait_checkpoint_tx send should succeed");
1227 Ok(())
1228 }
1229}
1230
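/// Background worker that waits until an epoch is committed in the state store and then runs the
/// corresponding [`WaitCheckpointTask`], e.g. acking messages or committing CDC offsets upstream.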
1231struct WaitCheckpointWorker<S: StateStore> {
1257 wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
1258 state_store: S,
1259 table_id: TableId,
1260 metrics: Arc<StreamingMetrics>,
1261}
1262
1263impl<S: StateStore> WaitCheckpointWorker<S> {
1264 pub async fn run(mut self) {
1265 tracing::debug!("wait epoch worker start success");
1266 loop {
1267 match self.wait_checkpoint_rx.recv().await {
1269 Some((epoch, task)) => {
1270 tracing::debug!("start to wait epoch {}", epoch.0);
1271 let ret = self
1272 .state_store
1273 .try_wait_epoch(
1274 HummockReadEpoch::Committed(epoch.0),
1275 TryWaitEpochOptions {
1276 table_id: self.table_id,
1277 },
1278 )
1279 .await;
1280 if self.wait_checkpoint_rx.is_closed() {
1281 tracing::debug!(epoch = epoch.0, "Drop stale wait checkpoint task.");
1285 break;
1286 }
1287 match ret {
1288 Ok(()) => {
1289 tracing::debug!(epoch = epoch.0, "wait epoch success");
1290
1291 task.run_with_on_commit_success(|source_id: u64, offset| {
1293 if let Some(lsn_value) =
1294 extract_postgres_lsn_from_offset_str(offset)
1295 {
1296 self.metrics
1297 .pg_cdc_jni_commit_offset_lsn
1298 .with_guarded_label_values(&[&source_id.to_string()])
1299 .set(lsn_value as i64);
1300 }
1301 if let Some(lsn_value) =
1302 extract_sql_server_commit_lsn_from_offset_str(offset)
1303 {
1304 self.metrics
1305 .sqlserver_cdc_jni_commit_offset_lsn
1306 .with_guarded_label_values(&[&source_id.to_string()])
1307 .set(lsn_u128_to_i64(lsn_value));
1308 }
1309 })
1310 .await;
1311 }
1312 Err(e) => {
1313 tracing::error!(
1314 error = %e.as_report(),
1315 "wait epoch {} failed", epoch.0
1316 );
1317 }
1318 }
1319 }
1320 None => {
1321 tracing::error!("wait epoch rx closed");
1322 break;
1323 }
1324 }
1325 }
1326 }
1327}
1328
1329#[cfg(test)]
1330mod tests {
1331 use maplit::{btreemap, convert_args, hashmap};
1332 use risingwave_common::catalog::{ColumnId, Field};
1333 use risingwave_common::id::SourceId;
1334 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
1335 use risingwave_common::test_prelude::StreamChunkTestExt;
1336 use risingwave_common::util::epoch::{EpochExt, test_epoch};
1337 use risingwave_connector::source::datagen::DatagenSplit;
1338 use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
1339 use risingwave_pb::catalog::StreamSourceInfo;
1340 use risingwave_pb::plan_common::PbRowFormatType;
1341 use risingwave_storage::memory::MemoryStateStore;
1342 use tokio::sync::mpsc::unbounded_channel;
1343 use tracing_test::traced_test;
1344
1345 use super::*;
1346 use crate::executor::AddMutation;
1347 use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
1348 use crate::task::LocalBarrierManager;
1349
1350 const MOCK_SOURCE_NAME: &str = "mock_source";
1351
1352 #[tokio::test]
1353 async fn test_source_executor() {
1354 let source_id = 0.into();
1355 let schema = Schema {
1356 fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
1357 };
1358 let row_id_index = None;
1359 let source_info = StreamSourceInfo {
1360 row_format: PbRowFormatType::Native as i32,
1361 ..Default::default()
1362 };
1363 let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1364 let column_ids = vec![0].into_iter().map(ColumnId::from).collect();
1365
1366 let properties = convert_args!(btreemap!(
1368 "connector" => "datagen",
1369 "datagen.rows.per.second" => "3",
1370 "fields.sequence_int.kind" => "sequence",
1371 "fields.sequence_int.start" => "11",
1372 "fields.sequence_int.end" => "11111",
1373 ));
1374 let source_desc_builder =
1375 create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1376 let split_state_store = SourceStateTableHandler::from_table_catalog(
1377 &default_source_internal_table(0x2333),
1378 MemoryStateStore::new(),
1379 )
1380 .await;
1381 let core = StreamSourceCore::<MemoryStateStore> {
1382 source_id,
1383 column_ids,
1384 source_desc_builder: Some(source_desc_builder),
1385 latest_split_info: HashMap::new(),
1386 split_state_store,
1387 updated_splits_in_epoch: HashMap::new(),
1388 source_name: MOCK_SOURCE_NAME.to_owned(),
1389 };
1390
1391 let system_params_manager = LocalSystemParamsManager::for_test();
1392
1393 let executor = SourceExecutor::new(
1394 ActorContext::for_test(0),
1395 core,
1396 Arc::new(StreamingMetrics::unused()),
1397 barrier_rx,
1398 system_params_manager.get_params(),
1399 None,
1400 false,
1401 LocalBarrierManager::for_test(),
1402 );
1403 let mut executor = executor.boxed().execute();
1404
1405 let init_barrier =
1406 Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
1407 splits: hashmap! {
1408 ActorId::default() => vec![
1409 SplitImpl::Datagen(DatagenSplit {
1410 split_index: 0,
1411 split_num: 1,
1412 start_offset: None,
1413 }),
1414 ],
1415 },
1416 ..Default::default()
1417 }));
1418 barrier_tx.send(init_barrier).unwrap();
1419
1420 executor.next().await.unwrap().unwrap();
1422
1423 let msg = executor.next().await.unwrap().unwrap();
1425
1426 assert_eq!(
1428 msg.into_chunk().unwrap(),
1429 StreamChunk::from_pretty(
1430 " i
1431 + 11
1432 + 12
1433 + 13"
1434 )
1435 );
1436 }
1437
1438 #[traced_test]
1439 #[tokio::test]
1440 async fn test_split_change_mutation() {
1441 let source_id = SourceId::new(0);
1442 let schema = Schema {
1443 fields: vec![Field::with_name(DataType::Int32, "v1")],
1444 };
1445 let row_id_index = None;
1446 let source_info = StreamSourceInfo {
1447 row_format: PbRowFormatType::Native as i32,
1448 ..Default::default()
1449 };
1450 let properties = convert_args!(btreemap!(
1451 "connector" => "datagen",
1452 "fields.v1.kind" => "sequence",
1453 "fields.v1.start" => "11",
1454 "fields.v1.end" => "11111",
1455 ));
1456
1457 let source_desc_builder =
1458 create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1459 let mem_state_store = MemoryStateStore::new();
1460
1461 let column_ids = vec![ColumnId::from(0)];
1462 let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1463 let split_state_store = SourceStateTableHandler::from_table_catalog(
1464 &default_source_internal_table(0x2333),
1465 mem_state_store.clone(),
1466 )
1467 .await;
1468
1469 let core = StreamSourceCore::<MemoryStateStore> {
1470 source_id,
1471 column_ids: column_ids.clone(),
1472 source_desc_builder: Some(source_desc_builder),
1473 latest_split_info: HashMap::new(),
1474 split_state_store,
1475 updated_splits_in_epoch: HashMap::new(),
1476 source_name: MOCK_SOURCE_NAME.to_owned(),
1477 };
1478
1479 let system_params_manager = LocalSystemParamsManager::for_test();
1480
1481 let executor = SourceExecutor::new(
1482 ActorContext::for_test(0),
1483 core,
1484 Arc::new(StreamingMetrics::unused()),
1485 barrier_rx,
1486 system_params_manager.get_params(),
1487 None,
1488 false,
1489 LocalBarrierManager::for_test(),
1490 );
1491 let mut handler = executor.boxed().execute();
1492
1493 let mut epoch = test_epoch(1);
1494 let init_barrier =
1495 Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
1496 splits: hashmap! {
1497 ActorId::default() => vec![
1498 SplitImpl::Datagen(DatagenSplit {
1499 split_index: 0,
1500 split_num: 3,
1501 start_offset: None,
1502 }),
1503 ],
1504 },
1505 ..Default::default()
1506 }));
1507 barrier_tx.send(init_barrier).unwrap();
1508
1509 handler
1511 .next()
1512 .await
1513 .unwrap()
1514 .unwrap()
1515 .into_barrier()
1516 .unwrap();
1517
1518 let mut ready_chunks = handler.ready_chunks(10);
1519
1520 let _ = ready_chunks.next().await.unwrap();
1521
1522 let new_assignment = vec![
1523 SplitImpl::Datagen(DatagenSplit {
1524 split_index: 0,
1525 split_num: 3,
1526 start_offset: None,
1527 }),
1528 SplitImpl::Datagen(DatagenSplit {
1529 split_index: 1,
1530 split_num: 3,
1531 start_offset: None,
1532 }),
1533 SplitImpl::Datagen(DatagenSplit {
1534 split_index: 2,
1535 split_num: 3,
1536 start_offset: None,
1537 }),
1538 ];
1539
1540 epoch.inc_epoch();
1541 let change_split_mutation =
1542 Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
1543 ActorId::default() => new_assignment.clone()
1544 }));
1545
1546 barrier_tx.send(change_split_mutation).unwrap();
1547
        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
1551 let barrier = Barrier::new_test_barrier(epoch);
1552 barrier_tx.send(barrier).unwrap();
1553
        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
1557 &default_source_internal_table(0x2333),
1558 mem_state_store.clone(),
1559 )
1560 .await;
1561
1562 source_state_handler
1564 .init_epoch(EpochPair::new_test_epoch(epoch))
1565 .await
1566 .unwrap();
1567 source_state_handler
1568 .get(&new_assignment[1].id())
1569 .await
1570 .unwrap()
1571 .unwrap();
1572
1573 tokio::time::sleep(Duration::from_millis(100)).await;
1574
1575 let _ = ready_chunks.next().await.unwrap();
1576
1577 epoch.inc_epoch();
1578 let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
1579 barrier_tx.send(barrier).unwrap();
1580
1581 epoch.inc_epoch();
1582 let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
1583 barrier_tx.send(barrier).unwrap();
1584
1585 ready_chunks.next().await.unwrap();
1587
1588 let prev_assignment = new_assignment;
1589 let new_assignment = vec![prev_assignment[2].clone()];
1590
1591 epoch.inc_epoch();
1592 let drop_split_mutation =
1593 Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
1594 ActorId::default() => new_assignment.clone()
1595 }));
1596
1597 barrier_tx.send(drop_split_mutation).unwrap();
1598
        ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
1602 let barrier = Barrier::new_test_barrier(epoch);
1603 barrier_tx.send(barrier).unwrap();
1604
        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
1608 &default_source_internal_table(0x2333),
1609 mem_state_store.clone(),
1610 )
1611 .await;
1612
1613 let new_epoch = EpochPair::new_test_epoch(epoch);
1614 source_state_handler.init_epoch(new_epoch).await.unwrap();
1615
1616 let committed_reader = source_state_handler
1617 .new_committed_reader(new_epoch)
1618 .await
1619 .unwrap();
1620 assert!(
1621 committed_reader
1622 .try_recover_from_state_store(&prev_assignment[0])
1623 .await
1624 .unwrap()
1625 .is_none()
1626 );
1627
1628 assert!(
1629 committed_reader
1630 .try_recover_from_state_store(&prev_assignment[1])
1631 .await
1632 .unwrap()
1633 .is_none()
1634 );
1635
1636 assert!(
1637 committed_reader
1638 .try_recover_from_state_store(&prev_assignment[2])
1639 .await
1640 .unwrap()
1641 .is_some()
1642 );
1643 }
1644}