risingwave_stream/executor/source/source_executor.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::cdc::split::extract_postgres_lsn_from_offset_str;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::id::SourceId;
use risingwave_storage::store::TryWaitEpochOptions;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

/// A constant to multiply when calculating the maximum time to wait for a barrier.
/// This accounts for latencies in the network and processing cost in the meta service.
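///
/// For example, with `barrier_interval_ms = 1000`, the executor tolerates up to
/// `1000 * WAIT_BARRIER_MULTIPLE_TIMES = 5000` ms without receiving a barrier
/// before pausing itself (see `max_wait_barrier_time_ms` in `execute_inner`).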
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source for external data.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader, used to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    is_shared_non_cdc: bool,

    /// Local barrier manager for reporting source load finished events.
    _barrier_manager: LocalBarrierManager,
}

impl<S: StateStore> SourceExecutor<S> {
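    /// Creates a new [`SourceExecutor`].
    ///
    /// A minimal sketch of how the executor is typically wired up, assuming the
    /// caller already holds an actor context, a `StreamSourceCore` and a barrier
    /// channel (see the tests at the bottom of this file for a complete,
    /// runnable setup):
    ///
    /// ```ignore
    /// let executor = SourceExecutor::new(
    ///     actor_ctx,
    ///     stream_source_core,
    ///     metrics,
    ///     barrier_rx,
    ///     system_params,
    ///     None,  // rate_limit_rps: no rate limit
    ///     false, // is_shared_non_cdc
    ///     barrier_manager,
    /// );
    /// let mut stream = executor.boxed().execute();
    /// ```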
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            _barrier_manager: barrier_manager,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id(),
            metrics,
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

    /// Build the source column ids and the source context that will be used to build the source stream.
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            // Spawn a task to handle schema change events from the source parser.
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // When rate limiting, we may split a txn.
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx.config.developer.enable_auto_schema_change
    }

    /// Labels: `source_id | source_name | actor_id | fragment_id`.
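    ///
    /// For example, a hypothetical source `1` named `my_source`, read by actor `3`
    /// in fragment `2`, yields `["1", "my_source", "3", "2"]`.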
    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

    /// - `should_trim_state`: whether to trim state for dropped splits.
    ///
    ///   For scaling, the connector splits can be migrated to other actors, but
    ///   won't be added or removed. Actors should not trim states for splits that
    ///   are moved to other actors.
    ///
    ///   For a source split change, splits will not be migrated and we can trim
    ///   states for deleted splits.
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

    /// Returns `true` if split changed. Otherwise `false`.
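    ///
    /// As an illustrative example (the split ids are hypothetical): if this actor
    /// currently owns splits `{0, 1}` and `target_splits` is `[1, 2]`, then split
    /// `1` keeps its cached offset, split `2` is recovered from the state store
    /// (or adopted as-is if absent), split `0` is dropped (and trimmed if
    /// `should_trim_state`), and `true` is returned.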
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Check for added splits.
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // Write the newly assigned split to the state cache; snapshots are based on the cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Check for dropped splits.
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // Trim the dropped splits' state.
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

    /// Rebuild the stream reader when an error occurs in the stream.
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::error!(
            error = ?e.as_report(),
            actor_id = %self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.clone(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        // Replace the source reader with a new one built from the new state.
        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");

            // Record metrics for CDC sources before moving the cache.
            let source_id = core.source_id.to_string();
            for split_impl in &cache {
                // Extract and record CDC-specific metrics based on the split type.
                match split_impl {
                    SplitImpl::PostgresCdc(pg_split) => {
                        if let Some(lsn_value) = pg_split.pg_lsn() {
                            self.metrics
                                .pg_cdc_state_table_lsn
                                .with_guarded_label_values(&[&source_id])
                                .set(lsn_value as i64);
                        }
                    }
                    SplitImpl::MysqlCdc(mysql_split) => {
                        if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
                            self.metrics
                                .mysql_cdc_state_binlog_file_seq
                                .with_guarded_label_values(&[&source_id])
                                .set(file_seq as i64);

                            self.metrics
                                .mysql_cdc_state_binlog_position
                                .with_guarded_label_values(&[&source_id])
                                .set(position as i64);
                        }
                    }
                    _ => {}
                }
            }

            core.split_state_store.set_states(cache).await?;
        }

        // Commit anyway, even if no messages were saved.
        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    /// Try to flush the state table, which may spill the mem-table to the state store.
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        core.split_state_store.try_flush().await?;

        Ok(())
    }

    /// A source executor with a stream source receives:
    /// 1. Barrier messages
    /// 2. Data from the external source
    ///
    /// and acts accordingly.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        // Build the source description from the builder.
        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
            &core,
            source_desc.source.clone(),
            self.metrics.clone(),
        )
        .await?;

        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
            get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    // If the state store is non-empty, we consider it initialized.
                    is_uninitialized = false;
                } else {
                    // This is a new split, not yet in the state table.
                    // Make sure it is written to the state table later.
                    // Then even if it receives no messages, we can observe it in the state table.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        // Initialize in-memory split states with the persisted state, if any.
        core.init_split_state(boot_state.clone());

        // Return the ownership of `stream_source_core` to the source executor.
        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        // Build the source stream reader.
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Make sure they are written to the state table later.
            // Then even if they receive no messages, we can observe them in the state table.
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the chunks from source and the barriers into a single stream. We prioritize
        // barriers over source data chunks here.
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        // - If the first barrier requires us to pause on startup, pause the stream.
        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
        // milliseconds, considering some other latencies like network and cost in Meta.
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                // This branch will be preferred.
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // `command_paused` has a higher priority.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;
                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = %self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
                                {
                                    // Rebuild the stream reader with the new props.
                                    tracing::info!(
                                        actor_id = %self.actor_ctx.id,
                                        source_id = %source_id,
                                        "updating source connector properties",
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    // We assume a connector props change does not involve a state change.
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(fragment_to_apply) => {
                                if let Some(new_rate_limit) =
                                    fragment_to_apply.get(&self.actor_ctx.fragment_id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // Recreate the reader from `latest_split_info`.
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            Mutation::ResetSource { source_id } => {
                                // Note: RESET SOURCE only clears the offset, it does NOT pause the source.
                                // When the offset is None, after recovery/restart, Debezium will automatically
                                // enter recovery mode and fetch the latest offset from upstream.
                                if *source_id == self.stream_source_core.source_id {
                                    tracing::info!(
                                        actor_id = %self.actor_ctx.id,
                                        source_id = source_id.as_raw_id(),
                                        "Resetting CDC source: clearing offset (set to None)"
                                    );

                                    // Step 1: Collect all current splits and clear their offsets.
                                    let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
                                        .latest_split_info
                                        .values()
                                        .map(|split| {
                                            // Clone the split and clear its offset.
                                            let mut new_split = split.clone();
                                            match &mut new_split {
                                                SplitImpl::MysqlCdc(debezium_split) => {
                                                    if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?mysql_split.inner.split_id,
                                                            old_offset = ?mysql_split.inner.start_offset,
                                                            "Clearing MySQL CDC offset"
                                                        );
                                                        mysql_split.inner.start_offset = None;
                                                    }
                                                }
                                                SplitImpl::PostgresCdc(debezium_split) => {
                                                    if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?pg_split.inner.split_id,
                                                            old_offset = ?pg_split.inner.start_offset,
                                                            "Clearing PostgreSQL CDC offset"
                                                        );
                                                        pg_split.inner.start_offset = None;
                                                    }
                                                }
                                                SplitImpl::MongodbCdc(debezium_split) => {
                                                    if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?mongo_split.inner.split_id,
                                                            old_offset = ?mongo_split.inner.start_offset,
                                                            "Clearing MongoDB CDC offset"
                                                        );
                                                        mongo_split.inner.start_offset = None;
                                                    }
                                                }
                                                SplitImpl::CitusCdc(debezium_split) => {
                                                    if let Some(citus_split) = debezium_split.citus_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?citus_split.inner.split_id,
                                                            old_offset = ?citus_split.inner.start_offset,
                                                            "Clearing Citus CDC offset"
                                                        );
                                                        citus_split.inner.start_offset = None;
                                                    }
                                                }
                                                SplitImpl::SqlServerCdc(debezium_split) => {
                                                    if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
                                                        tracing::info!(
                                                            split_id = ?sqlserver_split.inner.split_id,
                                                            old_offset = ?sqlserver_split.inner.start_offset,
                                                            "Clearing SQL Server CDC offset"
                                                        );
                                                        sqlserver_split.inner.start_offset = None;
                                                    }
                                                }
                                                _ => {
                                                    tracing::warn!(
                                                        "RESET SOURCE called on non-CDC split type"
                                                    );
                                                }
                                            }
                                            new_split
                                        })
                                        .collect();

                                    if !splits_with_cleared_offset.is_empty() {
                                        tracing::info!(
                                            actor_id = %self.actor_ctx.id,
                                            split_count = splits_with_cleared_offset.len(),
                                            "Updating state table with cleared offsets"
                                        );

                                        // Step 2: Write splits back to the state table with offset = None.
                                        self.stream_source_core
                                            .split_state_store
                                            .set_states(splits_with_cleared_offset.clone())
                                            .await?;

                                        // Step 3: Update the in-memory split info with cleared offsets.
                                        for split in splits_with_cleared_offset {
                                            self.stream_source_core
                                                .latest_split_info
                                                .insert(split.id(), split.clone());
                                            self.stream_source_core
                                                .updated_splits_in_epoch
                                                .insert(split.id(), split);
                                        }

                                        tracing::info!(
                                            actor_id = %self.actor_ctx.id,
                                            source_id = source_id.as_raw_id(),
                                            "RESET SOURCE completed: offset cleared (set to None). \
                                             Trigger recovery/restart to fetch latest offset from upstream."
                                        );
                                    } else {
                                        tracing::warn!(
                                            actor_id = %self.actor_ctx.id,
                                            "No splits found to reset - source may not be initialized yet"
                                        );
                                    }
                                } else {
                                    tracing::debug!(
                                        actor_id = %self.actor_ctx.id,
                                        target_source_id = source_id.as_raw_id(),
                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
                                        "ResetSource mutation for different source, ignoring"
                                    );
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    // When handling a checkpoint barrier, spawn a task to wait for the epoch commit notification.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // For the source executor, the message we receive from this arm
                    // should always be a barrier message.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                pulsar_message_id_col.clone(),
                            );
                        } else {
                            let offset_col = chunk.column_at(offset_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                offset_col.clone(),
                            );
                        }
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // The max wait time for a barrier is exceeded, so pause the source.
                        // We can guarantee the source is currently not paused, since it has
                        // just received stream chunks.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Only update `max_wait_barrier_time_ms` here to capture changes of
                        // `barrier_interval_ms`, avoiding frequent accesses to the shared
                        // `system_params`.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    if card == 0 {
                        continue;
                    }
                    source_output_row_count.inc_by(card as u64);
                    let to_remove_col_indices =
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            vec![split_idx, offset_idx, pulsar_message_id_idx]
                        } else {
                            vec![split_idx, offset_idx]
                        };
                    let chunk =
                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
        tracing::error!(
            actor_id = %self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
}

#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}

struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(
        &mut self,
        source_id: SourceId,
        latest_state: &HashMap<SplitId, SplitImpl>,
        offset_col: ArrayRef,
    ) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckPulsarMessage(arrays) => {
                // Each Pulsar chunk only contains one split.
                let split_id = latest_state.keys().next().unwrap();
                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
                arrays.push((pulsar_ack_channel_id, offset_col));
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    // A CDC source only has one split.
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Send the building task, then reset it to a new one.
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Usage Cases
///
/// Typically there are two issues related to acking on checkpoint:
///
/// 1. Correctness (at-least-once), i.e., don't let upstream clean uncommitted data.
///    For message queueing semantics (delete after ack), we should ack to avoid redelivery,
///    and only ack after checkpoint to avoid data loss.
///
/// 2. Allow upstream to clean data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit the last consumed offset to the upstream DB, so that old data can be discarded.
///
/// ## Google Pub/Sub
///
/// Due to queueing semantics.
/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
/// it's quite limited unlike Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
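///
/// # Flow
///
/// A sketch of the interaction as implemented in this file: on each checkpoint
/// barrier, the executor sends `(epoch, task)` over `wait_checkpoint_tx`; this
/// worker waits until that epoch is committed in the state store, then runs the
/// task (e.g. acking queue messages or committing a CDC offset upstream).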
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
    metrics: Arc<StreamingMetrics>,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker started successfully");
        loop {
            // Poll the rx and wait for the epoch commit.
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");

                            // Run the task with a callback to record the LSN after a successful commit.
                            task.run_with_on_commit_success(|source_id: u64, offset| {
                                if let Some(lsn_value) =
                                    extract_postgres_lsn_from_offset_str(offset)
                                {
                                    self.metrics
                                        .pg_cdc_jni_commit_offset_lsn
                                        .with_guarded_label_values(&[&source_id.to_string()])
                                        .set(lsn_value as i64);
                                }
                            })
                            .await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
1113mod tests {
1114    use maplit::{btreemap, convert_args, hashmap};
1115    use risingwave_common::catalog::{ColumnId, Field};
1116    use risingwave_common::id::SourceId;
1117    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
1118    use risingwave_common::test_prelude::StreamChunkTestExt;
1119    use risingwave_common::util::epoch::{EpochExt, test_epoch};
1120    use risingwave_connector::source::datagen::DatagenSplit;
1121    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
1122    use risingwave_pb::catalog::StreamSourceInfo;
1123    use risingwave_pb::plan_common::PbRowFormatType;
1124    use risingwave_storage::memory::MemoryStateStore;
1125    use tokio::sync::mpsc::unbounded_channel;
1126    use tracing_test::traced_test;
1127
1128    use super::*;
1129    use crate::executor::AddMutation;
1130    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
1131    use crate::task::LocalBarrierManager;
1132
1133    const MOCK_SOURCE_NAME: &str = "mock_source";
1134
1135    #[tokio::test]
1136    async fn test_source_executor() {
1137        let source_id = 0.into();
1138        let schema = Schema {
1139            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
1140        };
1141        let row_id_index = None;
1142        let source_info = StreamSourceInfo {
1143            row_format: PbRowFormatType::Native as i32,
1144            ..Default::default()
1145        };
1146        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1147        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();
1148
1149        // This datagen will generate 3 rows at one time.
1150        let properties = convert_args!(btreemap!(
1151            "connector" => "datagen",
1152            "datagen.rows.per.second" => "3",
1153            "fields.sequence_int.kind" => "sequence",
1154            "fields.sequence_int.start" => "11",
1155            "fields.sequence_int.end" => "11111",
1156        ));
1157        let source_desc_builder =
1158            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1159        let split_state_store = SourceStateTableHandler::from_table_catalog(
1160            &default_source_internal_table(0x2333),
1161            MemoryStateStore::new(),
1162        )
1163        .await;
1164        let core = StreamSourceCore::<MemoryStateStore> {
1165            source_id,
1166            column_ids,
1167            source_desc_builder: Some(source_desc_builder),
1168            latest_split_info: HashMap::new(),
1169            split_state_store,
1170            updated_splits_in_epoch: HashMap::new(),
1171            source_name: MOCK_SOURCE_NAME.to_owned(),
1172        };
1173
1174        let system_params_manager = LocalSystemParamsManager::for_test();
1175
1176        let executor = SourceExecutor::new(
1177            ActorContext::for_test(0),
1178            core,
1179            Arc::new(StreamingMetrics::unused()),
1180            barrier_rx,
1181            system_params_manager.get_params(),
1182            None,
1183            false,
1184            LocalBarrierManager::for_test(),
1185        );
1186        let mut executor = executor.boxed().execute();
1187
1188        let init_barrier =
1189            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
1190                splits: hashmap! {
1191                    ActorId::default() => vec![
1192                        SplitImpl::Datagen(DatagenSplit {
1193                            split_index: 0,
1194                            split_num: 1,
1195                            start_offset: None,
1196                        }),
1197                    ],
1198                },
1199                ..Default::default()
1200            }));
1201        barrier_tx.send(init_barrier).unwrap();
1202
1203        // Consume barrier.
1204        executor.next().await.unwrap().unwrap();
1205
1206        // Consume data chunk.
1207        let msg = executor.next().await.unwrap().unwrap();
1208
1209        // Row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

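    /// Exercises the split-change protocol end to end: start with one of three splits,
    /// add the remaining two via `SourceChangeSplit`, pause and resume the source, then
    /// drop two splits and verify the state table reflects each change.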
    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

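        // Batch up to 10 ready messages at a time, then drain the data produced so far.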
        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

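        // Grow the assignment: keep split 0 and add splits 1 and 2.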
        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // State for the newly added splits must now exist in the state table.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

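        // Allow some time for the readers of the newly added splits to produce data.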
        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

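        // Pause and then resume the source across two consecutive barriers.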
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // Receive everything emitted up to this point, including both barriers.
        ready_chunks.next().await.unwrap();

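        // Shrink the assignment to split 2 only; splits 0 and 1 are dropped.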
        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

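        // Re-open the state table: the dropped splits' states must be gone, while the
        // retained split's state must still be recoverable.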
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}