risingwave_stream/executor/source/
source_executor.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::TableId;
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::types::JsonbVal;
28use risingwave_common::util::epoch::{Epoch, EpochPair};
29use risingwave_connector::source::cdc::split::{
30    extract_postgres_lsn_from_offset_str, extract_sql_server_commit_lsn_from_offset_str,
31};
32use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
33use risingwave_connector::source::reader::reader::SourceReader;
34use risingwave_connector::source::{
35    ConnectorState, SplitId, SplitImpl, SplitMetaData, WaitCheckpointTask,
36    build_pulsar_ack_channel_id,
37};
38use risingwave_hummock_sdk::HummockReadEpoch;
39use risingwave_pb::common::ThrottleType;
40use risingwave_pb::id::SourceId;
41use risingwave_storage::store::TryWaitEpochOptions;
42use thiserror_ext::AsReport;
43use tokio::sync::mpsc;
44use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
45use tokio::time::Instant;
46
47use super::executor_core::StreamSourceCore;
48use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
49use crate::executor::UpdateMutation;
50use crate::executor::prelude::*;
51use crate::executor::source::reader_stream::{SourceReaderEventWithState, StreamReaderBuilder};
52use crate::executor::stream_reader::StreamReaderWithPause;
53use crate::task::LocalBarrierManager;
54
/// Multiplier applied to the system's `barrier_interval_ms` to derive the maximum time the
/// executor waits for a barrier before pausing itself. The extra headroom accounts for
/// network latencies and processing cost in meta.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
58
/// Saturating conversion of a `u128` LSN into `i64`: values above `i64::MAX` clamp to
/// `i64::MAX` (used when exporting LSNs through `i64`-valued gauges).
fn lsn_u128_to_i64(lsn: u128) -> i64 {
    i64::try_from(lsn).unwrap_or(i64::MAX)
}
62
/// Executor that reads data from an external streaming source and emits it downstream,
/// interleaved with barriers.
pub struct SourceExecutor<S: StateStore> {
    /// Context of the owning actor (id, fragment id, config, ...).
    actor_ctx: ActorContextRef,

    /// Streaming source for external
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitor.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of barrier channel.
    /// Wrapped in `Option` so it can be taken exactly once when execution starts.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read barrier interval
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    /// Whether this is a shared non-CDC source; influences how the initial reader
    /// stream is built (e.g. whether the latest splits are fetched upfront).
    is_shared_non_cdc: bool,

    /// Local barrier manager for reporting source load finished events
    barrier_manager: LocalBarrierManager,
}
86
87impl<S: StateStore> SourceExecutor<S> {
88    #[expect(clippy::too_many_arguments)]
89    pub fn new(
90        actor_ctx: ActorContextRef,
91        stream_source_core: StreamSourceCore<S>,
92        metrics: Arc<StreamingMetrics>,
93        barrier_receiver: UnboundedReceiver<Barrier>,
94        system_params: SystemParamsReaderRef,
95        rate_limit_rps: Option<u32>,
96        is_shared_non_cdc: bool,
97        barrier_manager: LocalBarrierManager,
98    ) -> Self {
99        Self {
100            actor_ctx,
101            stream_source_core,
102            metrics,
103            barrier_receiver: Some(barrier_receiver),
104            system_params,
105            rate_limit_rps,
106            is_shared_non_cdc,
107            barrier_manager,
108        }
109    }
110
111    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
112        StreamReaderBuilder {
113            source_desc,
114            rate_limit: self.rate_limit_rps,
115            source_id: self.stream_source_core.source_id,
116            source_name: self.stream_source_core.source_name.clone(),
117            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
118            actor_ctx: self.actor_ctx.clone(),
119            reader_stream: None,
120        }
121    }
122
    /// Spawns a background [`WaitCheckpointWorker`] if the source reader yields an
    /// initial [`WaitCheckpointTask`]; returns `None` when it does not (i.e. this
    /// connector needs no wait-checkpoint handling).
    ///
    /// On success, returns a [`WaitCheckpointTaskBuilder`] holding the sending half of
    /// an unbounded channel that feeds the spawned worker, plus the initial task.
    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        // The worker is handed the split state table's store and table id.
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id(),
            source_id: core.source_id,
            source_name: core.source_name.clone(),
            metrics,
        };
        // Detached background task; its join handle is intentionally not retained.
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            building_task: initial_task,
        }))
    }
146
    /// Whether auto schema change is enabled in the actor's developer config.
    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx.config.developer.enable_auto_schema_change
    }
