risingwave_stream/executor/source/
source_executor.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::util::epoch::{Epoch, EpochPair};
28use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
29use risingwave_connector::source::cdc::split::extract_postgres_lsn_from_offset_str;
30use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
31use risingwave_connector::source::reader::reader::SourceReader;
32use risingwave_connector::source::{
33    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
34    StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
35};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::common::ThrottleType;
38use risingwave_pb::id::SourceId;
39use risingwave_storage::store::TryWaitEpochOptions;
40use thiserror_ext::AsReport;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
42use tokio::sync::{mpsc, oneshot};
43use tokio::time::Instant;
44
45use super::executor_core::StreamSourceCore;
46use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
47use crate::common::rate_limit::limited_chunk_size;
48use crate::executor::UpdateMutation;
49use crate::executor::prelude::*;
50use crate::executor::source::reader_stream::StreamReaderBuilder;
51use crate::executor::stream_reader::StreamReaderWithPause;
52use crate::task::LocalBarrierManager;
53
54/// A multiplier applied to the barrier interval when calculating the maximum time to wait for a
55/// barrier, accounting for extra latencies such as network delay and processing cost in meta.
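/// For example, with a 1000 ms barrier interval, the source pauses itself only after roughly 5000 ms pass without a barrier.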
56pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
57
58pub struct SourceExecutor<S: StateStore> {
59    actor_ctx: ActorContextRef,
60
61    /// Streaming source core for the external connector.
62    stream_source_core: StreamSourceCore<S>,
63
64    /// Metrics for monitor.
65    metrics: Arc<StreamingMetrics>,
66
67    /// Receiver of barrier channel.
68    barrier_receiver: Option<UnboundedReceiver<Barrier>>,
69
70    /// System parameter reader to read barrier interval
71    system_params: SystemParamsReaderRef,
72
73    /// Rate limit in rows/s.
74    rate_limit_rps: Option<u32>,
75
76    is_shared_non_cdc: bool,
77
78    /// Local barrier manager for reporting source load finished events
79    barrier_manager: LocalBarrierManager,
80}
81
82impl<S: StateStore> SourceExecutor<S> {
83    #[expect(clippy::too_many_arguments)]
84    pub fn new(
85        actor_ctx: ActorContextRef,
86        stream_source_core: StreamSourceCore<S>,
87        metrics: Arc<StreamingMetrics>,
88        barrier_receiver: UnboundedReceiver<Barrier>,
89        system_params: SystemParamsReaderRef,
90        rate_limit_rps: Option<u32>,
91        is_shared_non_cdc: bool,
92        barrier_manager: LocalBarrierManager,
93    ) -> Self {
94        Self {
95            actor_ctx,
96            stream_source_core,
97            metrics,
98            barrier_receiver: Some(barrier_receiver),
99            system_params,
100            rate_limit_rps,
101            is_shared_non_cdc,
102            barrier_manager,
103        }
104    }
105
106    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
107        StreamReaderBuilder {
108            source_desc,
109            rate_limit: self.rate_limit_rps,
110            source_id: self.stream_source_core.source_id,
111            source_name: self.stream_source_core.source_name.clone(),
112            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
113            actor_ctx: self.actor_ctx.clone(),
114            reader_stream: None,
115        }
116    }
117
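    /// Spawn a background worker that performs connector-specific acknowledgement (e.g. committing
    /// CDC offsets or acking message-queue messages) only after the corresponding checkpoint epoch
    /// has been committed. Returns `None` if the connector does not need such a task.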
118    async fn spawn_wait_checkpoint_worker(
119        core: &StreamSourceCore<S>,
120        source_reader: SourceReader,
121        metrics: Arc<StreamingMetrics>,
122    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
123        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
124            return Ok(None);
125        };
126        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
127        let wait_checkpoint_worker = WaitCheckpointWorker {
128            wait_checkpoint_rx,
129            state_store: core.split_state_store.state_table().state_store().clone(),
130            table_id: core.split_state_store.state_table().table_id(),
131            metrics,
132        };
133        tokio::spawn(wait_checkpoint_worker.run());
134        Ok(Some(WaitCheckpointTaskBuilder {
135            wait_checkpoint_tx,
136            source_reader,
137            building_task: initial_task,
138        }))
139    }
140
141    /// Build the source column ids and the source context that will be used to build the source stream.
142    pub fn prepare_source_stream_build(
143        &self,
144        source_desc: &SourceDesc,
145    ) -> (Vec<ColumnId>, SourceContext) {
146        let column_ids = source_desc
147            .columns
148            .iter()
149            .map(|column_desc| column_desc.column_id)
150            .collect_vec();
151
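        // Channel for schema change events from the source parser. Each event carries a oneshot
        // sender that is signalled once meta has been notified of the change.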
152        let (schema_change_tx, mut schema_change_rx) =
153            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
154        let schema_change_tx = if self.is_auto_schema_change_enable() {
155            let meta_client = self.actor_ctx.meta_client.clone();
156            // spawn a task to handle schema change event from source parser
157            let _join_handle = tokio::task::spawn(async move {
158                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
159                    let table_ids = schema_change.table_ids();
160                    tracing::info!(
161                        target: "auto_schema_change",
162                        "recv a schema change event for tables: {:?}", table_ids);
163                    // TODO: retry on rpc error
164                    if let Some(ref meta_client) = meta_client {
165                        match meta_client
166                            .auto_schema_change(schema_change.to_protobuf())
167                            .await
168                        {
169                            Ok(_) => {
170                                tracing::info!(
171                                    target: "auto_schema_change",
172                                    "schema change success for tables: {:?}", table_ids);
173                                finish_tx.send(()).unwrap();
174                            }
175                            Err(e) => {
176                                tracing::error!(
177                                    target: "auto_schema_change",
178                                    error = ?e.as_report(), "schema change error");
179                                finish_tx.send(()).unwrap();
180                            }
181                        }
182                    }
183                }
184            });
185            Some(schema_change_tx)
186        } else {
187            info!("auto schema change is disabled in config");
188            None
189        };
190        let source_ctx = SourceContext::new(
191            self.actor_ctx.id,
192            self.stream_source_core.source_id,
193            self.actor_ctx.fragment_id,
194            self.stream_source_core.source_name.clone(),
195            source_desc.metrics.clone(),
196            SourceCtrlOpts {
197                chunk_size: limited_chunk_size(self.rate_limit_rps),
198                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
199            },
200            source_desc.source.config.clone(),
201            schema_change_tx,
202        );
203
204        (column_ids, source_ctx)
205    }
206
207    fn is_auto_schema_change_enable(&self) -> bool {
208        self.actor_ctx.config.developer.enable_auto_schema_change
209    }
210
211    /// `source_id | source_name | actor_id | fragment_id`
212    #[inline]
213    fn get_metric_labels(&self) -> [String; 4] {
214        [
215            self.stream_source_core.source_id.to_string(),
216            self.stream_source_core.source_name.clone(),
217            self.actor_ctx.id.to_string(),
218            self.actor_ctx.fragment_id.to_string(),
219        ]
220    }
221
222    /// - `should_trim_state`: whether to trim state for dropped splits.
223    ///
224    ///   For scaling, the connector splits can be migrated to other actors, but
225    ///   won't be added or removed. Actors should not trim states for splits that
226    ///   are moved to other actors.
227    ///
228    ///   For source split change, split will not be migrated and we can trim states
229    ///   for deleted splits.
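    ///
    /// This is invoked only after the barrier has been yielded downstream, so any stream rebuild
    /// happens against the post-barrier state.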
230    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
231        &mut self,
232        barrier_epoch: EpochPair,
233        source_desc: &SourceDesc,
234        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
235        apply_mutation: ApplyMutationAfterBarrier<'_>,
236    ) -> StreamExecutorResult<()> {
237        {
238            let mut should_rebuild_stream = false;
239            match apply_mutation {
240                ApplyMutationAfterBarrier::SplitChange {
241                    target_splits,
242                    should_trim_state,
243                    split_change_count,
244                } => {
245                    split_change_count.inc();
246                    if self
247                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
248                        .await?
249                    {
250                        should_rebuild_stream = true;
251                    }
252                }
253                ApplyMutationAfterBarrier::ConnectorPropsChange => {
254                    should_rebuild_stream = true;
255                }
256            }
257
258            if should_rebuild_stream {
259                self.rebuild_stream_reader(source_desc, stream)?;
260            }
261        }
262
263        Ok(())
264    }
265
266    /// Returns `true` if split changed. Otherwise `false`.
267    async fn update_state_if_changed(
268        &mut self,
269        barrier_epoch: EpochPair,
270        target_splits: Vec<SplitImpl>,
271        should_trim_state: bool,
272    ) -> StreamExecutorResult<bool> {
273        let core = &mut self.stream_source_core;
274
275        let target_splits: HashMap<_, _> = target_splits
276            .into_iter()
277            .map(|split| (split.id(), split))
278            .collect();
279
280        let mut target_state: HashMap<SplitId, SplitImpl> =
281            HashMap::with_capacity(target_splits.len());
282
283        let mut split_changed = false;
284
285        let committed_reader = core
286            .split_state_store
287            .new_committed_reader(barrier_epoch)
288            .await?;
289
290        // Checks added splits
291        for (split_id, split) in target_splits {
292            if let Some(s) = core.latest_split_info.get(&split_id) {
293                // For existing splits, we should use the latest offset from the cache.
294                // `target_splits` is from meta and contains the initial offset.
295                target_state.insert(split_id, s.clone());
296            } else {
297                split_changed = true;
298                // Write the newly assigned split to the state cache; the snapshot is based on the cache.
299
300                let initial_state = if let Some(recover_state) = committed_reader
301                    .try_recover_from_state_store(&split)
302                    .await?
303                {
304                    recover_state
305                } else {
306                    split
307                };
308
309                core.updated_splits_in_epoch
310                    .entry(split_id.clone())
311                    .or_insert_with(|| initial_state.clone());
312
313                target_state.insert(split_id, initial_state);
314            }
315        }
316
317        // Checks dropped splits
318        for existing_split_id in core.latest_split_info.keys() {
319            if !target_state.contains_key(existing_split_id) {
320                tracing::info!("split dropping detected: {}", existing_split_id);
321                split_changed = true;
322            }
323        }
324
325        if split_changed {
326            tracing::info!(
327                actor_id = %self.actor_ctx.id,
328                state = ?target_state,
329                "apply split change"
330            );
331
332            core.updated_splits_in_epoch
333                .retain(|split_id, _| target_state.contains_key(split_id));
334
335            let dropped_splits = core
336                .latest_split_info
337                .extract_if(|split_id, _| !target_state.contains_key(split_id))
338                .map(|(_, split)| split)
339                .collect_vec();
340
341            if should_trim_state && !dropped_splits.is_empty() {
342                // trim dropped splits' state
343                core.split_state_store.trim_state(&dropped_splits).await?;
344            }
345
346            core.latest_split_info = target_state;
347        }
348
349        Ok(split_changed)
350    }
351
352    /// Rebuild the stream reader if an error occurs in the stream.
353    fn rebuild_stream_reader_from_error<const BIASED: bool>(
354        &mut self,
355        source_desc: &SourceDesc,
356        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
357        e: StreamExecutorError,
358    ) -> StreamExecutorResult<()> {
359        let core = &mut self.stream_source_core;
360        tracing::error!(
361            error = ?e.as_report(),
362            actor_id = %self.actor_ctx.id,
363            source_id = %core.source_id,
364            "stream source reader error",
365        );
366        GLOBAL_ERROR_METRICS.user_source_error.report([
367            e.variant_name().to_owned(),
368            core.source_id.to_string(),
369            core.source_name.clone(),
370            self.actor_ctx.fragment_id.to_string(),
371        ]);
372
373        self.rebuild_stream_reader(source_desc, stream)
374    }
375
376    fn rebuild_stream_reader<const BIASED: bool>(
377        &mut self,
378        source_desc: &SourceDesc,
379        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
380    ) -> StreamExecutorResult<()> {
381        let core = &mut self.stream_source_core;
382        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
383
384        tracing::info!(
385            "actor {:?} apply source split change to {:?}",
386            self.actor_ctx.id,
387            target_state
388        );
389
390        // Replace the source reader with a new one built from the latest split state.
391        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
392        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);
393
394        stream.replace_data_stream(reader_stream);
395
396        Ok(())
397    }
398
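    /// Persist the per-split states updated in this epoch to the state table, record CDC progress
    /// metrics, commit the state table, and return (then clear) the splits updated in the epoch.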
399    async fn persist_state_and_clear_cache(
400        &mut self,
401        epoch: EpochPair,
402    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
403        let core = &mut self.stream_source_core;
404
405        let cache = core
406            .updated_splits_in_epoch
407            .values()
408            .map(|split_impl| split_impl.to_owned())
409            .collect_vec();
410
411        if !cache.is_empty() {
412            tracing::debug!(state = ?cache, "take snapshot");
413
414            // Record metrics for CDC sources before moving cache
415            let source_id = core.source_id.to_string();
416            for split_impl in &cache {
417                // Extract and record CDC-specific metrics based on split type
418                match split_impl {
419                    SplitImpl::PostgresCdc(pg_split) => {
420                        if let Some(lsn_value) = pg_split.pg_lsn() {
421                            self.metrics
422                                .pg_cdc_state_table_lsn
423                                .with_guarded_label_values(&[&source_id])
424                                .set(lsn_value as i64);
425                        }
426                    }
427                    SplitImpl::MysqlCdc(mysql_split) => {
428                        if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
429                            self.metrics
430                                .mysql_cdc_state_binlog_file_seq
431                                .with_guarded_label_values(&[&source_id])
432                                .set(file_seq as i64);
433
434                            self.metrics
435                                .mysql_cdc_state_binlog_position
436                                .with_guarded_label_values(&[&source_id])
437                                .set(position as i64);
438                        }
439                    }
440                    _ => {}
441                }
442            }
443
444            core.split_state_store.set_states(cache).await?;
445        }
446
447        // Commit anyway, even if no messages were saved.
448        core.split_state_store.commit(epoch).await?;
449
450        let updated_splits = core.updated_splits_in_epoch.clone();
451
452        core.updated_splits_in_epoch.clear();
453
454        Ok(updated_splits)
455    }
456
457    /// Try to flush (spill) the in-memory state table buffer.
458    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
459        let core = &mut self.stream_source_core;
460        core.split_state_store.try_flush().await?;
461
462        Ok(())
463    }
464
465    /// Report CDC source offset updated only the first time.
466    fn maybe_report_cdc_source_offset(
467        &self,
468        updated_splits: &HashMap<SplitId, SplitImpl>,
469        epoch: EpochPair,
470        source_id: SourceId,
471        must_report_cdc_offset_once: &mut bool,
472        must_wait_cdc_offset_before_report: bool,
473    ) {
474        // Report CDC source offset updated only the first time
475        if *must_report_cdc_offset_once
476            && (!must_wait_cdc_offset_before_report
477                || updated_splits
478                    .values()
479                    .any(|split| split.is_cdc_split() && !split.get_cdc_split_offset().is_empty()))
480        {
481            // Report only once, then ignore all subsequent offset updates
482            self.barrier_manager.report_cdc_source_offset_updated(
483                epoch,
484                self.actor_ctx.id,
485                source_id,
486            );
487            tracing::info!(
488                actor_id = %self.actor_ctx.id,
489                source_id = %source_id,
490                epoch = ?epoch,
491                "Reported CDC source offset updated to meta (first time only)"
492            );
493            // Mark as reported to prevent any future reports, even if offset changes
494            *must_report_cdc_offset_once = false;
495        }
496    }
497
498    /// A source executor with a stream source receives:
499    /// 1. Barrier messages
500    /// 2. Data from external source
501    /// and acts accordingly.
502    #[try_stream(ok = Message, error = StreamExecutorError)]
503    async fn execute_inner(mut self) {
504        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
505        let first_barrier = barrier_receiver
506            .recv()
507            .instrument_await("source_recv_first_barrier")
508            .await
509            .ok_or_else(|| {
510                anyhow!(
511                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
512                    self.actor_ctx.id,
513                    self.stream_source_core.source_id
514                )
515            })?;
516        let first_epoch = first_barrier.epoch;
517        // `must_report_cdc_offset_once` is true if and only if the source is a CDC source.
518        // `must_wait_cdc_offset_before_report` is true if and only if the source is a MySQL or SQL Server CDC source.
519        let (mut boot_state, mut must_report_cdc_offset_once, must_wait_cdc_offset_before_report) =
520            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
521                // CDC source must reach this branch.
522                tracing::debug!(?splits, "boot with splits");
523                // Skip report for non-CDC.
524                let must_report_cdc_offset_once = splits.iter().any(|split| split.is_cdc_split());
525                // Only for MySQL and SQL Server CDC, we need to wait for the offset to be non-empty before reporting.
526                let must_wait_cdc_offset_before_report = must_report_cdc_offset_once
527                    && splits.iter().any(|split| {
528                        matches!(split, SplitImpl::MysqlCdc(_) | SplitImpl::SqlServerCdc(_))
529                    });
530                (
531                    splits.to_vec(),
532                    must_report_cdc_offset_once,
533                    must_wait_cdc_offset_before_report,
534                )
535            } else {
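                // No initial split assignment: stay conservative and only report the CDC offset
                // once a CDC split with a non-empty offset is observed.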
536                (Vec::default(), true, true)
537            };
538        let is_pause_on_startup = first_barrier.is_pause_on_startup();
539        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);
540
541        yield Message::Barrier(first_barrier);
542
543        let mut core = self.stream_source_core;
544        let source_id = core.source_id;
545
546        // Build source description from the builder.
547        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
548        let mut source_desc = source_desc_builder
549            .build()
550            .map_err(StreamExecutorError::connector_error)?;
551
552        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
553            &core,
554            source_desc.source.clone(),
555            self.metrics.clone(),
556        )
557        .await?;
558
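        // Locate the additional split and offset columns (and the optional Pulsar message id
        // column); they are used for offset tracking and pruned before chunks are emitted downstream.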
559        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
560            get_split_offset_col_idx(&source_desc.columns)
561        else {
562            unreachable!("Partition and offset columns must be set.");
563        };
564
565        core.split_state_store.init_epoch(first_epoch).await?;
566        {
567            let committed_reader = core
568                .split_state_store
569                .new_committed_reader(first_epoch)
570                .await?;
571            for ele in &mut boot_state {
572                if let Some(recover_state) =
573                    committed_reader.try_recover_from_state_store(ele).await?
574                {
575                    *ele = recover_state;
576                    // If the state store is non-empty, we consider it initialized.
577                    is_uninitialized = false;
578                } else {
579                    // This is a new split, not in state table.
580                    // Make sure it is written to the state table later.
581                    // Then even if it receives no messages, we can observe it in the state table.
582                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
583                }
584            }
585        }
586
587        // init in-memory split states with persisted state if any
588        core.init_split_state(boot_state.clone());
589
590        // Return the ownership of `stream_source_core` to the source executor.
591        self.stream_source_core = core;
592
593        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
594        tracing::debug!(state = ?recover_state, "start with state");
595
596        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
597        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
598        let mut latest_splits = None;
599        // Build the source stream reader.
600        if is_uninitialized {
601            let create_split_reader_result = reader_stream_builder
602                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
603                .await?;
604            latest_splits = create_split_reader_result.latest_splits;
605        }
606
607        if let Some(latest_splits) = latest_splits {
608            // Make sure they are written to the state table later.
609            // Then even if they receive no messages, we can observe them in the state table.
610            self.stream_source_core
611                .updated_splits_in_epoch
612                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
613        }
614        // Merge the chunks from source and the barriers into a single stream. We prioritize
615        // barriers over source data chunks here.
616        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
617            barrier_stream,
618            reader_stream_builder
619                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
620        );
621        let mut command_paused = false;
622
623        // If the first barrier requires us to pause on startup, pause the stream.
624        if is_pause_on_startup {
625            tracing::info!("source paused on startup");
626            stream.pause_stream();
627            command_paused = true;
628        }
629
630        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `barrier_interval_ms`
631        // milliseconds, to account for other latencies like network delay and processing cost in Meta.
632        let mut max_wait_barrier_time_ms =
633            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
634        let mut last_barrier_time = Instant::now();
635        let mut self_paused = false;
636
637        let source_output_row_count = self
638            .metrics
639            .source_output_row_count
640            .with_guarded_label_values(&self.get_metric_labels());
641
642        let source_split_change_count = self
643            .metrics
644            .source_split_change_count
645            .with_guarded_label_values(&self.get_metric_labels());
646
647        while let Some(msg) = stream.next().await {
648            let Ok(msg) = msg else {
649                tokio::time::sleep(Duration::from_millis(1000)).await;
650                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
651                continue;
652            };
653
654            match msg {
655                // This branch will be preferred.
656                Either::Left(Message::Barrier(barrier)) => {
657                    last_barrier_time = Instant::now();
658
659                    if self_paused {
660                        self_paused = false;
661                        // command_paused has a higher priority.
662                        if !command_paused {
663                            stream.resume_stream();
664                        }
665                    }
666
667                    let epoch = barrier.epoch;
668                    let mut split_change = None;
669
670                    if let Some(mutation) = barrier.mutation.as_deref() {
671                        match mutation {
672                            Mutation::Pause => {
673                                command_paused = true;
674                                stream.pause_stream()
675                            }
676                            Mutation::Resume => {
677                                command_paused = false;
678                                stream.resume_stream()
679                            }
680                            Mutation::SourceChangeSplit(actor_splits) => {
681                                tracing::info!(
682                                    actor_id = %self.actor_ctx.id,
683                                    actor_splits = ?actor_splits,
684                                    "source change split received"
685                                );
686
687                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
688                                    |target_splits| {
689                                        (
690                                            &source_desc,
691                                            &mut stream,
692                                            ApplyMutationAfterBarrier::SplitChange {
693                                                target_splits,
694                                                should_trim_state: true,
695                                                split_change_count: &source_split_change_count,
696                                            },
697                                        )
698                                    },
699                                );
700                            }
701
702                            Mutation::ConnectorPropsChange(maybe_mutation) => {
703                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
704                                {
705                                    // rebuild the stream reader with new props
706                                    tracing::info!(
707                                        actor_id = %self.actor_ctx.id,
708                                        source_id = %source_id,
709                                        "updating source connector properties",
710                                    );
711                                    source_desc.update_reader(new_props.clone())?;
712                                    // We assume the connector props change does not involve a split state change.
713                                    split_change = Some((
714                                        &source_desc,
715                                        &mut stream,
716                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
717                                    ));
718                                }
719                            }
720
721                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
722                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
723                                    |target_splits| {
724                                        (
725                                            &source_desc,
726                                            &mut stream,
727                                            ApplyMutationAfterBarrier::SplitChange {
728                                                target_splits,
729                                                should_trim_state: false,
730                                                split_change_count: &source_split_change_count,
731                                            },
732                                        )
733                                    },
734                                );
735                            }
736                            Mutation::Throttle(fragment_to_apply) => {
737                                if let Some(entry) =
738                                    fragment_to_apply.get(&self.actor_ctx.fragment_id)
739                                    && entry.throttle_type() == ThrottleType::Source
740                                    && entry.rate_limit != self.rate_limit_rps
741                                {
742                                    tracing::info!(
743                                        "updating rate limit from {:?} to {:?}",
744                                        self.rate_limit_rps,
745                                        entry.rate_limit
746                                    );
747                                    self.rate_limit_rps = entry.rate_limit;
748                                    // recreate from latest_split_info
749                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
750                                }
751                            }
752                            Mutation::ResetSource { source_id } => {
753                                // Note: RESET SOURCE only clears the offset, does NOT pause the source.
754                                // When offset is None, after recovery/restart, Debezium will automatically
755                                // enter recovery mode and fetch the latest offset from upstream.
756                                if *source_id == self.stream_source_core.source_id {
757                                    tracing::info!(
758                                        actor_id = %self.actor_ctx.id,
759                                        source_id = source_id.as_raw_id(),
760                                        "Resetting CDC source: clearing offset (set to None)"
761                                    );
762
763                                    // Step 1: Collect all current splits and clear their offsets
764                                    let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
765                                        .latest_split_info
766                                        .values()
767                                        .map(|split| {
768                                            // Clone the split and clear its offset
769                                            let mut new_split = split.clone();
770                                            match &mut new_split {
771                                                SplitImpl::MysqlCdc(debezium_split) => {
772                                                    if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
773                                                        tracing::info!(
774                                                            split_id = ?mysql_split.inner.split_id,
775                                                            old_offset = ?mysql_split.inner.start_offset,
776                                                            "Clearing MySQL CDC offset"
777                                                        );
778                                                        mysql_split.inner.start_offset = None;
779                                                    }
780                                                }
781                                                SplitImpl::PostgresCdc(debezium_split) => {
782                                                    if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
783                                                        tracing::info!(
784                                                            split_id = ?pg_split.inner.split_id,
785                                                            old_offset = ?pg_split.inner.start_offset,
786                                                            "Clearing PostgreSQL CDC offset"
787                                                        );
788                                                        pg_split.inner.start_offset = None;
789                                                    }
790                                                }
791                                                SplitImpl::MongodbCdc(debezium_split) => {
792                                                    if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
793                                                        tracing::info!(
794                                                            split_id = ?mongo_split.inner.split_id,
795                                                            old_offset = ?mongo_split.inner.start_offset,
796                                                            "Clearing MongoDB CDC offset"
797                                                        );
798                                                        mongo_split.inner.start_offset = None;
799                                                    }
800                                                }
801                                                SplitImpl::CitusCdc(debezium_split) => {
802                                                    if let Some(citus_split) = debezium_split.citus_split.as_mut() {
803                                                        tracing::info!(
804                                                            split_id = ?citus_split.inner.split_id,
805                                                            old_offset = ?citus_split.inner.start_offset,
806                                                            "Clearing Citus CDC offset"
807                                                        );
808                                                        citus_split.inner.start_offset = None;
809                                                    }
810                                                }
811                                                SplitImpl::SqlServerCdc(debezium_split) => {
812                                                    if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
813                                                        tracing::info!(
814                                                            split_id = ?sqlserver_split.inner.split_id,
815                                                            old_offset = ?sqlserver_split.inner.start_offset,
816                                                            "Clearing SQL Server CDC offset"
817                                                        );
818                                                        sqlserver_split.inner.start_offset = None;
819                                                    }
820                                                }
821                                                _ => {
822                                                    tracing::warn!(
823                                                        "RESET SOURCE called on non-CDC split type"
824                                                    );
825                                                }
826                                            }
827                                            new_split
828                                        })
829                                        .collect();
830
831                                    if !splits_with_cleared_offset.is_empty() {
832                                        tracing::info!(
833                                            actor_id = %self.actor_ctx.id,
834                                            split_count = splits_with_cleared_offset.len(),
835                                            "Updating state table with cleared offsets"
836                                        );
837
838                                        // Step 2: Write splits back to state table with offset = None
839                                        self.stream_source_core
840                                            .split_state_store
841                                            .set_states(splits_with_cleared_offset.clone())
842                                            .await?;
843
844                                        // Step 3: Update in-memory split info with cleared offsets
845                                        for split in splits_with_cleared_offset {
846                                            self.stream_source_core
847                                                .latest_split_info
848                                                .insert(split.id(), split.clone());
849                                            self.stream_source_core
850                                                .updated_splits_in_epoch
851                                                .insert(split.id(), split);
852                                        }
853
854                                        tracing::info!(
855                                            actor_id = %self.actor_ctx.id,
856                                            source_id = source_id.as_raw_id(),
857                                            "RESET SOURCE completed: offset cleared (set to None). \
858                                             Trigger recovery/restart to fetch latest offset from upstream."
859                                        );
860                                    } else {
861                                        tracing::warn!(
862                                            actor_id = %self.actor_ctx.id,
863                                            "No splits found to reset - source may not be initialized yet"
864                                        );
865                                    }
866                                } else {
867                                    tracing::debug!(
868                                        actor_id = %self.actor_ctx.id,
869                                        target_source_id = source_id.as_raw_id(),
870                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
871                                        "ResetSource mutation for different source, ignoring"
872                                    );
873                                }
874                            }
875                            _ => {}
876                        }
877                    }
878
879                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;
880
881                    self.maybe_report_cdc_source_offset(
882                        &updated_splits,
883                        epoch,
884                        source_id,
885                        &mut must_report_cdc_offset_once,
886                        must_wait_cdc_offset_before_report,
887                    );
888
889                    // When handling a checkpoint barrier, hand the pending wait-checkpoint task to the worker so it runs after the epoch commit notification.
890                    if barrier.kind.is_checkpoint()
891                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
892                    {
893                        task_builder.update_task_on_checkpoint(updated_splits);
894
895                        tracing::debug!("epoch to wait {:?}", epoch);
896                        task_builder.send(Epoch(epoch.prev)).await?
897                    }
898
899                    let barrier_epoch = barrier.epoch;
900                    yield Message::Barrier(barrier);
901
902                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
903                        self.apply_split_change_after_yield_barrier(
904                            barrier_epoch,
905                            source_desc,
906                            stream,
907                            to_apply_mutation,
908                        )
909                        .await?;
910                    }
911                }
912                Either::Left(_) => {
913                    // For the source executor, the message we receive from this arm
914                    // should always be a barrier message.
915                    unreachable!();
916                }
917
918                Either::Right((chunk, latest_state)) => {
919                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
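                        // Feed this chunk's offsets (or Pulsar message ids) into the pending
                        // wait-checkpoint task; they are acknowledged once the checkpoint epoch commits.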
920                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
921                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
922                            task_builder.update_task_on_chunk(
923                                source_id,
924                                &latest_state,
925                                pulsar_message_id_col.clone(),
926                            );
927                        } else {
928                            let offset_col = chunk.column_at(offset_idx);
929                            task_builder.update_task_on_chunk(
930                                source_id,
931                                &latest_state,
932                                offset_col.clone(),
933                            );
934                        }
935                    }
936                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
937                        // The max wait barrier time has been exceeded, so pause the source.
938                        // We can guarantee the source is not already paused here, since it just
939                        // received stream chunks.
941                        self_paused = true;
942                        tracing::warn!(
943                            "source paused, wait barrier for {:?}",
944                            last_barrier_time.elapsed()
945                        );
946                        stream.pause_stream();
947
948                        // Only update `max_wait_barrier_time_ms` here, to capture changes of
949                        // `barrier_interval_ms` without frequently accessing the shared
950                        // `system_params`.
952                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
953                            as u128
954                            * WAIT_BARRIER_MULTIPLE_TIMES;
955                    }
956
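                    // Refresh the in-memory offsets for splits this actor still owns; splits no
                    // longer assigned to this actor are ignored.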
957                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
958                        if let Some(split_impl) =
959                            self.stream_source_core.latest_split_info.get_mut(split_id)
960                        {
961                            *split_impl = new_split_impl.clone();
962                        }
963                    });
964
965                    self.stream_source_core
966                        .updated_splits_in_epoch
967                        .extend(latest_state);
968
969                    let card = chunk.cardinality();
970                    if card == 0 {
971                        continue;
972                    }
973                    source_output_row_count.inc_by(card as u64);
974                    let to_remove_col_indices =
975                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
976                            vec![split_idx, offset_idx, pulsar_message_id_idx]
977                        } else {
978                            vec![split_idx, offset_idx]
979                        };
980                    let chunk =
981                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
982                    yield Message::Chunk(chunk);
983                    self.try_flush_data().await?;
984                }
985            }
986        }
987
988        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
989        tracing::error!(
990            actor_id = %self.actor_ctx.id,
991            "source executor exited unexpectedly"
992        )
993    }
994}
995
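/// Mutation-derived work whose side effects (stream rebuild and state trimming) are applied only
/// after the barrier has been yielded downstream.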
996#[derive(Debug, Clone)]
997enum ApplyMutationAfterBarrier<'a> {
998    SplitChange {
999        target_splits: Vec<SplitImpl>,
1000        should_trim_state: bool,
1001        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
1002    },
1003    ConnectorPropsChange,
1004}
1005
1006impl<S: StateStore> Execute for SourceExecutor<S> {
1007    fn execute(self: Box<Self>) -> BoxedMessageStream {
1008        self.execute_inner().boxed()
1009    }
1010}
1011
1012impl<S: StateStore> Debug for SourceExecutor<S> {
1013    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1014        f.debug_struct("SourceExecutor")
1015            .field("source_id", &self.stream_source_core.source_id)
1016            .field("column_ids", &self.stream_source_core.column_ids)
1017            .finish()
1018    }
1019}
1020
1021struct WaitCheckpointTaskBuilder {
1022    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
1023    source_reader: SourceReader,
1024    building_task: WaitCheckpointTask,
1025}
1026
1027impl WaitCheckpointTaskBuilder {
1028    fn update_task_on_chunk(
1029        &mut self,
1030        source_id: SourceId,
1031        latest_state: &HashMap<SplitId, SplitImpl>,
1032        offset_col: ArrayRef,
1033    ) {
1034        match &mut self.building_task {
1035            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
1036                arrays.push(offset_col);
1037            }
1038            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
1039                arrays.push(offset_col);
1040            }
1041            WaitCheckpointTask::AckPulsarMessage(arrays) => {
1042                // each pulsar chunk will only contain one split
1043                let split_id = latest_state.keys().next().unwrap();
1044                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
1045                arrays.push((pulsar_ack_channel_id, offset_col));
1046            }
1047            WaitCheckpointTask::CommitCdcOffset(_) => {}
1048        }
1049    }
1050
1051    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
1052        #[expect(clippy::single_match)]
1053        match &mut self.building_task {
1054            WaitCheckpointTask::CommitCdcOffset(offsets) => {
1055                if !updated_splits.is_empty() {
1056                    // cdc source only has one split
1057                    assert_eq!(1, updated_splits.len());
1058                    for (split_id, split_impl) in updated_splits {
1059                        if split_impl.is_cdc_split() {
1060                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
1061                        } else {
1062                            unreachable!()
1063                        }
1064                    }
1065                }
1066            }
1067            _ => {}
1068        }
1069    }
1070
1071    /// Send and reset the building task to a new one.
1072    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
1073        let new_task = self
1074            .source_reader
1075            .create_wait_checkpoint_task()
1076            .await?
1077            .expect("wait checkpoint task should be created");
1078        self.wait_checkpoint_tx
1079            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
1080            .expect("wait_checkpoint_tx send should succeed");
1081        Ok(())
1082    }
1083}
1084
1085/// A worker used to do some work after each checkpoint epoch is committed.
1086///
1087/// # Usage Cases
1088///
1089/// Typically there are 2 issues related to acking on checkpoint:
1090///
1091/// 1. Correctness (at-least-once), or don't let upstream clean uncommitted data.
1092///    For message queueing semantics (delete after ack), we should ack to avoid redelivery,
1093///    and only ack after checkpoint to avoid data loss.
1094///
1095/// 2. Allow upstream to clean data after commit.
1096///
1097/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
1098///
1099/// ## CDC
1100///
1101/// Commit last consumed offset to upstream DB, so that old data can be discarded.
1102///
1103/// ## Google Pub/Sub
1104///
1105/// Acks are required due to queueing semantics (messages are deleted after ack).
1106/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
1107/// they are quite limited compared with Kafka.
1108///
1109/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
1110struct WaitCheckpointWorker<S: StateStore> {
1111    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
1112    state_store: S,
1113    table_id: TableId,
1114    metrics: Arc<StreamingMetrics>,
1115}
1116
1117impl<S: StateStore> WaitCheckpointWorker<S> {
1118    pub async fn run(mut self) {
1119        tracing::debug!("wait epoch worker start success");
1120        loop {
1121            // poll the rx and wait for the epoch commit
1122            match self.wait_checkpoint_rx.recv().await {
1123                Some((epoch, task)) => {
1124                    tracing::debug!("start to wait epoch {}", epoch.0);
1125                    let ret = self
1126                        .state_store
1127                        .try_wait_epoch(
1128                            HummockReadEpoch::Committed(epoch.0),
1129                            TryWaitEpochOptions {
1130                                table_id: self.table_id,
1131                            },
1132                        )
1133                        .await;
1134
1135                    match ret {
1136                        Ok(()) => {
1137                            tracing::debug!(epoch = epoch.0, "wait epoch success");
1138
1139                            // Run task with callback to record LSN after successful commit
1140                            task.run_with_on_commit_success(|source_id: u64, offset| {
1141                                if let Some(lsn_value) =
1142                                    extract_postgres_lsn_from_offset_str(offset)
1143                                {
1144                                    self.metrics
1145                                        .pg_cdc_jni_commit_offset_lsn
1146                                        .with_guarded_label_values(&[&source_id.to_string()])
1147                                        .set(lsn_value as i64);
1148                                }
1149                            })
1150                            .await;
1151                        }
1152                        Err(e) => {
1153                            tracing::error!(
1154                            error = %e.as_report(),
1155                            "wait epoch {} failed", epoch.0
1156                            );
1157                        }
1158                    }
1159                }
1160                None => {
1161                    tracing::error!("wait epoch rx closed");
1162                    break;
1163                }
1164            }
1165        }
1166    }
1167}

#[cfg(test)]
mod tests {
    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field};
    use risingwave_common::id::SourceId;
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let source_id = 0.into();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // This datagen source generates 3 rows at a time (`datagen.rows.per.second` = 3).
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
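        // With the sequence starting at 11 and 3 rows generated per chunk, the first
        // data chunk is expected to contain exactly the rows 11, 12 and 13 (asserted below).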
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

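        // The executor emits nothing until it receives its first barrier: the
        // `Mutation::Add` below carries the initial split assignment for this actor,
        // after which the reader starts producing data chunks.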
        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        executor.next().await.unwrap().unwrap();

        // Consume data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // Row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

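        // `ready_chunks(10)` (from `futures::StreamExt`) collects up to 10 messages that
        // are ready at the same time, so each `next()` below yields a batch that may mix
        // barriers and data chunks.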
        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

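        // Reassign the actor a superset of its current splits: the executor should pick
        // up the two new splits and create state entries for them, which is checked
        // against the state table below.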
        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // There must exist state for the newly added split.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

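        // Pause and then resume the source via barrier mutations; the executor is
        // expected to stop consuming while paused and continue afterwards without
        // losing its split assignment.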
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // Receive all messages that are currently ready.
        ready_chunks.next().await.unwrap();

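        // Shrink the assignment to only the third split. Dropping splits should remove
        // their state from the source state table, which the `is_none()` / `is_some()`
        // checks at the end of the test verify against the committed state.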
        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

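        // The committed reader presumably observes only state committed up to `new_epoch`,
        // so the lookups below reflect what actually survived the split drop.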
        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}