1use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::types::JsonbVal;
28use risingwave_common::util::epoch::{Epoch, EpochPair};
29use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
30use risingwave_connector::source::cdc::split::extract_postgres_lsn_from_offset_str;
31use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
32use risingwave_connector::source::reader::reader::SourceReader;
33use risingwave_connector::source::{
34 ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
35 StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
36};
37use risingwave_hummock_sdk::HummockReadEpoch;
38use risingwave_pb::common::ThrottleType;
39use risingwave_pb::id::SourceId;
40use risingwave_storage::store::TryWaitEpochOptions;
41use thiserror_ext::AsReport;
42use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
43use tokio::sync::{mpsc, oneshot};
44use tokio::time::Instant;
45
46use super::executor_core::StreamSourceCore;
47use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
48use crate::common::rate_limit::limited_chunk_size;
49use crate::executor::UpdateMutation;
50use crate::executor::prelude::*;
51use crate::executor::source::reader_stream::StreamReaderBuilder;
52use crate::executor::stream_reader::StreamReaderWithPause;
53use crate::task::LocalBarrierManager;
54
55pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
58
59pub struct SourceExecutor<S: StateStore> {
60 actor_ctx: ActorContextRef,
61
62 stream_source_core: StreamSourceCore<S>,
64
65 metrics: Arc<StreamingMetrics>,
67
68 barrier_receiver: Option<UnboundedReceiver<Barrier>>,
70
71 system_params: SystemParamsReaderRef,
73
74 rate_limit_rps: Option<u32>,
76
77 is_shared_non_cdc: bool,
78
79 barrier_manager: LocalBarrierManager,
81}
82
83impl<S: StateStore> SourceExecutor<S> {
84 #[expect(clippy::too_many_arguments)]
85 pub fn new(
86 actor_ctx: ActorContextRef,
87 stream_source_core: StreamSourceCore<S>,
88 metrics: Arc<StreamingMetrics>,
89 barrier_receiver: UnboundedReceiver<Barrier>,
90 system_params: SystemParamsReaderRef,
91 rate_limit_rps: Option<u32>,
92 is_shared_non_cdc: bool,
93 barrier_manager: LocalBarrierManager,
94 ) -> Self {
95 Self {
96 actor_ctx,
97 stream_source_core,
98 metrics,
99 barrier_receiver: Some(barrier_receiver),
100 system_params,
101 rate_limit_rps,
102 is_shared_non_cdc,
103 barrier_manager,
104 }
105 }
106
107 fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
108 StreamReaderBuilder {
109 source_desc,
110 rate_limit: self.rate_limit_rps,
111 source_id: self.stream_source_core.source_id,
112 source_name: self.stream_source_core.source_name.clone(),
113 is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
114 actor_ctx: self.actor_ctx.clone(),
115 reader_stream: None,
116 }
117 }
118
119 async fn spawn_wait_checkpoint_worker(
120 core: &StreamSourceCore<S>,
121 source_reader: SourceReader,
122 metrics: Arc<StreamingMetrics>,
123 ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
124 let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
125 return Ok(None);
126 };
127 let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
128 let wait_checkpoint_worker = WaitCheckpointWorker {
129 wait_checkpoint_rx,
130 state_store: core.split_state_store.state_table().state_store().clone(),
131 table_id: core.split_state_store.state_table().table_id(),
132 metrics,
133 };
134 tokio::spawn(wait_checkpoint_worker.run());
135 Ok(Some(WaitCheckpointTaskBuilder {
136 wait_checkpoint_tx,
137 source_reader,
138 building_task: initial_task,
139 }))
140 }
141
142 pub fn prepare_source_stream_build(
144 &self,
145 source_desc: &SourceDesc,
146 ) -> (Vec<ColumnId>, SourceContext) {
147 let column_ids = source_desc
148 .columns
149 .iter()
150 .map(|column_desc| column_desc.column_id)
151 .collect_vec();
152
153 let (schema_change_tx, mut schema_change_rx) =
154 mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
155 let schema_change_tx = if self.is_auto_schema_change_enable() {
156 let meta_client = self.actor_ctx.meta_client.clone();
157 let _join_handle = tokio::task::spawn(async move {
159 while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
160 let table_ids = schema_change.table_ids();
161 tracing::info!(
162 target: "auto_schema_change",
163 "recv a schema change event for tables: {:?}", table_ids);
164 if let Some(ref meta_client) = meta_client {
166 match meta_client
167 .auto_schema_change(schema_change.to_protobuf())
168 .await
169 {
170 Ok(_) => {
171 tracing::info!(
172 target: "auto_schema_change",
173 "schema change success for tables: {:?}", table_ids);
174 finish_tx.send(()).unwrap();
175 }
176 Err(e) => {
177 tracing::error!(
178 target: "auto_schema_change",
179 error = ?e.as_report(), "schema change error");
180 finish_tx.send(()).unwrap();
181 }
182 }
183 }
184 }
185 });
186 Some(schema_change_tx)
187 } else {
188 info!("auto schema change is disabled in config");
189 None
190 };
191 let source_ctx = SourceContext::new(
192 self.actor_ctx.id,
193 self.stream_source_core.source_id,
194 self.actor_ctx.fragment_id,
195 self.stream_source_core.source_name.clone(),
196 source_desc.metrics.clone(),
197 SourceCtrlOpts {
198 chunk_size: limited_chunk_size(self.rate_limit_rps),
199 split_txn: self.rate_limit_rps.is_some(), },
201 source_desc.source.config.clone(),
202 schema_change_tx,
203 );
204
205 (column_ids, source_ctx)
206 }
207
208 fn is_auto_schema_change_enable(&self) -> bool {
209 self.actor_ctx.config.developer.enable_auto_schema_change
210 }
211
212 #[inline]
214 fn get_metric_labels(&self) -> [String; 4] {
215 [
216 self.stream_source_core.source_id.to_string(),
217 self.stream_source_core.source_name.clone(),
218 self.actor_ctx.id.to_string(),
219 self.actor_ctx.fragment_id.to_string(),
220 ]
221 }
222
223 async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
232 &mut self,
233 barrier_epoch: EpochPair,
234 source_desc: &SourceDesc,
235 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
236 apply_mutation: ApplyMutationAfterBarrier<'_>,
237 ) -> StreamExecutorResult<()> {
238 {
239 let mut should_rebuild_stream = false;
240 match apply_mutation {
241 ApplyMutationAfterBarrier::SplitChange {
242 target_splits,
243 should_trim_state,
244 split_change_count,
245 } => {
246 split_change_count.inc();
247 if self
248 .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
249 .await?
250 {
251 should_rebuild_stream = true;
252 }
253 }
254 ApplyMutationAfterBarrier::ConnectorPropsChange => {
255 should_rebuild_stream = true;
256 }
257 }
258
259 if should_rebuild_stream {
260 self.rebuild_stream_reader(source_desc, stream)?;
261 }
262 }
263
264 Ok(())
265 }
266
267 async fn update_state_if_changed(
269 &mut self,
270 barrier_epoch: EpochPair,
271 target_splits: Vec<SplitImpl>,
272 should_trim_state: bool,
273 ) -> StreamExecutorResult<bool> {
274 let core = &mut self.stream_source_core;
275
276 let target_splits: HashMap<_, _> = target_splits
277 .into_iter()
278 .map(|split| (split.id(), split))
279 .collect();
280
281 let mut target_state: HashMap<SplitId, SplitImpl> =
282 HashMap::with_capacity(target_splits.len());
283
284 let mut split_changed = false;
285
286 let committed_reader = core
287 .split_state_store
288 .new_committed_reader(barrier_epoch)
289 .await?;
290
291 for (split_id, split) in target_splits {
293 if let Some(s) = core.latest_split_info.get(&split_id) {
294 target_state.insert(split_id, s.clone());
297 } else {
298 split_changed = true;
299 let initial_state = if let Some(recover_state) = committed_reader
302 .try_recover_from_state_store(&split)
303 .await?
304 {
305 recover_state
306 } else {
307 split
308 };
309
310 core.updated_splits_in_epoch
311 .entry(split_id.clone())
312 .or_insert_with(|| initial_state.clone());
313
314 target_state.insert(split_id, initial_state);
315 }
316 }
317
318 for existing_split_id in core.latest_split_info.keys() {
320 if !target_state.contains_key(existing_split_id) {
321 tracing::info!("split dropping detected: {}", existing_split_id);
322 split_changed = true;
323 }
324 }
325
326 if split_changed {
327 tracing::info!(
328 actor_id = %self.actor_ctx.id,
329 state = ?target_state,
330 "apply split change"
331 );
332
333 core.updated_splits_in_epoch
334 .retain(|split_id, _| target_state.contains_key(split_id));
335
336 let dropped_splits = core
337 .latest_split_info
338 .extract_if(|split_id, _| !target_state.contains_key(split_id))
339 .map(|(_, split)| split)
340 .collect_vec();
341
342 if should_trim_state && !dropped_splits.is_empty() {
343 core.split_state_store.trim_state(&dropped_splits).await?;
345 }
346
347 core.latest_split_info = target_state;
348 }
349
350 Ok(split_changed)
351 }
352
353 fn rebuild_stream_reader_from_error<const BIASED: bool>(
355 &mut self,
356 source_desc: &SourceDesc,
357 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
358 e: StreamExecutorError,
359 ) -> StreamExecutorResult<()> {
360 let core = &mut self.stream_source_core;
361 tracing::error!(
362 error = ?e.as_report(),
363 actor_id = %self.actor_ctx.id,
364 source_id = %core.source_id,
365 "stream source reader error",
366 );
367 GLOBAL_ERROR_METRICS.user_source_error.report([
368 e.variant_name().to_owned(),
369 core.source_id.to_string(),
370 core.source_name.clone(),
371 self.actor_ctx.fragment_id.to_string(),
372 ]);
373
374 self.rebuild_stream_reader(source_desc, stream)
375 }
376
377 fn rebuild_stream_reader<const BIASED: bool>(
378 &mut self,
379 source_desc: &SourceDesc,
380 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
381 ) -> StreamExecutorResult<()> {
382 let core = &mut self.stream_source_core;
383 let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
384
385 tracing::info!(
386 "actor {:?} apply source split change to {:?}",
387 self.actor_ctx.id,
388 target_state
389 );
390
391 let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
393 let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);
394
395 stream.replace_data_stream(reader_stream);
396
397 Ok(())
398 }
399
400 async fn handle_inject_source_offsets(
406 &mut self,
407 split_offsets: &HashMap<String, String>,
408 ) -> StreamExecutorResult<bool> {
409 tracing::warn!(
410 actor_id = %self.actor_ctx.id,
411 source_id = %self.stream_source_core.source_id,
412 num_offsets = split_offsets.len(),
413 "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
414 );
415
416 let offsets_to_inject: Vec<(String, String)> = {
418 let owned_splits: HashSet<&str> = self
419 .stream_source_core
420 .latest_split_info
421 .keys()
422 .map(|s| s.as_ref())
423 .collect();
424 split_offsets
425 .iter()
426 .filter(|(split_id, _)| owned_splits.contains(split_id.as_str()))
427 .map(|(k, v)| (k.clone(), v.clone()))
428 .collect()
429 };
430
431 let mut json_states: Vec<(String, JsonbVal)> = Vec::new();
433 let mut failed_splits: Vec<String> = Vec::new();
434
435 for (split_id, offset) in &offsets_to_inject {
436 if let Some(split) = self
437 .stream_source_core
438 .latest_split_info
439 .get_mut(split_id.as_str())
440 {
441 tracing::info!(
442 actor_id = %self.actor_ctx.id,
443 split_id = %split_id,
444 offset = %offset,
445 "Injecting offset for owned split"
446 );
447 if let Err(e) = split.update_in_place(offset.clone()) {
449 tracing::error!(
450 actor_id = %self.actor_ctx.id,
451 split_id = %split_id,
452 error = ?e.as_report(),
453 "Failed to update split offset"
454 );
455 failed_splits.push(split_id.clone());
456 continue;
457 }
458 self.stream_source_core
460 .updated_splits_in_epoch
461 .insert(split_id.clone().into(), split.clone());
462 let json_value: serde_json::Value = serde_json::from_str(offset)
464 .unwrap_or_else(|_| serde_json::json!({ "offset": offset }));
465 json_states.push((split_id.clone(), JsonbVal::from(json_value)));
466 }
467 }
468
469 if !failed_splits.is_empty() {
470 return Err(StreamExecutorError::connector_error(anyhow!(
471 "failed to inject offsets for splits: {:?}",
472 failed_splits
473 )));
474 }
475
476 let num_injected = json_states.len();
477 if num_injected > 0 {
478 self.stream_source_core
480 .split_state_store
481 .set_states_json(json_states)
482 .await?;
483
484 tracing::info!(
485 actor_id = %self.actor_ctx.id,
486 source_id = %self.stream_source_core.source_id,
487 num_injected = num_injected,
488 "Offset injection completed for owned splits, triggering rebuild"
489 );
490 Ok(true)
491 } else {
492 tracing::info!(
493 actor_id = %self.actor_ctx.id,
494 source_id = %self.stream_source_core.source_id,
495 "No owned splits to inject offsets for"
496 );
497 Ok(false)
498 }
499 }
500
501 async fn persist_state_and_clear_cache(
502 &mut self,
503 epoch: EpochPair,
504 ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
505 let core = &mut self.stream_source_core;
506
507 let cache = core
508 .updated_splits_in_epoch
509 .values()
510 .map(|split_impl| split_impl.to_owned())
511 .collect_vec();
512
513 if !cache.is_empty() {
514 tracing::debug!(state = ?cache, "take snapshot");
515
516 let source_id = core.source_id.to_string();
518 for split_impl in &cache {
519 match split_impl {
521 SplitImpl::PostgresCdc(pg_split) => {
522 if let Some(lsn_value) = pg_split.pg_lsn() {
523 self.metrics
524 .pg_cdc_state_table_lsn
525 .with_guarded_label_values(&[&source_id])
526 .set(lsn_value as i64);
527 }
528 }
529 SplitImpl::MysqlCdc(mysql_split) => {
530 if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
531 self.metrics
532 .mysql_cdc_state_binlog_file_seq
533 .with_guarded_label_values(&[&source_id])
534 .set(file_seq as i64);
535
536 self.metrics
537 .mysql_cdc_state_binlog_position
538 .with_guarded_label_values(&[&source_id])
539 .set(position as i64);
540 }
541 }
542 _ => {}
543 }
544 }
545
546 core.split_state_store.set_states(cache).await?;
547 }
548
549 core.split_state_store.commit(epoch).await?;
551
552 let updated_splits = core.updated_splits_in_epoch.clone();
553
554 core.updated_splits_in_epoch.clear();
555
556 Ok(updated_splits)
557 }
558
559 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
561 let core = &mut self.stream_source_core;
562 core.split_state_store.try_flush().await?;
563
564 Ok(())
565 }
566
567 fn maybe_report_cdc_source_offset(
569 &self,
570 updated_splits: &HashMap<SplitId, SplitImpl>,
571 epoch: EpochPair,
572 source_id: SourceId,
573 must_report_cdc_offset_once: &mut bool,
574 must_wait_cdc_offset_before_report: bool,
575 ) {
576 if *must_report_cdc_offset_once
578 && (!must_wait_cdc_offset_before_report
579 || updated_splits
580 .values()
581 .any(|split| split.is_cdc_split() && !split.get_cdc_split_offset().is_empty()))
582 {
583 self.barrier_manager.report_cdc_source_offset_updated(
585 epoch,
586 self.actor_ctx.id,
587 source_id,
588 );
589 tracing::info!(
590 actor_id = %self.actor_ctx.id,
591 source_id = %source_id,
592 epoch = ?epoch,
593 "Reported CDC source offset updated to meta (first time only)"
594 );
595 *must_report_cdc_offset_once = false;
597 }
598 }
599
600 #[try_stream(ok = Message, error = StreamExecutorError)]
605 async fn execute_inner(mut self) {
606 let mut barrier_receiver = self.barrier_receiver.take().unwrap();
607 let first_barrier = barrier_receiver
608 .recv()
609 .instrument_await("source_recv_first_barrier")
610 .await
611 .ok_or_else(|| {
612 anyhow!(
613 "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
614 self.actor_ctx.id,
615 self.stream_source_core.source_id
616 )
617 })?;
618 let first_epoch = first_barrier.epoch;
619 let (mut boot_state, mut must_report_cdc_offset_once, must_wait_cdc_offset_before_report) =
622 if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
623 tracing::debug!(?splits, "boot with splits");
625 let must_report_cdc_offset_once = splits.iter().any(|split| split.is_cdc_split());
627 let must_wait_cdc_offset_before_report = must_report_cdc_offset_once
629 && splits.iter().any(|split| {
630 matches!(split, SplitImpl::MysqlCdc(_) | SplitImpl::SqlServerCdc(_))
631 });
632 (
633 splits.to_vec(),
634 must_report_cdc_offset_once,
635 must_wait_cdc_offset_before_report,
636 )
637 } else {
638 (Vec::default(), true, true)
639 };
640 let is_pause_on_startup = first_barrier.is_pause_on_startup();
641 let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);
642
643 yield Message::Barrier(first_barrier);
644
645 let mut core = self.stream_source_core;
646 let source_id = core.source_id;
647
648 let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
650 let mut source_desc = source_desc_builder
651 .build()
652 .map_err(StreamExecutorError::connector_error)?;
653
654 let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
655 &core,
656 source_desc.source.clone(),
657 self.metrics.clone(),
658 )
659 .await?;
660
661 let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
662 get_split_offset_col_idx(&source_desc.columns)
663 else {
664 unreachable!("Partition and offset columns must be set.");
665 };
666
667 core.split_state_store.init_epoch(first_epoch).await?;
668 {
669 let committed_reader = core
670 .split_state_store
671 .new_committed_reader(first_epoch)
672 .await?;
673 for ele in &mut boot_state {
674 if let Some(recover_state) =
675 committed_reader.try_recover_from_state_store(ele).await?
676 {
677 *ele = recover_state;
678 is_uninitialized = false;
680 } else {
681 core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
685 }
686 }
687 }
688
689 core.init_split_state(boot_state.clone());
691
692 self.stream_source_core = core;
694
695 let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
696 tracing::debug!(state = ?recover_state, "start with state");
697
698 let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
699 let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
700 let mut latest_splits = None;
701 if is_uninitialized {
703 let create_split_reader_result = reader_stream_builder
704 .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
705 .await?;
706 latest_splits = create_split_reader_result.latest_splits;
707 }
708
709 if let Some(latest_splits) = latest_splits {
710 self.stream_source_core
713 .updated_splits_in_epoch
714 .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
715 }
716 let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
719 barrier_stream,
720 reader_stream_builder
721 .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
722 );
723 let mut command_paused = false;
724
725 if is_pause_on_startup {
727 tracing::info!("source paused on startup");
728 stream.pause_stream();
729 command_paused = true;
730 }
731
732 let mut max_wait_barrier_time_ms =
735 self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
736 let mut last_barrier_time = Instant::now();
737 let mut self_paused = false;
738
739 let source_output_row_count = self
740 .metrics
741 .source_output_row_count
742 .with_guarded_label_values(&self.get_metric_labels());
743
744 let source_split_change_count = self
745 .metrics
746 .source_split_change_count
747 .with_guarded_label_values(&self.get_metric_labels());
748
749 while let Some(msg) = stream.next().await {
750 let Ok(msg) = msg else {
751 tokio::time::sleep(Duration::from_millis(1000)).await;
752 self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
753 continue;
754 };
755
756 match msg {
757 Either::Left(Message::Barrier(barrier)) => {
759 last_barrier_time = Instant::now();
760
761 if self_paused {
762 self_paused = false;
763 if !command_paused {
765 stream.resume_stream();
766 }
767 }
768
769 let epoch = barrier.epoch;
770 let mut split_change = None;
771
772 if let Some(mutation) = barrier.mutation.as_deref() {
773 match mutation {
774 Mutation::Pause => {
775 command_paused = true;
776 stream.pause_stream()
777 }
778 Mutation::Resume => {
779 command_paused = false;
780 stream.resume_stream()
781 }
782 Mutation::SourceChangeSplit(actor_splits) => {
783 tracing::info!(
784 actor_id = %self.actor_ctx.id,
785 actor_splits = ?actor_splits,
786 "source change split received"
787 );
788
789 split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
790 |target_splits| {
791 (
792 &source_desc,
793 &mut stream,
794 ApplyMutationAfterBarrier::SplitChange {
795 target_splits,
796 should_trim_state: true,
797 split_change_count: &source_split_change_count,
798 },
799 )
800 },
801 );
802 }
803
804 Mutation::ConnectorPropsChange(maybe_mutation) => {
805 if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
806 {
807 tracing::info!(
809 actor_id = %self.actor_ctx.id,
810 source_id = %source_id,
811 "updating source connector properties",
812 );
813 source_desc.update_reader(new_props.clone())?;
814 split_change = Some((
816 &source_desc,
817 &mut stream,
818 ApplyMutationAfterBarrier::ConnectorPropsChange,
819 ));
820 }
821 }
822
823 Mutation::Update(UpdateMutation { actor_splits, .. }) => {
824 split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
825 |target_splits| {
826 (
827 &source_desc,
828 &mut stream,
829 ApplyMutationAfterBarrier::SplitChange {
830 target_splits,
831 should_trim_state: false,
832 split_change_count: &source_split_change_count,
833 },
834 )
835 },
836 );
837 }
838 Mutation::Throttle(fragment_to_apply) => {
839 if let Some(entry) =
840 fragment_to_apply.get(&self.actor_ctx.fragment_id)
841 && entry.throttle_type() == ThrottleType::Source
842 && entry.rate_limit != self.rate_limit_rps
843 {
844 tracing::info!(
845 "updating rate limit from {:?} to {:?}",
846 self.rate_limit_rps,
847 entry.rate_limit
848 );
849 self.rate_limit_rps = entry.rate_limit;
850 self.rebuild_stream_reader(&source_desc, &mut stream)?;
852 }
853 }
854 Mutation::ResetSource { source_id } => {
855 if *source_id == self.stream_source_core.source_id {
859 tracing::info!(
860 actor_id = %self.actor_ctx.id,
861 source_id = source_id.as_raw_id(),
862 "Resetting CDC source: clearing offset (set to None)"
863 );
864
865 let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
867 .latest_split_info
868 .values()
869 .map(|split| {
870 let mut new_split = split.clone();
872 match &mut new_split {
873 SplitImpl::MysqlCdc(debezium_split) => {
874 if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
875 tracing::info!(
876 split_id = ?mysql_split.inner.split_id,
877 old_offset = ?mysql_split.inner.start_offset,
878 "Clearing MySQL CDC offset"
879 );
880 mysql_split.inner.start_offset = None;
881 }
882 }
883 SplitImpl::PostgresCdc(debezium_split) => {
884 if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
885 tracing::info!(
886 split_id = ?pg_split.inner.split_id,
887 old_offset = ?pg_split.inner.start_offset,
888 "Clearing PostgreSQL CDC offset"
889 );
890 pg_split.inner.start_offset = None;
891 }
892 }
893 SplitImpl::MongodbCdc(debezium_split) => {
894 if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
895 tracing::info!(
896 split_id = ?mongo_split.inner.split_id,
897 old_offset = ?mongo_split.inner.start_offset,
898 "Clearing MongoDB CDC offset"
899 );
900 mongo_split.inner.start_offset = None;
901 }
902 }
903 SplitImpl::CitusCdc(debezium_split) => {
904 if let Some(citus_split) = debezium_split.citus_split.as_mut() {
905 tracing::info!(
906 split_id = ?citus_split.inner.split_id,
907 old_offset = ?citus_split.inner.start_offset,
908 "Clearing Citus CDC offset"
909 );
910 citus_split.inner.start_offset = None;
911 }
912 }
913 SplitImpl::SqlServerCdc(debezium_split) => {
914 if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
915 tracing::info!(
916 split_id = ?sqlserver_split.inner.split_id,
917 old_offset = ?sqlserver_split.inner.start_offset,
918 "Clearing SQL Server CDC offset"
919 );
920 sqlserver_split.inner.start_offset = None;
921 }
922 }
923 _ => {
924 tracing::warn!(
925 "RESET SOURCE called on non-CDC split type"
926 );
927 }
928 }
929 new_split
930 })
931 .collect();
932
933 if !splits_with_cleared_offset.is_empty() {
934 tracing::info!(
935 actor_id = %self.actor_ctx.id,
936 split_count = splits_with_cleared_offset.len(),
937 "Updating state table with cleared offsets"
938 );
939
940 self.stream_source_core
942 .split_state_store
943 .set_states(splits_with_cleared_offset.clone())
944 .await?;
945
946 for split in splits_with_cleared_offset {
948 self.stream_source_core
949 .latest_split_info
950 .insert(split.id(), split.clone());
951 self.stream_source_core
952 .updated_splits_in_epoch
953 .insert(split.id(), split);
954 }
955
956 tracing::info!(
957 actor_id = %self.actor_ctx.id,
958 source_id = source_id.as_raw_id(),
959 "RESET SOURCE completed: offset cleared (set to None). \
960 Trigger recovery/restart to fetch latest offset from upstream."
961 );
962 } else {
963 tracing::warn!(
964 actor_id = %self.actor_ctx.id,
965 "No splits found to reset - source may not be initialized yet"
966 );
967 }
968 } else {
969 tracing::debug!(
970 actor_id = %self.actor_ctx.id,
971 target_source_id = source_id.as_raw_id(),
972 current_source_id = self.stream_source_core.source_id.as_raw_id(),
973 "ResetSource mutation for different source, ignoring"
974 );
975 }
976 }
977
978 Mutation::InjectSourceOffsets {
979 source_id,
980 split_offsets,
981 } => {
982 if *source_id == self.stream_source_core.source_id {
983 if self.handle_inject_source_offsets(split_offsets).await? {
984 split_change = Some((
986 &source_desc,
987 &mut stream,
988 ApplyMutationAfterBarrier::ConnectorPropsChange,
989 ));
990 }
991 } else {
992 tracing::debug!(
993 actor_id = %self.actor_ctx.id,
994 target_source_id = source_id.as_raw_id(),
995 current_source_id = self.stream_source_core.source_id.as_raw_id(),
996 "InjectSourceOffsets mutation for different source, ignoring"
997 );
998 }
999 }
1000
1001 _ => {}
1002 }
1003 }
1004
1005 let updated_splits = self.persist_state_and_clear_cache(epoch).await?;
1006
1007 self.maybe_report_cdc_source_offset(
1008 &updated_splits,
1009 epoch,
1010 source_id,
1011 &mut must_report_cdc_offset_once,
1012 must_wait_cdc_offset_before_report,
1013 );
1014
1015 if barrier.kind.is_checkpoint()
1017 && let Some(task_builder) = &mut wait_checkpoint_task_builder
1018 {
1019 task_builder.update_task_on_checkpoint(updated_splits);
1020
1021 tracing::debug!("epoch to wait {:?}", epoch);
1022 task_builder.send(Epoch(epoch.prev)).await?
1023 }
1024
1025 let barrier_epoch = barrier.epoch;
1026 yield Message::Barrier(barrier);
1027
1028 if let Some((source_desc, stream, to_apply_mutation)) = split_change {
1029 self.apply_split_change_after_yield_barrier(
1030 barrier_epoch,
1031 source_desc,
1032 stream,
1033 to_apply_mutation,
1034 )
1035 .await?;
1036 }
1037 }
1038 Either::Left(_) => {
1039 unreachable!();
1042 }
1043
1044 Either::Right((chunk, latest_state)) => {
1045 if let Some(task_builder) = &mut wait_checkpoint_task_builder {
1046 if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1047 let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
1048 task_builder.update_task_on_chunk(
1049 source_id,
1050 &latest_state,
1051 pulsar_message_id_col.clone(),
1052 );
1053 } else {
1054 let offset_col = chunk.column_at(offset_idx);
1055 task_builder.update_task_on_chunk(
1056 source_id,
1057 &latest_state,
1058 offset_col.clone(),
1059 );
1060 }
1061 }
1062 if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
1063 self_paused = true;
1068 tracing::warn!(
1069 "source paused, wait barrier for {:?}",
1070 last_barrier_time.elapsed()
1071 );
1072 stream.pause_stream();
1073
1074 max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
1079 as u128
1080 * WAIT_BARRIER_MULTIPLE_TIMES;
1081 }
1082
1083 latest_state.iter().for_each(|(split_id, new_split_impl)| {
1084 if let Some(split_impl) =
1085 self.stream_source_core.latest_split_info.get_mut(split_id)
1086 {
1087 *split_impl = new_split_impl.clone();
1088 }
1089 });
1090
1091 self.stream_source_core
1092 .updated_splits_in_epoch
1093 .extend(latest_state);
1094
1095 let card = chunk.cardinality();
1096 if card == 0 {
1097 continue;
1098 }
1099 source_output_row_count.inc_by(card as u64);
1100 let to_remove_col_indices =
1101 if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1102 vec![split_idx, offset_idx, pulsar_message_id_idx]
1103 } else {
1104 vec![split_idx, offset_idx]
1105 };
1106 let chunk =
1107 prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
1108 yield Message::Chunk(chunk);
1109 self.try_flush_data().await?;
1110 }
1111 }
1112 }
1113
1114 tracing::error!(
1116 actor_id = %self.actor_ctx.id,
1117 "source executor exited unexpectedly"
1118 )
1119 }
1120}
1121
1122#[derive(Debug, Clone)]
1123enum ApplyMutationAfterBarrier<'a> {
1124 SplitChange {
1125 target_splits: Vec<SplitImpl>,
1126 should_trim_state: bool,
1127 split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
1128 },
1129 ConnectorPropsChange,
1130}
1131
1132impl<S: StateStore> Execute for SourceExecutor<S> {
1133 fn execute(self: Box<Self>) -> BoxedMessageStream {
1134 self.execute_inner().boxed()
1135 }
1136}
1137
1138impl<S: StateStore> Debug for SourceExecutor<S> {
1139 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1140 f.debug_struct("SourceExecutor")
1141 .field("source_id", &self.stream_source_core.source_id)
1142 .field("column_ids", &self.stream_source_core.column_ids)
1143 .finish()
1144 }
1145}
1146
1147struct WaitCheckpointTaskBuilder {
1148 wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
1149 source_reader: SourceReader,
1150 building_task: WaitCheckpointTask,
1151}
1152
1153impl WaitCheckpointTaskBuilder {
1154 fn update_task_on_chunk(
1155 &mut self,
1156 source_id: SourceId,
1157 latest_state: &HashMap<SplitId, SplitImpl>,
1158 offset_col: ArrayRef,
1159 ) {
1160 match &mut self.building_task {
1161 WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
1162 arrays.push(offset_col);
1163 }
1164 WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
1165 arrays.push(offset_col);
1166 }
1167 WaitCheckpointTask::AckPulsarMessage(arrays) => {
1168 let split_id = latest_state.keys().next().unwrap();
1170 let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
1171 arrays.push((pulsar_ack_channel_id, offset_col));
1172 }
1173 WaitCheckpointTask::CommitCdcOffset(_) => {}
1174 }
1175 }
1176
1177 fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
1178 #[expect(clippy::single_match)]
1179 match &mut self.building_task {
1180 WaitCheckpointTask::CommitCdcOffset(offsets) => {
1181 if !updated_splits.is_empty() {
1182 assert_eq!(1, updated_splits.len());
1184 for (split_id, split_impl) in updated_splits {
1185 if split_impl.is_cdc_split() {
1186 *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
1187 } else {
1188 unreachable!()
1189 }
1190 }
1191 }
1192 }
1193 _ => {}
1194 }
1195 }
1196
1197 async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
1199 let new_task = self
1200 .source_reader
1201 .create_wait_checkpoint_task()
1202 .await?
1203 .expect("wait checkpoint task should be created");
1204 self.wait_checkpoint_tx
1205 .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
1206 .expect("wait_checkpoint_tx send should succeed");
1207 Ok(())
1208 }
1209}
1210
1211struct WaitCheckpointWorker<S: StateStore> {
1237 wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
1238 state_store: S,
1239 table_id: TableId,
1240 metrics: Arc<StreamingMetrics>,
1241}
1242
1243impl<S: StateStore> WaitCheckpointWorker<S> {
1244 pub async fn run(mut self) {
1245 tracing::debug!("wait epoch worker start success");
1246 loop {
1247 match self.wait_checkpoint_rx.recv().await {
1249 Some((epoch, task)) => {
1250 tracing::debug!("start to wait epoch {}", epoch.0);
1251 let ret = self
1252 .state_store
1253 .try_wait_epoch(
1254 HummockReadEpoch::Committed(epoch.0),
1255 TryWaitEpochOptions {
1256 table_id: self.table_id,
1257 },
1258 )
1259 .await;
1260
1261 match ret {
1262 Ok(()) => {
1263 tracing::debug!(epoch = epoch.0, "wait epoch success");
1264
1265 task.run_with_on_commit_success(|source_id: u64, offset| {
1267 if let Some(lsn_value) =
1268 extract_postgres_lsn_from_offset_str(offset)
1269 {
1270 self.metrics
1271 .pg_cdc_jni_commit_offset_lsn
1272 .with_guarded_label_values(&[&source_id.to_string()])
1273 .set(lsn_value as i64);
1274 }
1275 })
1276 .await;
1277 }
1278 Err(e) => {
1279 tracing::error!(
1280 error = %e.as_report(),
1281 "wait epoch {} failed", epoch.0
1282 );
1283 }
1284 }
1285 }
1286 None => {
1287 tracing::error!("wait epoch rx closed");
1288 break;
1289 }
1290 }
1291 }
1292 }
1293}
1294
1295#[cfg(test)]
1296mod tests {
1297 use maplit::{btreemap, convert_args, hashmap};
1298 use risingwave_common::catalog::{ColumnId, Field};
1299 use risingwave_common::id::SourceId;
1300 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
1301 use risingwave_common::test_prelude::StreamChunkTestExt;
1302 use risingwave_common::util::epoch::{EpochExt, test_epoch};
1303 use risingwave_connector::source::datagen::DatagenSplit;
1304 use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
1305 use risingwave_pb::catalog::StreamSourceInfo;
1306 use risingwave_pb::plan_common::PbRowFormatType;
1307 use risingwave_storage::memory::MemoryStateStore;
1308 use tokio::sync::mpsc::unbounded_channel;
1309 use tracing_test::traced_test;
1310
1311 use super::*;
1312 use crate::executor::AddMutation;
1313 use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
1314 use crate::task::LocalBarrierManager;
1315
1316 const MOCK_SOURCE_NAME: &str = "mock_source";
1317
1318 #[tokio::test]
1319 async fn test_source_executor() {
1320 let source_id = 0.into();
1321 let schema = Schema {
1322 fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
1323 };
1324 let row_id_index = None;
1325 let source_info = StreamSourceInfo {
1326 row_format: PbRowFormatType::Native as i32,
1327 ..Default::default()
1328 };
1329 let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1330 let column_ids = vec![0].into_iter().map(ColumnId::from).collect();
1331
1332 let properties = convert_args!(btreemap!(
1334 "connector" => "datagen",
1335 "datagen.rows.per.second" => "3",
1336 "fields.sequence_int.kind" => "sequence",
1337 "fields.sequence_int.start" => "11",
1338 "fields.sequence_int.end" => "11111",
1339 ));
1340 let source_desc_builder =
1341 create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1342 let split_state_store = SourceStateTableHandler::from_table_catalog(
1343 &default_source_internal_table(0x2333),
1344 MemoryStateStore::new(),
1345 )
1346 .await;
1347 let core = StreamSourceCore::<MemoryStateStore> {
1348 source_id,
1349 column_ids,
1350 source_desc_builder: Some(source_desc_builder),
1351 latest_split_info: HashMap::new(),
1352 split_state_store,
1353 updated_splits_in_epoch: HashMap::new(),
1354 source_name: MOCK_SOURCE_NAME.to_owned(),
1355 };
1356
1357 let system_params_manager = LocalSystemParamsManager::for_test();
1358
1359 let executor = SourceExecutor::new(
1360 ActorContext::for_test(0),
1361 core,
1362 Arc::new(StreamingMetrics::unused()),
1363 barrier_rx,
1364 system_params_manager.get_params(),
1365 None,
1366 false,
1367 LocalBarrierManager::for_test(),
1368 );
1369 let mut executor = executor.boxed().execute();
1370
1371 let init_barrier =
1372 Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
1373 splits: hashmap! {
1374 ActorId::default() => vec![
1375 SplitImpl::Datagen(DatagenSplit {
1376 split_index: 0,
1377 split_num: 1,
1378 start_offset: None,
1379 }),
1380 ],
1381 },
1382 ..Default::default()
1383 }));
1384 barrier_tx.send(init_barrier).unwrap();
1385
1386 executor.next().await.unwrap().unwrap();
1388
1389 let msg = executor.next().await.unwrap().unwrap();
1391
1392 assert_eq!(
1394 msg.into_chunk().unwrap(),
1395 StreamChunk::from_pretty(
1396 " i
1397 + 11
1398 + 12
1399 + 13"
1400 )
1401 );
1402 }
1403
1404 #[traced_test]
1405 #[tokio::test]
1406 async fn test_split_change_mutation() {
1407 let source_id = SourceId::new(0);
1408 let schema = Schema {
1409 fields: vec![Field::with_name(DataType::Int32, "v1")],
1410 };
1411 let row_id_index = None;
1412 let source_info = StreamSourceInfo {
1413 row_format: PbRowFormatType::Native as i32,
1414 ..Default::default()
1415 };
1416 let properties = convert_args!(btreemap!(
1417 "connector" => "datagen",
1418 "fields.v1.kind" => "sequence",
1419 "fields.v1.start" => "11",
1420 "fields.v1.end" => "11111",
1421 ));
1422
1423 let source_desc_builder =
1424 create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1425 let mem_state_store = MemoryStateStore::new();
1426
1427 let column_ids = vec![ColumnId::from(0)];
1428 let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1429 let split_state_store = SourceStateTableHandler::from_table_catalog(
1430 &default_source_internal_table(0x2333),
1431 mem_state_store.clone(),
1432 )
1433 .await;
1434
1435 let core = StreamSourceCore::<MemoryStateStore> {
1436 source_id,
1437 column_ids: column_ids.clone(),
1438 source_desc_builder: Some(source_desc_builder),
1439 latest_split_info: HashMap::new(),
1440 split_state_store,
1441 updated_splits_in_epoch: HashMap::new(),
1442 source_name: MOCK_SOURCE_NAME.to_owned(),
1443 };
1444
1445 let system_params_manager = LocalSystemParamsManager::for_test();
1446
1447 let executor = SourceExecutor::new(
1448 ActorContext::for_test(0),
1449 core,
1450 Arc::new(StreamingMetrics::unused()),
1451 barrier_rx,
1452 system_params_manager.get_params(),
1453 None,
1454 false,
1455 LocalBarrierManager::for_test(),
1456 );
1457 let mut handler = executor.boxed().execute();
1458
1459 let mut epoch = test_epoch(1);
1460 let init_barrier =
1461 Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
1462 splits: hashmap! {
1463 ActorId::default() => vec![
1464 SplitImpl::Datagen(DatagenSplit {
1465 split_index: 0,
1466 split_num: 3,
1467 start_offset: None,
1468 }),
1469 ],
1470 },
1471 ..Default::default()
1472 }));
1473 barrier_tx.send(init_barrier).unwrap();
1474
1475 handler
1477 .next()
1478 .await
1479 .unwrap()
1480 .unwrap()
1481 .into_barrier()
1482 .unwrap();
1483
1484 let mut ready_chunks = handler.ready_chunks(10);
1485
1486 let _ = ready_chunks.next().await.unwrap();
1487
1488 let new_assignment = vec![
1489 SplitImpl::Datagen(DatagenSplit {
1490 split_index: 0,
1491 split_num: 3,
1492 start_offset: None,
1493 }),
1494 SplitImpl::Datagen(DatagenSplit {
1495 split_index: 1,
1496 split_num: 3,
1497 start_offset: None,
1498 }),
1499 SplitImpl::Datagen(DatagenSplit {
1500 split_index: 2,
1501 split_num: 3,
1502 start_offset: None,
1503 }),
1504 ];
1505
1506 epoch.inc_epoch();
1507 let change_split_mutation =
1508 Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
1509 ActorId::default() => new_assignment.clone()
1510 }));
1511
1512 barrier_tx.send(change_split_mutation).unwrap();
1513
1514 let _ = ready_chunks.next().await.unwrap(); epoch.inc_epoch();
1517 let barrier = Barrier::new_test_barrier(epoch);
1518 barrier_tx.send(barrier).unwrap();
1519
1520 ready_chunks.next().await.unwrap(); let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
1523 &default_source_internal_table(0x2333),
1524 mem_state_store.clone(),
1525 )
1526 .await;
1527
1528 source_state_handler
1530 .init_epoch(EpochPair::new_test_epoch(epoch))
1531 .await
1532 .unwrap();
1533 source_state_handler
1534 .get(&new_assignment[1].id())
1535 .await
1536 .unwrap()
1537 .unwrap();
1538
1539 tokio::time::sleep(Duration::from_millis(100)).await;
1540
1541 let _ = ready_chunks.next().await.unwrap();
1542
1543 epoch.inc_epoch();
1544 let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
1545 barrier_tx.send(barrier).unwrap();
1546
1547 epoch.inc_epoch();
1548 let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
1549 barrier_tx.send(barrier).unwrap();
1550
1551 ready_chunks.next().await.unwrap();
1553
1554 let prev_assignment = new_assignment;
1555 let new_assignment = vec![prev_assignment[2].clone()];
1556
1557 epoch.inc_epoch();
1558 let drop_split_mutation =
1559 Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
1560 ActorId::default() => new_assignment.clone()
1561 }));
1562
1563 barrier_tx.send(drop_split_mutation).unwrap();
1564
1565 ready_chunks.next().await.unwrap(); epoch.inc_epoch();
1568 let barrier = Barrier::new_test_barrier(epoch);
1569 barrier_tx.send(barrier).unwrap();
1570
1571 ready_chunks.next().await.unwrap(); let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
1574 &default_source_internal_table(0x2333),
1575 mem_state_store.clone(),
1576 )
1577 .await;
1578
1579 let new_epoch = EpochPair::new_test_epoch(epoch);
1580 source_state_handler.init_epoch(new_epoch).await.unwrap();
1581
1582 let committed_reader = source_state_handler
1583 .new_committed_reader(new_epoch)
1584 .await
1585 .unwrap();
1586 assert!(
1587 committed_reader
1588 .try_recover_from_state_store(&prev_assignment[0])
1589 .await
1590 .unwrap()
1591 .is_none()
1592 );
1593
1594 assert!(
1595 committed_reader
1596 .try_recover_from_state_store(&prev_assignment[1])
1597 .await
1598 .unwrap()
1599 .is_none()
1600 );
1601
1602 assert!(
1603 committed_reader
1604 .try_recover_from_state_store(&prev_assignment[2])
1605 .await
1606 .unwrap()
1607 .is_some()
1608 );
1609 }
1610}