150
151    /// `source_id | source_name | actor_id | fragment_id`
152    #[inline]
153    fn get_metric_labels(&self) -> [String; 4] {
154        [
155            self.stream_source_core.source_id.to_string(),
156            self.stream_source_core.source_name.clone(),
157            self.actor_ctx.id.to_string(),
158            self.actor_ctx.fragment_id.to_string(),
159        ]
160    }
161
162    /// - `should_trim_state`: whether to trim state for dropped splits.
163    ///
164    ///   For scaling, the connector splits can be migrated to other actors, but
165    ///   won't be added or removed. Actors should not trim states for splits that
166    ///   are moved to other actors.
167    ///
168    ///   For source split change, split will not be migrated and we can trim states
169    ///   for deleted splits.
170    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
171        &mut self,
172        barrier_epoch: EpochPair,
173        source_desc: &SourceDesc,
174        stream: &mut StreamReaderWithPause<BIASED, SourceReaderEventWithState>,
175        apply_mutation: ApplyMutationAfterBarrier<'_>,
176    ) -> StreamExecutorResult<()> {
177        {
178            let mut should_rebuild_stream = false;
179            match apply_mutation {
180                ApplyMutationAfterBarrier::SplitChange {
181                    target_splits,
182                    should_trim_state,
183                    split_change_count,
184                } => {
185                    split_change_count.inc();
186                    if self
187                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
188                        .await?
189                    {
190                        should_rebuild_stream = true;
191                    }
192                }
193                ApplyMutationAfterBarrier::ConnectorPropsChange => {
194                    should_rebuild_stream = true;
195                }
196            }
197
198            if should_rebuild_stream {
199                self.rebuild_stream_reader(source_desc, stream)?;
200            }
201        }
202
203        Ok(())
204    }
205
    /// Returns `true` if split changed. Otherwise `false`.
    ///
    /// Reconciles the actor's current splits with `target_splits` from meta:
    /// - splits already owned keep their cached latest offsets (meta only carries
    ///   initial offsets);
    /// - newly assigned splits are recovered from the committed state store when
    ///   possible, otherwise the split from meta is used as-is;
    /// - splits missing from `target_splits` are dropped, with their persisted state
    ///   trimmed only when `should_trim_state` is set.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        // Index the incoming assignment by split id for O(1) lookups below.
        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        // Reader over the state committed up to `barrier_epoch`.
        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Checks added splits
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // write new assigned split to state cache. snapshot is base on cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                // `or_insert_with`: don't clobber an update already recorded this epoch.
                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Checks dropped splits
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            // Forget pending per-epoch updates for splits we no longer own.
            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // trim dropped splits' state
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }
291
292    /// Rebuild stream if there is a err in stream
293    fn rebuild_stream_reader_from_error<const BIASED: bool>(
294        &mut self,
295        source_desc: &SourceDesc,
296        stream: &mut StreamReaderWithPause<BIASED, SourceReaderEventWithState>,
297        e: StreamExecutorError,
298    ) -> StreamExecutorResult<()> {
299        let core = &mut self.stream_source_core;
300        tracing::error!(
301            error = ?e.as_report(),
302            actor_id = %self.actor_ctx.id,
303            source_id = %core.source_id,
304            "stream source reader error",
305        );
306        GLOBAL_ERROR_METRICS.user_source_error.report([
307            e.variant_name().to_owned(),
308            core.source_id.to_string(),
309            core.source_name.clone(),
310            self.actor_ctx.fragment_id.to_string(),
311        ]);
312
313        self.rebuild_stream_reader(source_desc, stream)
314    }
315
316    fn rebuild_stream_reader<const BIASED: bool>(
317        &mut self,
318        source_desc: &SourceDesc,
319        stream: &mut StreamReaderWithPause<BIASED, SourceReaderEventWithState>,
320    ) -> StreamExecutorResult<()> {
321        let core = &mut self.stream_source_core;
322        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
323
324        tracing::info!(
325            "actor {:?} apply source split change to {:?}",
326            self.actor_ctx.id,
327            target_state
328        );
329
330        // Replace the source reader with a new one of the new state.
331        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
332        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);
333
334        stream.replace_data_stream(reader_stream);
335
336        Ok(())
337    }
338
    /// Handle `InjectSourceOffsets` mutation by updating split offsets in place.
    /// Returns `true` if any offsets were injected and a stream rebuild is needed.
    ///
    /// This is an UNSAFE operation that can cause data duplication or loss
    /// depending on the correctness of the provided offsets.
    ///
    /// Offsets for splits this actor does not own are silently ignored.
    ///
    /// NOTE(review): if `update_in_place` fails for some splits, the splits that
    /// succeeded earlier have already been mutated in `latest_split_info` and
    /// `updated_splits_in_epoch` before the error is returned — confirm callers
    /// treat this error as fatal for the actor.
    async fn handle_inject_source_offsets(
        &mut self,
        split_offsets: &HashMap<String, String>,
    ) -> StreamExecutorResult<bool> {
        tracing::warn!(
            actor_id = %self.actor_ctx.id,
            source_id = %self.stream_source_core.source_id,
            num_offsets = split_offsets.len(),
            "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
        );

        // First, filter to only offsets for splits this actor owns
        let offsets_to_inject: Vec<(String, String)> = {
            let owned_splits: HashSet<&str> = self
                .stream_source_core
                .latest_split_info
                .keys()
                .map(|s| s.as_ref())
                .collect();
            split_offsets
                .iter()
                .filter(|(split_id, _)| owned_splits.contains(split_id.as_str()))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect()
        };

        // Update splits and prepare JSON states for persistence
        let mut json_states: Vec<(String, JsonbVal)> = Vec::new();
        let mut failed_splits: Vec<String> = Vec::new();

        for (split_id, offset) in &offsets_to_inject {
            if let Some(split) = self
                .stream_source_core
                .latest_split_info
                .get_mut(split_id.as_str())
            {
                tracing::info!(
                    actor_id = %self.actor_ctx.id,
                    split_id = %split_id,
                    offset = %offset,
                    "Injecting offset for owned split"
                );
                // Update the split's offset in place
                if let Err(e) = split.update_in_place(offset.clone()) {
                    tracing::error!(
                        actor_id = %self.actor_ctx.id,
                        split_id = %split_id,
                        error = ?e.as_report(),
                        "Failed to update split offset"
                    );
                    failed_splits.push(split_id.clone());
                    continue;
                }
                // Mark this split as updated for persistence
                self.stream_source_core
                    .updated_splits_in_epoch
                    .insert(split_id.clone().into(), split.clone());
                // Parse the offset as JSON and store it
                // (non-JSON offsets are wrapped as `{"offset": <raw string>}`).
                let json_value: serde_json::Value = serde_json::from_str(offset)
                    .unwrap_or_else(|_| serde_json::json!({ "offset": offset }));
                json_states.push((split_id.clone(), JsonbVal::from(json_value)));
            }
        }

        if !failed_splits.is_empty() {
            return Err(StreamExecutorError::connector_error(anyhow!(
                "failed to inject offsets for splits: {:?}",
                failed_splits
            )));
        }

        let num_injected = json_states.len();
        if num_injected > 0 {
            // Store the injected offsets as JSON in the state table
            self.stream_source_core
                .split_state_store
                .set_states_json(json_states)
                .await?;

            tracing::info!(
                actor_id = %self.actor_ctx.id,
                source_id = %self.stream_source_core.source_id,
                num_injected = num_injected,
                "Offset injection completed for owned splits, triggering rebuild"
            );
            Ok(true)
        } else {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                source_id = %self.stream_source_core.source_id,
                "No owned splits to inject offsets for"
            );
            Ok(false)
        }
    }
439
440    async fn persist_state_and_clear_cache(
441        &mut self,
442        epoch: EpochPair,
443    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
444        let core = &mut self.stream_source_core;
445
446        let cache = core
447            .updated_splits_in_epoch
448            .values()
449            .map(|split_impl| split_impl.to_owned())
450            .collect_vec();
451
452        if !cache.is_empty() {
453            tracing::debug!(state = ?cache, "take snapshot");
454
455            // Record metrics for CDC sources before moving cache
456            let source_id = core.source_id.to_string();
457            for split_impl in &cache {
458                // Extract and record CDC-specific metrics based on split type
459                match split_impl {
460                    SplitImpl::PostgresCdc(pg_split) => {
461                        if let Some(lsn_value) = pg_split.pg_lsn() {
462                            self.metrics
463                                .pg_cdc_state_table_lsn
464                                .with_guarded_label_values(&[&source_id])
465                                .set(lsn_value as i64);
466                        }
467                    }
468                    SplitImpl::MysqlCdc(mysql_split) => {
469                        if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
470                            self.metrics
471                                .mysql_cdc_state_binlog_file_seq
472                                .with_guarded_label_values(&[&source_id])
473                                .set(file_seq as i64);
474
475                            self.metrics
476                                .mysql_cdc_state_binlog_position
477                                .with_guarded_label_values(&[&source_id])
478                                .set(position as i64);
479                        }
480                    }
481                    SplitImpl::SqlServerCdc(sqlserver_split) => {
482                        if let Some(lsn) = sqlserver_split.sql_server_change_lsn() {
483                            self.metrics
484                                .sqlserver_cdc_state_change_lsn
485                                .with_guarded_label_values(&[&source_id])
486                                .set(lsn_u128_to_i64(lsn));
487                        }
488                        if let Some(lsn) = sqlserver_split.sql_server_commit_lsn() {
489                            self.metrics
490                                .sqlserver_cdc_state_commit_lsn
491                                .with_guarded_label_values(&[&source_id])
492                                .set(lsn_u128_to_i64(lsn));
493                        }
494                    }
495                    _ => {}
496                }
497            }
498
499            core.split_state_store.set_states(cache).await?;
500        }
501
502        // commit anyway, even if no message saved
503        core.split_state_store.commit(epoch).await?;
504
505        let updated_splits = core.updated_splits_in_epoch.clone();
506
507        core.updated_splits_in_epoch.clear();
508
509        Ok(updated_splits)
510    }
511
512    /// try mem table spill
513    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
514        let core = &mut self.stream_source_core;
515        core.split_state_store.try_flush().await?;
516
517        Ok(())
518    }
519
520    fn apply_latest_split_state(&mut self, latest_state: HashMap<SplitId, SplitImpl>) {
521        for (split_id, new_split_impl) in &latest_state {
522            if let Some(split_impl) = self.stream_source_core.latest_split_info.get_mut(split_id) {
523                *split_impl = new_split_impl.clone();
524            }
525        }
526        self.stream_source_core
527            .updated_splits_in_epoch
528            .extend(latest_state);
529    }
530
531    /// Report CDC source offset updated only the first time.
532    fn maybe_report_cdc_source_offset(
533        &self,
534        updated_splits: &HashMap<SplitId, SplitImpl>,
535        epoch: EpochPair,
536        source_id: SourceId,
537        must_report_cdc_offset_once: &mut bool,
538        must_wait_cdc_offset_before_report: bool,
539    ) {
540        // Report CDC source offset updated only the first time
541        if *must_report_cdc_offset_once
542            && (!must_wait_cdc_offset_before_report
543                || updated_splits
544                    .values()
545                    .any(|split| split.is_cdc_split() && !split.get_cdc_split_offset().is_empty()))
546        {
547            // Report only once, then ignore all subsequent offset updates
548            self.barrier_manager.report_cdc_source_offset_updated(
549                epoch,
550                self.actor_ctx.id,
551                source_id,
552            );
553            tracing::info!(
554                actor_id = %self.actor_ctx.id,
555                source_id = %source_id,
556                epoch = ?epoch,
557                "Reported CDC source offset updated to meta (first time only)"
558            );
559            // Mark as reported to prevent any future reports, even if offset changes
560            *must_report_cdc_offset_once = false;
561        }
562    }
563
564    /// A source executor with a stream source receives:
565    /// 1. Barrier messages
566    /// 2. Data from external source
567    /// and acts accordingly.
568    #[try_stream(ok = Message, error = StreamExecutorError)]
569    async fn execute_inner(mut self) {
570        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
571        let first_barrier = barrier_receiver
572            .recv()
573            .instrument_await("source_recv_first_barrier")
574            .await
575            .ok_or_else(|| {
576                anyhow!(
577                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
578                    self.actor_ctx.id,
579                    self.stream_source_core.source_id
580                )
581            })?;
582        let first_epoch = first_barrier.epoch;
583        // must_report_cdc_offset is true if and only if the source is a CDC source.
584        // must_wait_cdc_offset_before_report is true if and only if the source is a MySQL or SQL Server CDC source.
585        let (mut boot_state, mut must_report_cdc_offset_once, must_wait_cdc_offset_before_report) =
586            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
587                // CDC source must reach this branch.
588                tracing::debug!(?splits, "boot with splits");
589                // Skip report for non-CDC.
590                let must_report_cdc_offset_once = splits.iter().any(|split| split.is_cdc_split());
591                // Only for MySQL and SQL Server CDC, we need to wait for the offset to be non-empty before reporting.
592                let must_wait_cdc_offset_before_report = must_report_cdc_offset_once
593                    && splits.iter().any(|split| {
594                        matches!(split, SplitImpl::MysqlCdc(_) | SplitImpl::SqlServerCdc(_))
595                    });
596                (
597                    splits.to_vec(),
598                    must_report_cdc_offset_once,
599                    must_wait_cdc_offset_before_report,
600                )
601            } else {
602                (Vec::default(), true, true)
603            };
604        let is_pause_on_startup = first_barrier.is_pause_on_startup();
605        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);
606
607        yield Message::Barrier(first_barrier);
608
609        let mut core = self.stream_source_core;
610        let source_id = core.source_id;
611
612        // Build source description from the builder.
613        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
614        let mut source_desc = source_desc_builder
615            .build()
616            .map_err(StreamExecutorError::connector_error)?;
617
618        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
619            &core,
620            source_desc.source.clone(),
621            self.metrics.clone(),
622        )
623        .await?;
624
625        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
626            get_split_offset_col_idx(&source_desc.columns)
627        else {
628            unreachable!("Partition and offset columns must be set.");
629        };
630
631        core.split_state_store.init_epoch(first_epoch).await?;
632        {
633            let committed_reader = core
634                .split_state_store
635                .new_committed_reader(first_epoch)
636                .await?;
637            for ele in &mut boot_state {
638                if let Some(recover_state) =
639                    committed_reader.try_recover_from_state_store(ele).await?
640                {
641                    *ele = recover_state;
642                    // if state store is non-empty, we consider it's initialized.
643                    is_uninitialized = false;
644                } else {
645                    // This is a new split, not in state table.
646                    // make sure it is written to state table later.
647                    // Then even it receives no messages, we can observe it in state table.
648                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
649                }
650            }
651        }
652
653        // init in-memory split states with persisted state if any
654        core.init_split_state(boot_state.clone());
655
656        // Return the ownership of `stream_source_core` to the source executor.
657        self.stream_source_core = core;
658
659        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
660        tracing::debug!(state = ?recover_state, "start with state");
661
662        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
663        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
664        let mut latest_splits = None;
665        // Build the source stream reader.
666        if is_uninitialized {
667            let create_split_reader_result = reader_stream_builder
668                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
669                .await?;
670            latest_splits = create_split_reader_result.latest_splits;
671        }
672
673        if let Some(latest_splits) = latest_splits {
674            // make sure it is written to state table later.
675            // Then even it receives no messages, we can observe it in state table.
676            self.stream_source_core
677                .updated_splits_in_epoch
678                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
679        }
680        // Merge the chunks from source and the barriers into a single stream. We prioritize
681        // barriers over source data chunks here.
682        let mut stream = StreamReaderWithPause::<true, SourceReaderEventWithState>::new(
683            barrier_stream,
684            reader_stream_builder
685                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
686        );
687        let mut command_paused = false;
688
689        // - If the first barrier requires us to pause on startup, pause the stream.
690        if is_pause_on_startup {
691            tracing::info!("source paused on startup");
692            stream.pause_stream();
693            command_paused = true;
694        }
695
696        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
697        // milliseconds, considering some other latencies like network and cost in Meta.
698        let mut max_wait_barrier_time_ms =
699            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
700        let mut last_barrier_time = Instant::now();
701        let mut self_paused = false;
702
703        let source_output_row_count = self
704            .metrics
705            .source_output_row_count
706            .with_guarded_label_values(&self.get_metric_labels());
707
708        let source_split_change_count = self
709            .metrics
710            .source_split_change_count
711            .with_guarded_label_values(&self.get_metric_labels());
712
713        while let Some(msg) = stream.next().await {
714            let Ok(msg) = msg else {
715                tokio::time::sleep(Duration::from_millis(1000)).await;
716                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
717                continue;
718            };
719
720            match msg {
721                // This branch will be preferred.
722                Either::Left(Message::Barrier(barrier)) => {
723                    last_barrier_time = Instant::now();
724
725                    if self_paused {
726                        self_paused = false;
727                        // command_paused has a higher priority.
728                        if !command_paused {
729                            stream.resume_stream();
730                        }
731                    }
732
733                    let epoch = barrier.epoch;
734                    let mut split_change = None;
735
736                    if let Some(mutation) = barrier.mutation.as_deref() {
737                        match mutation {
738                            Mutation::Pause => {
739                                command_paused = true;
740                                stream.pause_stream()
741                            }
742                            Mutation::Resume => {
743                                command_paused = false;
744                                stream.resume_stream()
745                            }
746                            Mutation::SourceChangeSplit(actor_splits) => {
747                                tracing::info!(
748                                    actor_id = %self.actor_ctx.id,
749                                    actor_splits = ?actor_splits,
750                                    "source change split received"
751                                );
752
753                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
754                                    |target_splits| {
755                                        (
756                                            &source_desc,
757                                            &mut stream,
758                                            ApplyMutationAfterBarrier::SplitChange {
759                                                target_splits,
760                                                should_trim_state: true,
761                                                split_change_count: &source_split_change_count,
762                                            },
763                                        )
764                                    },
765                                );
766                            }
767
768                            Mutation::ConnectorPropsChange(maybe_mutation) => {
769                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
770                                {
771                                    // rebuild the stream reader with new props
772                                    tracing::info!(
773                                        actor_id = %self.actor_ctx.id,
774                                        source_id = %source_id,
775                                        "updating source connector properties",
776                                    );
777                                    source_desc.update_reader(new_props.clone())?;
778                                    // suppose the connector props change will not involve state change
779                                    split_change = Some((
780                                        &source_desc,
781                                        &mut stream,
782                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
783                                    ));
784                                }
785                            }
786
787                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
788                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
789                                    |target_splits| {
790                                        (
791                                            &source_desc,
792                                            &mut stream,
793                                            ApplyMutationAfterBarrier::SplitChange {
794                                                target_splits,
795                                                should_trim_state: false,
796                                                split_change_count: &source_split_change_count,
797                                            },
798                                        )
799                                    },
800                                );
801                            }
802                            Mutation::Throttle(fragment_to_apply) => {
803                                if let Some(entry) =
804                                    fragment_to_apply.get(&self.actor_ctx.fragment_id)
805                                    && entry.throttle_type() == ThrottleType::Source
806                                    && entry.rate_limit != self.rate_limit_rps
807                                {
808                                    tracing::info!(
809                                        "updating rate limit from {:?} to {:?}",
810                                        self.rate_limit_rps,
811                                        entry.rate_limit
812                                    );
813                                    self.rate_limit_rps = entry.rate_limit;
814                                    // recreate from latest_split_info
815                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
816                                }
817                            }
818                            Mutation::ResetSource { source_id } => {
819                                // Note: RESET SOURCE only clears the offset, does NOT pause the source.
820                                // When offset is None, after recovery/restart, Debezium will automatically
821                                // enter recovery mode and fetch the latest offset from upstream.
822                                if *source_id == self.stream_source_core.source_id {
823                                    tracing::info!(
824                                        actor_id = %self.actor_ctx.id,
825                                        source_id = source_id.as_raw_id(),
826                                        "Resetting CDC source: clearing offset (set to None)"
827                                    );
828
829                                    // Step 1: Collect all current splits and clear their offsets
830                                    let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
831                                        .latest_split_info
832                                        .values()
833                                        .map(|split| {
834                                            // Clone the split and clear its offset
835                                            let mut new_split = split.clone();
836                                            match &mut new_split {
837                                                SplitImpl::MysqlCdc(debezium_split) => {
838                                                    if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
839                                                        tracing::info!(
840                                                            split_id = ?mysql_split.inner.split_id,
841                                                            old_offset = ?mysql_split.inner.start_offset,
842                                                            "Clearing MySQL CDC offset"
843                                                        );
844                                                        mysql_split.inner.start_offset = None;
845                                                    }
846                                                }
847                                                SplitImpl::PostgresCdc(debezium_split) => {
848                                                    if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
849                                                        tracing::info!(
850                                                            split_id = ?pg_split.inner.split_id,
851                                                            old_offset = ?pg_split.inner.start_offset,
852                                                            "Clearing PostgreSQL CDC offset"
853                                                        );
854                                                        pg_split.inner.start_offset = None;
855                                                    }
856                                                }
857                                                SplitImpl::MongodbCdc(debezium_split) => {
858                                                    if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
859                                                        tracing::info!(
860                                                            split_id = ?mongo_split.inner.split_id,
861                                                            old_offset = ?mongo_split.inner.start_offset,
862                                                            "Clearing MongoDB CDC offset"
863                                                        );
864                                                        mongo_split.inner.start_offset = None;
865                                                    }
866                                                }
867                                                SplitImpl::CitusCdc(debezium_split) => {
868                                                    if let Some(citus_split) = debezium_split.citus_split.as_mut() {
869                                                        tracing::info!(
870                                                            split_id = ?citus_split.inner.split_id,
871                                                            old_offset = ?citus_split.inner.start_offset,
872                                                            "Clearing Citus CDC offset"
873                                                        );
874                                                        citus_split.inner.start_offset = None;
875                                                    }
876                                                }
877                                                SplitImpl::SqlServerCdc(debezium_split) => {
878                                                    if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
879                                                        tracing::info!(
880                                                            split_id = ?sqlserver_split.inner.split_id,
881                                                            old_offset = ?sqlserver_split.inner.start_offset,
882                                                            "Clearing SQL Server CDC offset"
883                                                        );
884                                                        sqlserver_split.inner.start_offset = None;
885                                                    }
886                                                }
887                                                _ => {
888                                                    tracing::warn!(
889                                                        "RESET SOURCE called on non-CDC split type"
890                                                    );
891                                                }
892                                            }
893                                            new_split
894                                        })
895                                        .collect();
896
897                                    if !splits_with_cleared_offset.is_empty() {
898                                        tracing::info!(
899                                            actor_id = %self.actor_ctx.id,
900                                            split_count = splits_with_cleared_offset.len(),
901                                            "Updating state table with cleared offsets"
902                                        );
903
904                                        // Step 2: Write splits back to state table with offset = None
905                                        self.stream_source_core
906                                            .split_state_store
907                                            .set_states(splits_with_cleared_offset.clone())
908                                            .await?;
909
910                                        // Step 3: Update in-memory split info with cleared offsets
911                                        for split in splits_with_cleared_offset {
912                                            self.stream_source_core
913                                                .latest_split_info
914                                                .insert(split.id(), split.clone());
915                                            self.stream_source_core
916                                                .updated_splits_in_epoch
917                                                .insert(split.id(), split);
918                                        }
919
920                                        tracing::info!(
921                                            actor_id = %self.actor_ctx.id,
922                                            source_id = source_id.as_raw_id(),
923                                            "RESET SOURCE completed: offset cleared (set to None). \
924                                             Trigger recovery/restart to fetch latest offset from upstream."
925                                        );
926                                    } else {
927                                        tracing::warn!(
928                                            actor_id = %self.actor_ctx.id,
929                                            "No splits found to reset - source may not be initialized yet"
930                                        );
931                                    }
932                                } else {
933                                    tracing::debug!(
934                                        actor_id = %self.actor_ctx.id,
935                                        target_source_id = source_id.as_raw_id(),
936                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
937                                        "ResetSource mutation for different source, ignoring"
938                                    );
939                                }
940                            }
941
942                            Mutation::InjectSourceOffsets {
943                                source_id,
944                                split_offsets,
945                            } => {
946                                if *source_id == self.stream_source_core.source_id {
947                                    if self.handle_inject_source_offsets(split_offsets).await? {
948                                        // Trigger a rebuild to apply the new offsets
949                                        split_change = Some((
950                                            &source_desc,
951                                            &mut stream,
952                                            ApplyMutationAfterBarrier::ConnectorPropsChange,
953                                        ));
954                                    }
955                                } else {
956                                    tracing::debug!(
957                                        actor_id = %self.actor_ctx.id,
958                                        target_source_id = source_id.as_raw_id(),
959                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
960                                        "InjectSourceOffsets mutation for different source, ignoring"
961                                    );
962                                }
963                            }
964
965                            _ => {}
966                        }
967                    }
968
969                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;
970
971                    self.maybe_report_cdc_source_offset(
972                        &updated_splits,
973                        epoch,
974                        source_id,
975                        &mut must_report_cdc_offset_once,
976                        must_wait_cdc_offset_before_report,
977                    );
978
979                    // when handle a checkpoint barrier, spawn a task to wait for epoch commit notification
980                    if barrier.kind.is_checkpoint()
981                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
982                    {
983                        task_builder.update_task_on_checkpoint(updated_splits);
984
985                        tracing::debug!("epoch to wait {:?}", epoch);
986                        task_builder.send(Epoch(epoch.prev));
987                    }
988
989                    let barrier_epoch = barrier.epoch;
990                    yield Message::Barrier(barrier);
991
992                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
993                        self.apply_split_change_after_yield_barrier(
994                            barrier_epoch,
995                            source_desc,
996                            stream,
997                            to_apply_mutation,
998                        )
999                        .await?;
1000                    }
1001                }
1002                Either::Left(_) => {
1003                    // For the source executor, the message we receive from this arm
1004                    // should always be barrier message.
1005                    unreachable!();
1006                }
1007
1008                Either::Right(event) => {
1009                    let (chunk, latest_state) = match event {
1010                        SourceReaderEventWithState::Progress(latest_state) => {
1011                            self.apply_latest_split_state(latest_state);
1012                            continue;
1013                        }
1014                        SourceReaderEventWithState::Data((chunk, latest_state)) => {
1015                            (chunk, latest_state)
1016                        }
1017                    };
1018
1019                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
1020                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1021                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
1022                            task_builder.update_task_on_chunk(
1023                                source_id,
1024                                &latest_state,
1025                                pulsar_message_id_col.clone(),
1026                            );
1027                        } else {
1028                            let offset_col = chunk.column_at(offset_idx);
1029                            task_builder.update_task_on_chunk(
1030                                source_id,
1031                                &latest_state,
1032                                offset_col.clone(),
1033                            );
1034                        }
1035                    }
1036                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
1037                        // Exceeds the max wait barrier time, the source will be paused.
1038                        // Currently we can guarantee the
1039                        // source is not paused since it received stream
1040                        // chunks.
1041                        self_paused = true;
1042                        tracing::warn!(
1043                            "source paused, wait barrier for {:?}",
1044                            last_barrier_time.elapsed()
1045                        );
1046                        stream.pause_stream();
1047
1048                        // Only update `max_wait_barrier_time_ms` to capture
1049                        // `barrier_interval_ms`
1050                        // changes here to avoid frequently accessing the shared
1051                        // `system_params`.
1052                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
1053                            as u128
1054                            * WAIT_BARRIER_MULTIPLE_TIMES;
1055                    }
1056
1057                    self.apply_latest_split_state(latest_state);
1058
1059                    let card = chunk.cardinality();
1060                    if card == 0 {
1061                        continue;
1062                    }
1063                    source_output_row_count.inc_by(card as u64);
1064                    let to_remove_col_indices =
1065                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1066                            vec![split_idx, offset_idx, pulsar_message_id_idx]
1067                        } else {
1068                            vec![split_idx, offset_idx]
1069                        };
1070                    let chunk =
1071                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
1072                    yield Message::Chunk(chunk);
1073                    self.try_flush_data().await?;
1074                }
1075            }
1076        }
1077
1078        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
1079        tracing::error!(
1080            actor_id = %self.actor_ctx.id,
1081            "source executor exited unexpectedly"
1082        )
1083    }
1084}
1085
/// A deferred action to run *after* the triggering barrier has been yielded
/// downstream. Mutations that rebuild the source stream (split reassignment,
/// connector property changes) are recorded as this type during barrier
/// handling and applied via `apply_split_change_after_yield_barrier`.
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    /// Apply a new split assignment for this actor.
    SplitChange {
        /// The full set of splits this actor should read after the change.
        target_splits: Vec<SplitImpl>,
        /// Whether stale split state should be trimmed from the state table.
        /// `true` for `SourceChangeSplit`, `false` for scaling `Update`s.
        should_trim_state: bool,
        /// Counter to bump when a split change is applied.
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    /// Rebuild the stream reader after connector properties were updated;
    /// assumes no split state change is involved.
    ConnectorPropsChange,
}
1095
impl<S: StateStore> Execute for SourceExecutor<S> {
    /// Entry point of the executor: boxes the message stream produced by
    /// `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
1101
1102impl<S: StateStore> Debug for SourceExecutor<S> {
1103    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1104        f.debug_struct("SourceExecutor")
1105            .field("source_id", &self.stream_source_core.source_id)
1106            .field("column_ids", &self.stream_source_core.column_ids)
1107            .finish()
1108    }
1109}
1110
/// Accumulates per-epoch work (message acks / CDC offsets) and ships it to
/// the wait-checkpoint worker when a checkpoint barrier is handled.
struct WaitCheckpointTaskBuilder {
    // Channel to the worker that runs the task once the epoch is committed.
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    // The task currently being built for the ongoing epoch.
    building_task: WaitCheckpointTask,
}
1115
1116impl WaitCheckpointTaskBuilder {
1117    fn update_task_on_chunk(
1118        &mut self,
1119        source_id: SourceId,
1120        latest_state: &HashMap<SplitId, SplitImpl>,
1121        offset_col: ArrayRef,
1122    ) {
1123        match &mut self.building_task {
1124            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
1125                arrays.push(offset_col);
1126            }
1127            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
1128                arrays.push(offset_col);
1129            }
1130            WaitCheckpointTask::AckPulsarMessage(arrays) => {
1131                // each pulsar chunk will only contain one split
1132                let split_id = latest_state.keys().next().unwrap();
1133                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
1134                arrays.push((pulsar_ack_channel_id, offset_col));
1135            }
1136            WaitCheckpointTask::CommitCdcOffset(_) => {}
1137        }
1138    }
1139
1140    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
1141        #[expect(clippy::single_match)]
1142        match &mut self.building_task {
1143            WaitCheckpointTask::CommitCdcOffset(offsets) => {
1144                if !updated_splits.is_empty() {
1145                    // cdc source only has one split
1146                    assert_eq!(1, updated_splits.len());
1147                    for (split_id, split_impl) in updated_splits {
1148                        if split_impl.is_cdc_split() {
1149                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
1150                        } else {
1151                            unreachable!()
1152                        }
1153                    }
1154                }
1155            }
1156            _ => {}
1157        }
1158    }
1159
1160    /// Send the current task and reset to an empty one for the next epoch.
1161    /// Uses `reset_for_next_epoch()` to clone the client handle (e.g. `PubSub` `Subscription`)
1162    /// from the current task, avoiding network I/O on the checkpoint hot path.
1163    fn send(&mut self, epoch: Epoch) {
1164        let new_task = self.building_task.reset_for_next_epoch();
1165        self.wait_checkpoint_tx
1166            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
1167            .expect("wait_checkpoint_tx send should succeed");
1168    }
1169}
1170
/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Usage Cases
///
/// Typically there are 2 issues related to ack on checkpoint:
///
/// 1. Correctness (at-least-once), or don't let upstream clean uncommitted data.
///    For message queueing semantics (delete after ack), we should ack to avoid redelivery,
///    and only ack after checkpoint to avoid data loss.
///
/// 2. Allow upstream to clean data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit last consumed offset to upstream DB, so that old data can be discarded.
///
/// ## Google Pub/Sub
///
/// Due to queueing semantics.
/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
/// it's quite limited unlike Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
struct WaitCheckpointWorker<S: StateStore> {
    // Receives `(epoch, task)` pairs from the paired `WaitCheckpointTaskBuilder`.
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    // State store used to wait for the epoch to become committed.
    state_store: S,
    // Table id of the source state table, passed to `try_wait_epoch`.
    table_id: TableId,
    // Id of the source this worker serves; forwarded when running the task.
    source_id: SourceId,
    // Human-readable source name; forwarded when running the task.
    source_name: String,
    // Streaming metrics, used to report committed CDC offsets (LSN gauges).
    metrics: Arc<StreamingMetrics>,
}
1204
impl<S: StateStore> WaitCheckpointWorker<S> {
    /// Main loop of the worker.
    ///
    /// Receives `(epoch, task)` pairs, waits until the epoch is committed in
    /// the state store, then runs the task (ack messages / commit CDC offsets).
    /// Exits when the channel is closed, i.e. when the paired source executor
    /// actor is dropped.
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker start success");
        loop {
            // poll the rx and wait for the epoch commit
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;
                    // Re-check the channel AFTER the wait: if the executor was dropped
                    // meanwhile, this task belongs to a stale generation and must not run.
                    if self.wait_checkpoint_rx.is_closed() {
                        // The task must not be committed since the associated epoch may not succeed.
                        // The wait_checkpoint_rx lifetime is tied to the lifecycle of the source executor actor.
                        // The old actor must be dropped before any subsequent recovery can succeed; see PartialGraphState::abort_and_wait_actors
                        tracing::debug!(epoch = epoch.0, "Drop stale wait checkpoint task.");
                        break;
                    }
                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");

                            // Run task with callback to record LSN after successful commit
                            task.run_with_on_commit_success(
                                self.source_id,
                                &self.source_name,
                                |source_id: u64, offset| {
                                    // Report the committed Postgres LSN as a gauge, labeled by source id.
                                    if let Some(lsn_value) =
                                        extract_postgres_lsn_from_offset_str(offset)
                                    {
                                        self.metrics
                                            .pg_cdc_jni_commit_offset_lsn
                                            .with_guarded_label_values(&[&source_id.to_string()])
                                            .set(lsn_value as i64);
                                    }
                                    // Likewise for SQL Server commit LSNs (u128, narrowed for the gauge).
                                    if let Some(lsn_value) =
                                        extract_sql_server_commit_lsn_from_offset_str(offset)
                                    {
                                        self.metrics
                                            .sqlserver_cdc_jni_commit_offset_lsn
                                            .with_guarded_label_values(&[&source_id.to_string()])
                                            .set(lsn_u128_to_i64(lsn_value));
                                    }
                                },
                            )
                            .await;
                        }
                        Err(e) => {
                            // Waiting failed; skip this task but keep the worker alive
                            // for subsequent epochs.
                            tracing::error!(
                            error = %e.as_report(),
                            "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    // Sender side dropped without closing first: stop the worker.
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}
1274
1275#[cfg(test)]
1276mod tests {
1277    use maplit::{btreemap, convert_args, hashmap};
1278    use risingwave_common::catalog::{ColumnId, Field};
1279    use risingwave_common::id::SourceId;
1280    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
1281    use risingwave_common::test_prelude::StreamChunkTestExt;
1282    use risingwave_common::util::epoch::{EpochExt, test_epoch};
1283    use risingwave_connector::source::datagen::DatagenSplit;
1284    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
1285    use risingwave_pb::catalog::StreamSourceInfo;
1286    use risingwave_pb::plan_common::PbRowFormatType;
1287    use risingwave_storage::memory::MemoryStateStore;
1288    use tokio::sync::mpsc::unbounded_channel;
1289    use tracing_test::traced_test;
1290
1291    use super::*;
1292    use crate::executor::AddMutation;
1293    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
1294    use crate::task::LocalBarrierManager;
1295
    /// Source name used by the mocked sources in the tests below.
    const MOCK_SOURCE_NAME: &str = "mock_source";
1297
    /// Smoke test: boot a `SourceExecutor` on a datagen source, feed it an
    /// `Add` barrier carrying the initial split assignment, and verify the
    /// first emitted data chunk.
    #[tokio::test]
    async fn test_source_executor() {
        let source_id = 0.into();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        // No row-id column for this test source.
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // This datagen will generate 3 rows at one time.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        // Back the split state table with an in-memory state store.
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        // The `Add` mutation assigns a single datagen split to the test actor.
        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        executor.next().await.unwrap().unwrap();

        // Consume data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // Row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }
1383
    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        // Exercises the split-change protocol end to end:
        //  1. init with 1 of 3 datagen splits,
        //  2. `SourceChangeSplit` adds splits 1 and 2 and their state is
        //     persisted after a checkpoint barrier,
        //  3. pause/resume round-trip,
        //  4. a second `SourceChangeSplit` drops splits 0 and 1, and after the
        //     next checkpoint only split 2's state remains in the store.
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        // Kept cloneable so the test can later open independent handles onto
        // the same underlying state to inspect what the executor committed.
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        // Phase 1: initial assignment — split 0 of 3.
        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        // Batch subsequent messages so a single `next()` can drain whatever
        // mix of chunks/barriers is currently ready.
        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        // Phase 2: the full assignment now covers all three splits (0 is kept,
        // 1 and 2 are newly added).
        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        // A plain barrier to commit the split-change epoch to the state store.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        // Open a fresh handle onto the same in-memory store to observe the
        // state the executor has committed.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // there must exist state for new add partition
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        // Give the datagen reader time to produce from the new splits.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        // Phase 3: pause and then resume the source via barrier mutations.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // receive all
        ready_chunks.next().await.unwrap();

        // Phase 4: shrink the assignment to only split 2, dropping 0 and 1.
        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        // Commit the drop with another plain barrier.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        // Read committed state: dropped splits (0, 1) must be gone, the
        // retained split (2) must still have persisted state.
        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
1590}