risingwave_stream/executor/source/source_executor.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::types::JsonbVal;
28use risingwave_common::util::epoch::{Epoch, EpochPair};
29use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
30use risingwave_connector::source::cdc::split::extract_postgres_lsn_from_offset_str;
31use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
32use risingwave_connector::source::reader::reader::SourceReader;
33use risingwave_connector::source::{
34    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
35    StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
36};
37use risingwave_hummock_sdk::HummockReadEpoch;
38use risingwave_pb::common::ThrottleType;
39use risingwave_pb::id::SourceId;
40use risingwave_storage::store::TryWaitEpochOptions;
41use thiserror_ext::AsReport;
42use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
43use tokio::sync::{mpsc, oneshot};
44use tokio::time::Instant;
45
46use super::executor_core::StreamSourceCore;
47use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
48use crate::common::rate_limit::limited_chunk_size;
49use crate::executor::UpdateMutation;
50use crate::executor::prelude::*;
51use crate::executor::source::reader_stream::StreamReaderBuilder;
52use crate::executor::stream_reader::StreamReaderWithPause;
53use crate::task::LocalBarrierManager;
54
55/// A multiplier used when calculating the maximum time to wait for a barrier. The extra headroom
56/// accounts for network latency and processing cost in the meta service.
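/// For example, if the configured barrier interval were 1000 ms, data would be allowed to flow for
/// up to `5 * 1000 = 5000` ms without seeing a barrier before the source pauses itself.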
57pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
58
59pub struct SourceExecutor<S: StateStore> {
60    actor_ctx: ActorContextRef,
61
62    /// Core state of the streaming source for the external connector.
63    stream_source_core: StreamSourceCore<S>,
64
65    /// Metrics for monitoring.
66    metrics: Arc<StreamingMetrics>,
67
68    /// Receiver of barrier channel.
69    barrier_receiver: Option<UnboundedReceiver<Barrier>>,
70
71    /// System parameter reader to read barrier interval
72    system_params: SystemParamsReaderRef,
73
74    /// Rate limit in rows/s.
75    rate_limit_rps: Option<u32>,
76
77    is_shared_non_cdc: bool,
78
79    /// Local barrier manager, used to report source progress events (e.g. CDC source offset updates).
80    barrier_manager: LocalBarrierManager,
81}
82
83impl<S: StateStore> SourceExecutor<S> {
84    #[expect(clippy::too_many_arguments)]
85    pub fn new(
86        actor_ctx: ActorContextRef,
87        stream_source_core: StreamSourceCore<S>,
88        metrics: Arc<StreamingMetrics>,
89        barrier_receiver: UnboundedReceiver<Barrier>,
90        system_params: SystemParamsReaderRef,
91        rate_limit_rps: Option<u32>,
92        is_shared_non_cdc: bool,
93        barrier_manager: LocalBarrierManager,
94    ) -> Self {
95        Self {
96            actor_ctx,
97            stream_source_core,
98            metrics,
99            barrier_receiver: Some(barrier_receiver),
100            system_params,
101            rate_limit_rps,
102            is_shared_non_cdc,
103            barrier_manager,
104        }
105    }
106
107    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
108        StreamReaderBuilder {
109            source_desc,
110            rate_limit: self.rate_limit_rps,
111            source_id: self.stream_source_core.source_id,
112            source_name: self.stream_source_core.source_name.clone(),
113            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
114            actor_ctx: self.actor_ctx.clone(),
115            reader_stream: None,
116        }
117    }
118
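    /// Spawn a background `WaitCheckpointWorker` that waits for each checkpoint epoch to be
    /// committed in the state store and then runs the associated `WaitCheckpointTask`.
    /// Returns `None` if the source does not create such tasks.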
119    async fn spawn_wait_checkpoint_worker(
120        core: &StreamSourceCore<S>,
121        source_reader: SourceReader,
122        metrics: Arc<StreamingMetrics>,
123    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
124        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
125            return Ok(None);
126        };
127        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
128        let wait_checkpoint_worker = WaitCheckpointWorker {
129            wait_checkpoint_rx,
130            state_store: core.split_state_store.state_table().state_store().clone(),
131            table_id: core.split_state_store.state_table().table_id(),
132            metrics,
133        };
134        tokio::spawn(wait_checkpoint_worker.run());
135        Ok(Some(WaitCheckpointTaskBuilder {
136            wait_checkpoint_tx,
137            source_reader,
138            building_task: initial_task,
139        }))
140    }
141
142    /// Build the source column ids and the source context that will be used to build the source stream.
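    /// If auto schema change is enabled, this also spawns a task that receives
    /// `(SchemaChangeEnvelope, finish_tx)` pairs from the source parser, forwards each schema
    /// change to the meta service via the `auto_schema_change` RPC, and signals completion through
    /// `finish_tx` whether or not the RPC succeeds.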
143    pub fn prepare_source_stream_build(
144        &self,
145        source_desc: &SourceDesc,
146    ) -> (Vec<ColumnId>, SourceContext) {
147        let column_ids = source_desc
148            .columns
149            .iter()
150            .map(|column_desc| column_desc.column_id)
151            .collect_vec();
152
153        let (schema_change_tx, mut schema_change_rx) =
154            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
155        let schema_change_tx = if self.is_auto_schema_change_enable() {
156            let meta_client = self.actor_ctx.meta_client.clone();
157            // spawn a task to handle schema change event from source parser
158            let _join_handle = tokio::task::spawn(async move {
159                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
160                    let table_ids = schema_change.table_ids();
161                    tracing::info!(
162                        target: "auto_schema_change",
163                        "recv a schema change event for tables: {:?}", table_ids);
164                    // TODO: retry on rpc error
165                    if let Some(ref meta_client) = meta_client {
166                        match meta_client
167                            .auto_schema_change(schema_change.to_protobuf())
168                            .await
169                        {
170                            Ok(_) => {
171                                tracing::info!(
172                                    target: "auto_schema_change",
173                                    "schema change success for tables: {:?}", table_ids);
174                                finish_tx.send(()).unwrap();
175                            }
176                            Err(e) => {
177                                tracing::error!(
178                                    target: "auto_schema_change",
179                                    error = ?e.as_report(), "schema change error");
180                                finish_tx.send(()).unwrap();
181                            }
182                        }
183                    }
184                }
185            });
186            Some(schema_change_tx)
187        } else {
188            info!("auto schema change is disabled in config");
189            None
190        };
191        let source_ctx = SourceContext::new(
192            self.actor_ctx.id,
193            self.stream_source_core.source_id,
194            self.actor_ctx.fragment_id,
195            self.stream_source_core.source_name.clone(),
196            source_desc.metrics.clone(),
197            SourceCtrlOpts {
198                chunk_size: limited_chunk_size(self.rate_limit_rps),
199                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
200            },
201            source_desc.source.config.clone(),
202            schema_change_tx,
203        );
204
205        (column_ids, source_ctx)
206    }
207
208    fn is_auto_schema_change_enable(&self) -> bool {
209        self.actor_ctx.config.developer.enable_auto_schema_change
210    }
211
212    /// `source_id | source_name | actor_id | fragment_id`
213    #[inline]
214    fn get_metric_labels(&self) -> [String; 4] {
215        [
216            self.stream_source_core.source_id.to_string(),
217            self.stream_source_core.source_name.clone(),
218            self.actor_ctx.id.to_string(),
219            self.actor_ctx.fragment_id.to_string(),
220        ]
221    }
222
223    /// - `should_trim_state`: whether to trim state for dropped splits.
224    ///
225    ///   For scaling, the connector splits can be migrated to other actors, but
226    ///   won't be added or removed. Actors should not trim states for splits that
227    ///   are moved to other actors.
228    ///
229    ///   For source split change, split will not be migrated and we can trim states
230    ///   for deleted splits.
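    ///
    ///   For example, the `SourceChangeSplit` mutation applies this with `should_trim_state =
    ///   true`, while the `Update` mutation issued for scaling applies it with `should_trim_state = false`.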
231    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
232        &mut self,
233        barrier_epoch: EpochPair,
234        source_desc: &SourceDesc,
235        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
236        apply_mutation: ApplyMutationAfterBarrier<'_>,
237    ) -> StreamExecutorResult<()> {
238        {
239            let mut should_rebuild_stream = false;
240            match apply_mutation {
241                ApplyMutationAfterBarrier::SplitChange {
242                    target_splits,
243                    should_trim_state,
244                    split_change_count,
245                } => {
246                    split_change_count.inc();
247                    if self
248                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
249                        .await?
250                    {
251                        should_rebuild_stream = true;
252                    }
253                }
254                ApplyMutationAfterBarrier::ConnectorPropsChange => {
255                    should_rebuild_stream = true;
256                }
257            }
258
259            if should_rebuild_stream {
260                self.rebuild_stream_reader(source_desc, stream)?;
261            }
262        }
263
264        Ok(())
265    }
266
267    /// Returns `true` if split changed. Otherwise `false`.
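    ///
    /// Splits that this actor already owns keep their cached latest offsets; newly assigned splits
    /// are recovered from the committed state store when possible, falling back to their initial
    /// state. Splits missing from `target_splits` are treated as dropped.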
268    async fn update_state_if_changed(
269        &mut self,
270        barrier_epoch: EpochPair,
271        target_splits: Vec<SplitImpl>,
272        should_trim_state: bool,
273    ) -> StreamExecutorResult<bool> {
274        let core = &mut self.stream_source_core;
275
276        let target_splits: HashMap<_, _> = target_splits
277            .into_iter()
278            .map(|split| (split.id(), split))
279            .collect();
280
281        let mut target_state: HashMap<SplitId, SplitImpl> =
282            HashMap::with_capacity(target_splits.len());
283
284        let mut split_changed = false;
285
286        let committed_reader = core
287            .split_state_store
288            .new_committed_reader(barrier_epoch)
289            .await?;
290
291        // Checks added splits
292        for (split_id, split) in target_splits {
293            if let Some(s) = core.latest_split_info.get(&split_id) {
294                // For existing splits, we should use the latest offset from the cache.
295                // `target_splits` is from meta and contains the initial offset.
296                target_state.insert(split_id, s.clone());
297            } else {
298                split_changed = true;
299                // Write the newly assigned split to the state cache; the snapshot is based on the cache.
300
301                let initial_state = if let Some(recover_state) = committed_reader
302                    .try_recover_from_state_store(&split)
303                    .await?
304                {
305                    recover_state
306                } else {
307                    split
308                };
309
310                core.updated_splits_in_epoch
311                    .entry(split_id.clone())
312                    .or_insert_with(|| initial_state.clone());
313
314                target_state.insert(split_id, initial_state);
315            }
316        }
317
318        // Checks dropped splits
319        for existing_split_id in core.latest_split_info.keys() {
320            if !target_state.contains_key(existing_split_id) {
321                tracing::info!("split dropping detected: {}", existing_split_id);
322                split_changed = true;
323            }
324        }
325
326        if split_changed {
327            tracing::info!(
328                actor_id = %self.actor_ctx.id,
329                state = ?target_state,
330                "apply split change"
331            );
332
333            core.updated_splits_in_epoch
334                .retain(|split_id, _| target_state.contains_key(split_id));
335
336            let dropped_splits = core
337                .latest_split_info
338                .extract_if(|split_id, _| !target_state.contains_key(split_id))
339                .map(|(_, split)| split)
340                .collect_vec();
341
342            if should_trim_state && !dropped_splits.is_empty() {
343                // trim dropped splits' state
344                core.split_state_store.trim_state(&dropped_splits).await?;
345            }
346
347            core.latest_split_info = target_state;
348        }
349
350        Ok(split_changed)
351    }
352
353    /// Rebuild the stream when an error occurs in the stream.
354    fn rebuild_stream_reader_from_error<const BIASED: bool>(
355        &mut self,
356        source_desc: &SourceDesc,
357        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
358        e: StreamExecutorError,
359    ) -> StreamExecutorResult<()> {
360        let core = &mut self.stream_source_core;
361        tracing::error!(
362            error = ?e.as_report(),
363            actor_id = %self.actor_ctx.id,
364            source_id = %core.source_id,
365            "stream source reader error",
366        );
367        GLOBAL_ERROR_METRICS.user_source_error.report([
368            e.variant_name().to_owned(),
369            core.source_id.to_string(),
370            core.source_name.clone(),
371            self.actor_ctx.fragment_id.to_string(),
372        ]);
373
374        self.rebuild_stream_reader(source_desc, stream)
375    }
376
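    /// Rebuild the source reader from `latest_split_info` and swap it into the merged stream,
    /// leaving the barrier side untouched.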
377    fn rebuild_stream_reader<const BIASED: bool>(
378        &mut self,
379        source_desc: &SourceDesc,
380        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
381    ) -> StreamExecutorResult<()> {
382        let core = &mut self.stream_source_core;
383        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
384
385        tracing::info!(
386            "actor {:?} apply source split change to {:?}",
387            self.actor_ctx.id,
388            target_state
389        );
390
391        // Replace the source reader with a new one built from the new state.
392        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
393        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);
394
395        stream.replace_data_stream(reader_stream);
396
397        Ok(())
398    }
399
400    /// Handle `InjectSourceOffsets` mutation by updating split offsets in place.
401    /// Returns `true` if any offsets were injected and a stream rebuild is needed.
402    ///
403    /// This is an UNSAFE operation that can cause data duplication or loss
404    /// depending on the correctness of the provided offsets.
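    ///
    /// Each injected offset string is parsed as JSON before being persisted; if it is not valid
    /// JSON, it is wrapped as `{"offset": "<raw string>"}` so that plain scalar offsets can still be stored.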
405    async fn handle_inject_source_offsets(
406        &mut self,
407        split_offsets: &HashMap<String, String>,
408    ) -> StreamExecutorResult<bool> {
409        tracing::warn!(
410            actor_id = %self.actor_ctx.id,
411            source_id = %self.stream_source_core.source_id,
412            num_offsets = split_offsets.len(),
413            "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
414        );
415
416        // First, filter to only offsets for splits this actor owns
417        let offsets_to_inject: Vec<(String, String)> = {
418            let owned_splits: HashSet<&str> = self
419                .stream_source_core
420                .latest_split_info
421                .keys()
422                .map(|s| s.as_ref())
423                .collect();
424            split_offsets
425                .iter()
426                .filter(|(split_id, _)| owned_splits.contains(split_id.as_str()))
427                .map(|(k, v)| (k.clone(), v.clone()))
428                .collect()
429        };
430
431        // Update splits and prepare JSON states for persistence
432        let mut json_states: Vec<(String, JsonbVal)> = Vec::new();
433        let mut failed_splits: Vec<String> = Vec::new();
434
435        for (split_id, offset) in &offsets_to_inject {
436            if let Some(split) = self
437                .stream_source_core
438                .latest_split_info
439                .get_mut(split_id.as_str())
440            {
441                tracing::info!(
442                    actor_id = %self.actor_ctx.id,
443                    split_id = %split_id,
444                    offset = %offset,
445                    "Injecting offset for owned split"
446                );
447                // Update the split's offset in place
448                if let Err(e) = split.update_in_place(offset.clone()) {
449                    tracing::error!(
450                        actor_id = %self.actor_ctx.id,
451                        split_id = %split_id,
452                        error = ?e.as_report(),
453                        "Failed to update split offset"
454                    );
455                    failed_splits.push(split_id.clone());
456                    continue;
457                }
458                // Mark this split as updated for persistence
459                self.stream_source_core
460                    .updated_splits_in_epoch
461                    .insert(split_id.clone().into(), split.clone());
462                // Parse the offset as JSON and store it
463                let json_value: serde_json::Value = serde_json::from_str(offset)
464                    .unwrap_or_else(|_| serde_json::json!({ "offset": offset }));
465                json_states.push((split_id.clone(), JsonbVal::from(json_value)));
466            }
467        }
468
469        if !failed_splits.is_empty() {
470            return Err(StreamExecutorError::connector_error(anyhow!(
471                "failed to inject offsets for splits: {:?}",
472                failed_splits
473            )));
474        }
475
476        let num_injected = json_states.len();
477        if num_injected > 0 {
478            // Store the injected offsets as JSON in the state table
479            self.stream_source_core
480                .split_state_store
481                .set_states_json(json_states)
482                .await?;
483
484            tracing::info!(
485                actor_id = %self.actor_ctx.id,
486                source_id = %self.stream_source_core.source_id,
487                num_injected = num_injected,
488                "Offset injection completed for owned splits, triggering rebuild"
489            );
490            Ok(true)
491        } else {
492            tracing::info!(
493                actor_id = %self.actor_ctx.id,
494                source_id = %self.stream_source_core.source_id,
495                "No owned splits to inject offsets for"
496            );
497            Ok(false)
498        }
499    }
500
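    /// Persist the splits updated in the current epoch to the state table, commit the epoch, and
    /// return the per-epoch update cache before clearing it.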
501    async fn persist_state_and_clear_cache(
502        &mut self,
503        epoch: EpochPair,
504    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
505        let core = &mut self.stream_source_core;
506
507        let cache = core
508            .updated_splits_in_epoch
509            .values()
510            .map(|split_impl| split_impl.to_owned())
511            .collect_vec();
512
513        if !cache.is_empty() {
514            tracing::debug!(state = ?cache, "take snapshot");
515
516            // Record metrics for CDC sources before moving cache
517            let source_id = core.source_id.to_string();
518            for split_impl in &cache {
519                // Extract and record CDC-specific metrics based on split type
520                match split_impl {
521                    SplitImpl::PostgresCdc(pg_split) => {
522                        if let Some(lsn_value) = pg_split.pg_lsn() {
523                            self.metrics
524                                .pg_cdc_state_table_lsn
525                                .with_guarded_label_values(&[&source_id])
526                                .set(lsn_value as i64);
527                        }
528                    }
529                    SplitImpl::MysqlCdc(mysql_split) => {
530                        if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
531                            self.metrics
532                                .mysql_cdc_state_binlog_file_seq
533                                .with_guarded_label_values(&[&source_id])
534                                .set(file_seq as i64);
535
536                            self.metrics
537                                .mysql_cdc_state_binlog_position
538                                .with_guarded_label_values(&[&source_id])
539                                .set(position as i64);
540                        }
541                    }
542                    _ => {}
543                }
544            }
545
546            core.split_state_store.set_states(cache).await?;
547        }
548
549        // Commit anyway, even if no messages were saved.
550        core.split_state_store.commit(epoch).await?;
551
552        let updated_splits = core.updated_splits_in_epoch.clone();
553
554        core.updated_splits_in_epoch.clear();
555
556        Ok(updated_splits)
557    }
558
559    /// Try to spill the mem-table.
560    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
561        let core = &mut self.stream_source_core;
562        core.split_state_store.try_flush().await?;
563
564        Ok(())
565    }
566
567    /// Report that the CDC source offset has been updated, but only the first time.
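    ///
    /// `must_report_cdc_offset_once` gates the report to happen at most once per executor run;
    /// when `must_wait_cdc_offset_before_report` is set (MySQL and SQL Server CDC), the report is
    /// further deferred until at least one updated split carries a non-empty CDC offset.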
568    fn maybe_report_cdc_source_offset(
569        &self,
570        updated_splits: &HashMap<SplitId, SplitImpl>,
571        epoch: EpochPair,
572        source_id: SourceId,
573        must_report_cdc_offset_once: &mut bool,
574        must_wait_cdc_offset_before_report: bool,
575    ) {
576        // Report CDC source offset updated only the first time
577        if *must_report_cdc_offset_once
578            && (!must_wait_cdc_offset_before_report
579                || updated_splits
580                    .values()
581                    .any(|split| split.is_cdc_split() && !split.get_cdc_split_offset().is_empty()))
582        {
583            // Report only once, then ignore all subsequent offset updates
584            self.barrier_manager.report_cdc_source_offset_updated(
585                epoch,
586                self.actor_ctx.id,
587                source_id,
588            );
589            tracing::info!(
590                actor_id = %self.actor_ctx.id,
591                source_id = %source_id,
592                epoch = ?epoch,
593                "Reported CDC source offset updated to meta (first time only)"
594            );
595            // Mark as reported to prevent any future reports, even if offset changes
596            *must_report_cdc_offset_once = false;
597        }
598    }
599
600    /// A source executor with a stream source receives:
601    /// 1. Barrier messages
602    /// 2. Data from external source
603    /// and acts accordingly.
604    #[try_stream(ok = Message, error = StreamExecutorError)]
605    async fn execute_inner(mut self) {
606        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
607        let first_barrier = barrier_receiver
608            .recv()
609            .instrument_await("source_recv_first_barrier")
610            .await
611            .ok_or_else(|| {
612                anyhow!(
613                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
614                    self.actor_ctx.id,
615                    self.stream_source_core.source_id
616                )
617            })?;
618        let first_epoch = first_barrier.epoch;
619        // `must_report_cdc_offset_once` is true if and only if the source is a CDC source.
620        // `must_wait_cdc_offset_before_report` is true if and only if the source is a MySQL or SQL Server CDC source.
621        let (mut boot_state, mut must_report_cdc_offset_once, must_wait_cdc_offset_before_report) =
622            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
623                // CDC source must reach this branch.
624                tracing::debug!(?splits, "boot with splits");
625                // Skip report for non-CDC.
626                let must_report_cdc_offset_once = splits.iter().any(|split| split.is_cdc_split());
627                // Only for MySQL and SQL Server CDC, we need to wait for the offset to be non-empty before reporting.
628                let must_wait_cdc_offset_before_report = must_report_cdc_offset_once
629                    && splits.iter().any(|split| {
630                        matches!(split, SplitImpl::MysqlCdc(_) | SplitImpl::SqlServerCdc(_))
631                    });
632                (
633                    splits.to_vec(),
634                    must_report_cdc_offset_once,
635                    must_wait_cdc_offset_before_report,
636                )
637            } else {
638                (Vec::default(), true, true)
639            };
640        let is_pause_on_startup = first_barrier.is_pause_on_startup();
641        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);
642
643        yield Message::Barrier(first_barrier);
644
645        let mut core = self.stream_source_core;
646        let source_id = core.source_id;
647
648        // Build source description from the builder.
649        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
650        let mut source_desc = source_desc_builder
651            .build()
652            .map_err(StreamExecutorError::connector_error)?;
653
654        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
655            &core,
656            source_desc.source.clone(),
657            self.metrics.clone(),
658        )
659        .await?;
660
661        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
662            get_split_offset_col_idx(&source_desc.columns)
663        else {
664            unreachable!("Partition and offset columns must be set.");
665        };
666
667        core.split_state_store.init_epoch(first_epoch).await?;
668        {
669            let committed_reader = core
670                .split_state_store
671                .new_committed_reader(first_epoch)
672                .await?;
673            for ele in &mut boot_state {
674                if let Some(recover_state) =
675                    committed_reader.try_recover_from_state_store(ele).await?
676                {
677                    *ele = recover_state;
678                    // If the state store is non-empty, we consider the source initialized.
679                    is_uninitialized = false;
680                } else {
681                    // This is a new split that is not yet in the state table.
682                    // Make sure it is written to the state table later, so that even if it
683                    // receives no messages, we can still observe it in the state table.
684                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
685                }
686            }
687        }
688
689        // init in-memory split states with persisted state if any
690        core.init_split_state(boot_state.clone());
691
692        // Return the ownership of `stream_source_core` to the source executor.
693        self.stream_source_core = core;
694
695        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
696        tracing::debug!(state = ?recover_state, "start with state");
697
698        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
699        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
700        let mut latest_splits = None;
701        // Build the source stream reader.
702        if is_uninitialized {
703            let create_split_reader_result = reader_stream_builder
704                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
705                .await?;
706            latest_splits = create_split_reader_result.latest_splits;
707        }
708
709        if let Some(latest_splits) = latest_splits {
710            // Make sure the latest splits are written to the state table later, so that even if
711            // they receive no messages, we can still observe them in the state table.
712            self.stream_source_core
713                .updated_splits_in_epoch
714                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
715        }
716        // Merge the chunks from source and the barriers into a single stream. We prioritize
717        // barriers over source data chunks here.
718        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
719            barrier_stream,
720            reader_stream_builder
721                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
722        );
723        let mut command_paused = false;
724
725        // - If the first barrier requires us to pause on startup, pause the stream.
726        if is_pause_on_startup {
727            tracing::info!("source paused on startup");
728            stream.pause_stream();
729            command_paused = true;
730        }
731
732        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
733        // milliseconds, considering some other latencies like network and cost in Meta.
734        let mut max_wait_barrier_time_ms =
735            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
736        let mut last_barrier_time = Instant::now();
737        let mut self_paused = false;
738
739        let source_output_row_count = self
740            .metrics
741            .source_output_row_count
742            .with_guarded_label_values(&self.get_metric_labels());
743
744        let source_split_change_count = self
745            .metrics
746            .source_split_change_count
747            .with_guarded_label_values(&self.get_metric_labels());
748
749        while let Some(msg) = stream.next().await {
750            let Ok(msg) = msg else {
751                tokio::time::sleep(Duration::from_millis(1000)).await;
752                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
753                continue;
754            };
755
756            match msg {
757                // This branch will be preferred.
758                Either::Left(Message::Barrier(barrier)) => {
759                    last_barrier_time = Instant::now();
760
761                    if self_paused {
762                        self_paused = false;
763                        // command_paused has a higher priority.
764                        if !command_paused {
765                            stream.resume_stream();
766                        }
767                    }
768
769                    let epoch = barrier.epoch;
770                    let mut split_change = None;
771
772                    if let Some(mutation) = barrier.mutation.as_deref() {
773                        match mutation {
774                            Mutation::Pause => {
775                                command_paused = true;
776                                stream.pause_stream()
777                            }
778                            Mutation::Resume => {
779                                command_paused = false;
780                                stream.resume_stream()
781                            }
782                            Mutation::SourceChangeSplit(actor_splits) => {
783                                tracing::info!(
784                                    actor_id = %self.actor_ctx.id,
785                                    actor_splits = ?actor_splits,
786                                    "source change split received"
787                                );
788
789                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
790                                    |target_splits| {
791                                        (
792                                            &source_desc,
793                                            &mut stream,
794                                            ApplyMutationAfterBarrier::SplitChange {
795                                                target_splits,
796                                                should_trim_state: true,
797                                                split_change_count: &source_split_change_count,
798                                            },
799                                        )
800                                    },
801                                );
802                            }
803
804                            Mutation::ConnectorPropsChange(maybe_mutation) => {
805                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
806                                {
807                                    // rebuild the stream reader with new props
808                                    tracing::info!(
809                                        actor_id = %self.actor_ctx.id,
810                                        source_id = %source_id,
811                                        "updating source connector properties",
812                                    );
813                                    source_desc.update_reader(new_props.clone())?;
814                                    // We assume the connector props change does not involve a state change.
815                                    split_change = Some((
816                                        &source_desc,
817                                        &mut stream,
818                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
819                                    ));
820                                }
821                            }
822
823                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
824                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
825                                    |target_splits| {
826                                        (
827                                            &source_desc,
828                                            &mut stream,
829                                            ApplyMutationAfterBarrier::SplitChange {
830                                                target_splits,
831                                                should_trim_state: false,
832                                                split_change_count: &source_split_change_count,
833                                            },
834                                        )
835                                    },
836                                );
837                            }
838                            Mutation::Throttle(fragment_to_apply) => {
839                                if let Some(entry) =
840                                    fragment_to_apply.get(&self.actor_ctx.fragment_id)
841                                    && entry.throttle_type() == ThrottleType::Source
842                                    && entry.rate_limit != self.rate_limit_rps
843                                {
844                                    tracing::info!(
845                                        "updating rate limit from {:?} to {:?}",
846                                        self.rate_limit_rps,
847                                        entry.rate_limit
848                                    );
849                                    self.rate_limit_rps = entry.rate_limit;
850                                    // recreate from latest_split_info
851                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
852                                }
853                            }
854                            Mutation::ResetSource { source_id } => {
855                                // Note: RESET SOURCE only clears the offset, does NOT pause the source.
856                                // When offset is None, after recovery/restart, Debezium will automatically
857                                // enter recovery mode and fetch the latest offset from upstream.
858                                if *source_id == self.stream_source_core.source_id {
859                                    tracing::info!(
860                                        actor_id = %self.actor_ctx.id,
861                                        source_id = source_id.as_raw_id(),
862                                        "Resetting CDC source: clearing offset (set to None)"
863                                    );
864
865                                    // Step 1: Collect all current splits and clear their offsets
866                                    let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
867                                        .latest_split_info
868                                        .values()
869                                        .map(|split| {
870                                            // Clone the split and clear its offset
871                                            let mut new_split = split.clone();
872                                            match &mut new_split {
873                                                SplitImpl::MysqlCdc(debezium_split) => {
874                                                    if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
875                                                        tracing::info!(
876                                                            split_id = ?mysql_split.inner.split_id,
877                                                            old_offset = ?mysql_split.inner.start_offset,
878                                                            "Clearing MySQL CDC offset"
879                                                        );
880                                                        mysql_split.inner.start_offset = None;
881                                                    }
882                                                }
883                                                SplitImpl::PostgresCdc(debezium_split) => {
884                                                    if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
885                                                        tracing::info!(
886                                                            split_id = ?pg_split.inner.split_id,
887                                                            old_offset = ?pg_split.inner.start_offset,
888                                                            "Clearing PostgreSQL CDC offset"
889                                                        );
890                                                        pg_split.inner.start_offset = None;
891                                                    }
892                                                }
893                                                SplitImpl::MongodbCdc(debezium_split) => {
894                                                    if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
895                                                        tracing::info!(
896                                                            split_id = ?mongo_split.inner.split_id,
897                                                            old_offset = ?mongo_split.inner.start_offset,
898                                                            "Clearing MongoDB CDC offset"
899                                                        );
900                                                        mongo_split.inner.start_offset = None;
901                                                    }
902                                                }
903                                                SplitImpl::CitusCdc(debezium_split) => {
904                                                    if let Some(citus_split) = debezium_split.citus_split.as_mut() {
905                                                        tracing::info!(
906                                                            split_id = ?citus_split.inner.split_id,
907                                                            old_offset = ?citus_split.inner.start_offset,
908                                                            "Clearing Citus CDC offset"
909                                                        );
910                                                        citus_split.inner.start_offset = None;
911                                                    }
912                                                }
913                                                SplitImpl::SqlServerCdc(debezium_split) => {
914                                                    if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
915                                                        tracing::info!(
916                                                            split_id = ?sqlserver_split.inner.split_id,
917                                                            old_offset = ?sqlserver_split.inner.start_offset,
918                                                            "Clearing SQL Server CDC offset"
919                                                        );
920                                                        sqlserver_split.inner.start_offset = None;
921                                                    }
922                                                }
923                                                _ => {
924                                                    tracing::warn!(
925                                                        "RESET SOURCE called on non-CDC split type"
926                                                    );
927                                                }
928                                            }
929                                            new_split
930                                        })
931                                        .collect();
932
933                                    if !splits_with_cleared_offset.is_empty() {
934                                        tracing::info!(
935                                            actor_id = %self.actor_ctx.id,
936                                            split_count = splits_with_cleared_offset.len(),
937                                            "Updating state table with cleared offsets"
938                                        );
939
940                                        // Step 2: Write splits back to state table with offset = None
941                                        self.stream_source_core
942                                            .split_state_store
943                                            .set_states(splits_with_cleared_offset.clone())
944                                            .await?;
945
946                                        // Step 3: Update in-memory split info with cleared offsets
947                                        for split in splits_with_cleared_offset {
948                                            self.stream_source_core
949                                                .latest_split_info
950                                                .insert(split.id(), split.clone());
951                                            self.stream_source_core
952                                                .updated_splits_in_epoch
953                                                .insert(split.id(), split);
954                                        }
955
956                                        tracing::info!(
957                                            actor_id = %self.actor_ctx.id,
958                                            source_id = source_id.as_raw_id(),
959                                            "RESET SOURCE completed: offset cleared (set to None). \
960                                             Trigger recovery/restart to fetch latest offset from upstream."
961                                        );
962                                    } else {
963                                        tracing::warn!(
964                                            actor_id = %self.actor_ctx.id,
965                                            "No splits found to reset - source may not be initialized yet"
966                                        );
967                                    }
968                                } else {
969                                    tracing::debug!(
970                                        actor_id = %self.actor_ctx.id,
971                                        target_source_id = source_id.as_raw_id(),
972                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
973                                        "ResetSource mutation for different source, ignoring"
974                                    );
975                                }
976                            }
977
978                            Mutation::InjectSourceOffsets {
979                                source_id,
980                                split_offsets,
981                            } => {
982                                if *source_id == self.stream_source_core.source_id {
983                                    if self.handle_inject_source_offsets(split_offsets).await? {
984                                        // Trigger a rebuild to apply the new offsets
985                                        split_change = Some((
986                                            &source_desc,
987                                            &mut stream,
988                                            ApplyMutationAfterBarrier::ConnectorPropsChange,
989                                        ));
990                                    }
991                                } else {
992                                    tracing::debug!(
993                                        actor_id = %self.actor_ctx.id,
994                                        target_source_id = source_id.as_raw_id(),
995                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
996                                        "InjectSourceOffsets mutation for different source, ignoring"
997                                    );
998                                }
999                            }
1000
1001                            _ => {}
1002                        }
1003                    }
1004
1005                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;
1006
1007                    self.maybe_report_cdc_source_offset(
1008                        &updated_splits,
1009                        epoch,
1010                        source_id,
1011                        &mut must_report_cdc_offset_once,
1012                        must_wait_cdc_offset_before_report,
1013                    );
1014
1015                    // When handling a checkpoint barrier, spawn a task to wait for the epoch commit notification.
1016                    if barrier.kind.is_checkpoint()
1017                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
1018                    {
1019                        task_builder.update_task_on_checkpoint(updated_splits);
1020
1021                        tracing::debug!("epoch to wait {:?}", epoch);
1022                        task_builder.send(Epoch(epoch.prev)).await?
1023                    }
1024
1025                    let barrier_epoch = barrier.epoch;
1026                    yield Message::Barrier(barrier);
1027
1028                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
1029                        self.apply_split_change_after_yield_barrier(
1030                            barrier_epoch,
1031                            source_desc,
1032                            stream,
1033                            to_apply_mutation,
1034                        )
1035                        .await?;
1036                    }
1037                }
1038                Either::Left(_) => {
1039                    // For the source executor, the message we receive from this arm
1040                    // should always be barrier message.
1041                    unreachable!();
1042                }
1043
1044                Either::Right((chunk, latest_state)) => {
1045                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
1046                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1047                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
1048                            task_builder.update_task_on_chunk(
1049                                source_id,
1050                                &latest_state,
1051                                pulsar_message_id_col.clone(),
1052                            );
1053                        } else {
1054                            let offset_col = chunk.column_at(offset_idx);
1055                            task_builder.update_task_on_chunk(
1056                                source_id,
1057                                &latest_state,
1058                                offset_col.clone(),
1059                            );
1060                        }
1061                    }
1062                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
1063                        // The max wait time for a barrier has been exceeded, so the source
1064                        // will be paused. We can currently guarantee that the
1065                        // source is not already paused, since it has just received stream
1066                        // chunks.
1067                        self_paused = true;
1068                        tracing::warn!(
1069                            "source paused, wait barrier for {:?}",
1070                            last_barrier_time.elapsed()
1071                        );
1072                        stream.pause_stream();
1073
1074                        // Only update `max_wait_barrier_time_ms` to capture
1075                        // `barrier_interval_ms`
1076                        // changes here to avoid frequently accessing the shared
1077                        // `system_params`.
1078                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
1079                            as u128
1080                            * WAIT_BARRIER_MULTIPLE_TIMES;
1081                    }
1082
1083                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
1084                        if let Some(split_impl) =
1085                            self.stream_source_core.latest_split_info.get_mut(split_id)
1086                        {
1087                            *split_impl = new_split_impl.clone();
1088                        }
1089                    });
1090
1091                    self.stream_source_core
1092                        .updated_splits_in_epoch
1093                        .extend(latest_state);
1094
1095                    let card = chunk.cardinality();
1096                    if card == 0 {
1097                        continue;
1098                    }
1099                    source_output_row_count.inc_by(card as u64);
1100                    let to_remove_col_indices =
1101                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1102                            vec![split_idx, offset_idx, pulsar_message_id_idx]
1103                        } else {
1104                            vec![split_idx, offset_idx]
1105                        };
1106                    let chunk =
1107                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
1108                    yield Message::Chunk(chunk);
1109                    self.try_flush_data().await?;
1110                }
1111            }
1112        }
1113
1114        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
1115        tracing::error!(
1116            actor_id = %self.actor_ctx.id,
1117            "source executor exited unexpectedly"
1118        )
1119    }
1120}
1121
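/// The action to apply after the barrier carrying the corresponding mutation has been yielded
/// downstream: either a split change (optionally trimming state for dropped splits) or a plain
/// stream-reader rebuild triggered by a connector properties change.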
1122#[derive(Debug, Clone)]
1123enum ApplyMutationAfterBarrier<'a> {
1124    SplitChange {
1125        target_splits: Vec<SplitImpl>,
1126        should_trim_state: bool,
1127        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
1128    },
1129    ConnectorPropsChange,
1130}
1131
1132impl<S: StateStore> Execute for SourceExecutor<S> {
1133    fn execute(self: Box<Self>) -> BoxedMessageStream {
1134        self.execute_inner().boxed()
1135    }
1136}
1137
1138impl<S: StateStore> Debug for SourceExecutor<S> {
1139    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1140        f.debug_struct("SourceExecutor")
1141            .field("source_id", &self.stream_source_core.source_id)
1142            .field("column_ids", &self.stream_source_core.column_ids)
1143            .finish()
1144    }
1145}
1146
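/// Accumulates information from data chunks and checkpoints into the pending `WaitCheckpointTask`,
/// and ships the task to the `WaitCheckpointWorker` on each checkpoint barrier.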
1147struct WaitCheckpointTaskBuilder {
1148    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
1149    source_reader: SourceReader,
1150    building_task: WaitCheckpointTask,
1151}
1152
1153impl WaitCheckpointTaskBuilder {
1154    fn update_task_on_chunk(
1155        &mut self,
1156        source_id: SourceId,
1157        latest_state: &HashMap<SplitId, SplitImpl>,
1158        offset_col: ArrayRef,
1159    ) {
1160        match &mut self.building_task {
1161            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
1162                arrays.push(offset_col);
1163            }
1164            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
1165                arrays.push(offset_col);
1166            }
1167            WaitCheckpointTask::AckPulsarMessage(arrays) => {
1168                // each pulsar chunk will only contain one split
1169                let split_id = latest_state.keys().next().unwrap();
1170                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
1171                arrays.push((pulsar_ack_channel_id, offset_col));
1172            }
1173            WaitCheckpointTask::CommitCdcOffset(_) => {}
1174        }
1175    }
1176
1177    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
1178        #[expect(clippy::single_match)]
1179        match &mut self.building_task {
1180            WaitCheckpointTask::CommitCdcOffset(offsets) => {
1181                if !updated_splits.is_empty() {
1182                    // cdc source only has one split
1183                    assert_eq!(1, updated_splits.len());
1184                    for (split_id, split_impl) in updated_splits {
1185                        if split_impl.is_cdc_split() {
1186                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
1187                        } else {
1188                            unreachable!()
1189                        }
1190                    }
1191                }
1192            }
1193            _ => {}
1194        }
1195    }
1196
1197    /// Send the current building task to the worker and reset it to a freshly created one.
1198    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
1199        let new_task = self
1200            .source_reader
1201            .create_wait_checkpoint_task()
1202            .await?
1203            .expect("wait checkpoint task should be created");
1204        self.wait_checkpoint_tx
1205            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
1206            .expect("wait_checkpoint_tx send should succeed");
1207        Ok(())
1208    }
1209}
1210
1211/// A worker that runs deferred tasks (acks, offset commits) after each checkpoint epoch is committed.
1212///
1213/// # Use Cases
1214///
1215/// Typically there are two concerns related to acking on checkpoint:
1216///
1217/// 1. Correctness (at-least-once), i.e., don't let the upstream clean up data that has not
1218///    been committed yet. For message-queue semantics (delete after ack), we should ack to
1219///    avoid redelivery, but only ack after checkpoint to avoid data loss.
1220///
1221/// 2. Allowing the upstream to clean up data after it has been committed.
1222///
1223/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
1224///
1225/// ## CDC
1226///
1227/// Commit last consumed offset to upstream DB, so that old data can be discarded.
1228///
1229/// ## Google Pub/Sub
1230///
1231/// Ack because of message-queue semantics (messages are deleted after ack).
1232/// Although Pub/Sub supports `retain_acked_messages` and `seek`,
1233/// they are quite limited compared to Kafka's offset-based model.
1234///
1235/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
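///
/// # Sketch
///
/// An illustrative sketch of how [`WaitCheckpointTaskBuilder`] and this worker fit together.
/// The caller side (inside the source executor's stream loop) is inferred from the method
/// names and is simplified, not a verbatim copy of the real call sites:
///
/// ```ignore
/// // Executor side, using `WaitCheckpointTaskBuilder`:
/// builder.update_task_on_chunk(source_id, &latest_state, offset_col); // on every data chunk
/// builder.send(checkpoint_epoch).await?;                              // on every checkpoint barrier
///
/// // Worker side (this struct):
/// let (epoch, task) = wait_checkpoint_rx.recv().await.unwrap();
/// state_store
///     .try_wait_epoch(HummockReadEpoch::Committed(epoch.0), opts)
///     .await?; // block until the epoch is committed
/// task.run_with_on_commit_success(on_commit).await; // ack messages / commit offsets upstream
/// ```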
1236struct WaitCheckpointWorker<S: StateStore> {
1237    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
1238    state_store: S,
1239    table_id: TableId,
1240    metrics: Arc<StreamingMetrics>,
1241}
1242
1243impl<S: StateStore> WaitCheckpointWorker<S> {
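    /// Receive `(epoch, task)` pairs and run each task after the corresponding checkpoint epoch
    /// has been committed to the state store.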
1244    pub async fn run(mut self) {
1245        tracing::debug!("wait epoch worker started");
1246        loop {
1247            // poll the rx and wait for the epoch commit
1248            match self.wait_checkpoint_rx.recv().await {
1249                Some((epoch, task)) => {
1250                    tracing::debug!("start waiting for epoch {}", epoch.0);
1251                    let ret = self
1252                        .state_store
1253                        .try_wait_epoch(
1254                            HummockReadEpoch::Committed(epoch.0),
1255                            TryWaitEpochOptions {
1256                                table_id: self.table_id,
1257                            },
1258                        )
1259                        .await;
1260
1261                    match ret {
1262                        Ok(()) => {
1263                            tracing::debug!(epoch = epoch.0, "wait epoch success");
1264
1265                            // Run task with callback to record LSN after successful commit
1266                            task.run_with_on_commit_success(|source_id: u64, offset| {
1267                                if let Some(lsn_value) =
1268                                    extract_postgres_lsn_from_offset_str(offset)
1269                                {
1270                                    self.metrics
1271                                        .pg_cdc_jni_commit_offset_lsn
1272                                        .with_guarded_label_values(&[&source_id.to_string()])
1273                                        .set(lsn_value as i64);
1274                                }
1275                            })
1276                            .await;
1277                        }
1278                        Err(e) => {
1279                            tracing::error!(
1280                                error = %e.as_report(),
1281                                "wait epoch {} failed", epoch.0
1282                            );
1283                        }
1284                    }
1285                }
1286                None => {
1287                    tracing::error!("wait epoch rx closed");
1288                    break;
1289                }
1290            }
1291        }
1292    }
1293}
1294
1295#[cfg(test)]
1296mod tests {
1297    use maplit::{btreemap, convert_args, hashmap};
1298    use risingwave_common::catalog::{ColumnId, Field};
1299    use risingwave_common::id::SourceId;
1300    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
1301    use risingwave_common::test_prelude::StreamChunkTestExt;
1302    use risingwave_common::util::epoch::{EpochExt, test_epoch};
1303    use risingwave_connector::source::datagen::DatagenSplit;
1304    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
1305    use risingwave_pb::catalog::StreamSourceInfo;
1306    use risingwave_pb::plan_common::PbRowFormatType;
1307    use risingwave_storage::memory::MemoryStateStore;
1308    use tokio::sync::mpsc::unbounded_channel;
1309    use tracing_test::traced_test;
1310
1311    use super::*;
1312    use crate::executor::AddMutation;
1313    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
1314    use crate::task::LocalBarrierManager;
1315
1316    const MOCK_SOURCE_NAME: &str = "mock_source";
1317
1318    #[tokio::test]
1319    async fn test_source_executor() {
1320        let source_id = 0.into();
1321        let schema = Schema {
1322            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
1323        };
1324        let row_id_index = None;
1325        let source_info = StreamSourceInfo {
1326            row_format: PbRowFormatType::Native as i32,
1327            ..Default::default()
1328        };
1329        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1330        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();
1331
1332        // With `datagen.rows.per.second` set to 3 below, this datagen source generates 3 rows at a time.
1333        let properties = convert_args!(btreemap!(
1334            "connector" => "datagen",
1335            "datagen.rows.per.second" => "3",
1336            "fields.sequence_int.kind" => "sequence",
1337            "fields.sequence_int.start" => "11",
1338            "fields.sequence_int.end" => "11111",
1339        ));
1340        let source_desc_builder =
1341            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1342        let split_state_store = SourceStateTableHandler::from_table_catalog(
1343            &default_source_internal_table(0x2333),
1344            MemoryStateStore::new(),
1345        )
1346        .await;
1347        let core = StreamSourceCore::<MemoryStateStore> {
1348            source_id,
1349            column_ids,
1350            source_desc_builder: Some(source_desc_builder),
1351            latest_split_info: HashMap::new(),
1352            split_state_store,
1353            updated_splits_in_epoch: HashMap::new(),
1354            source_name: MOCK_SOURCE_NAME.to_owned(),
1355        };
1356
1357        let system_params_manager = LocalSystemParamsManager::for_test();
1358
1359        let executor = SourceExecutor::new(
1360            ActorContext::for_test(0),
1361            core,
1362            Arc::new(StreamingMetrics::unused()),
1363            barrier_rx,
1364            system_params_manager.get_params(),
1365            None,
1366            false,
1367            LocalBarrierManager::for_test(),
1368        );
1369        let mut executor = executor.boxed().execute();
1370
1371        let init_barrier =
1372            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
1373                splits: hashmap! {
1374                    ActorId::default() => vec![
1375                        SplitImpl::Datagen(DatagenSplit {
1376                            split_index: 0,
1377                            split_num: 1,
1378                            start_offset: None,
1379                        }),
1380                    ],
1381                },
1382                ..Default::default()
1383            }));
1384        barrier_tx.send(init_barrier).unwrap();
1385
1386        // Consume barrier.
1387        executor.next().await.unwrap().unwrap();
1388
1389        // Consume data chunk.
1390        let msg = executor.next().await.unwrap().unwrap();
1391
1392        // Row id will not be filled here.
1393        assert_eq!(
1394            msg.into_chunk().unwrap(),
1395            StreamChunk::from_pretty(
1396                " i
1397                + 11
1398                + 12
1399                + 13"
1400            )
1401        );
1402    }
1403
1404    #[traced_test]
1405    #[tokio::test]
1406    async fn test_split_change_mutation() {
1407        let source_id = SourceId::new(0);
1408        let schema = Schema {
1409            fields: vec![Field::with_name(DataType::Int32, "v1")],
1410        };
1411        let row_id_index = None;
1412        let source_info = StreamSourceInfo {
1413            row_format: PbRowFormatType::Native as i32,
1414            ..Default::default()
1415        };
1416        let properties = convert_args!(btreemap!(
1417            "connector" => "datagen",
1418            "fields.v1.kind" => "sequence",
1419            "fields.v1.start" => "11",
1420            "fields.v1.end" => "11111",
1421        ));
1422
1423        let source_desc_builder =
1424            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1425        let mem_state_store = MemoryStateStore::new();
1426
1427        let column_ids = vec![ColumnId::from(0)];
1428        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1429        let split_state_store = SourceStateTableHandler::from_table_catalog(
1430            &default_source_internal_table(0x2333),
1431            mem_state_store.clone(),
1432        )
1433        .await;
1434
1435        let core = StreamSourceCore::<MemoryStateStore> {
1436            source_id,
1437            column_ids: column_ids.clone(),
1438            source_desc_builder: Some(source_desc_builder),
1439            latest_split_info: HashMap::new(),
1440            split_state_store,
1441            updated_splits_in_epoch: HashMap::new(),
1442            source_name: MOCK_SOURCE_NAME.to_owned(),
1443        };
1444
1445        let system_params_manager = LocalSystemParamsManager::for_test();
1446
1447        let executor = SourceExecutor::new(
1448            ActorContext::for_test(0),
1449            core,
1450            Arc::new(StreamingMetrics::unused()),
1451            barrier_rx,
1452            system_params_manager.get_params(),
1453            None,
1454            false,
1455            LocalBarrierManager::for_test(),
1456        );
1457        let mut handler = executor.boxed().execute();
1458
1459        let mut epoch = test_epoch(1);
1460        let init_barrier =
1461            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
1462                splits: hashmap! {
1463                    ActorId::default() => vec![
1464                        SplitImpl::Datagen(DatagenSplit {
1465                            split_index: 0,
1466                            split_num: 3,
1467                            start_offset: None,
1468                        }),
1469                    ],
1470                },
1471                ..Default::default()
1472            }));
1473        barrier_tx.send(init_barrier).unwrap();
1474
1475        // Consume barrier.
1476        handler
1477            .next()
1478            .await
1479            .unwrap()
1480            .unwrap()
1481            .into_barrier()
1482            .unwrap();
1483
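        // Batch up messages that are already ready (barriers and chunks) so the test can drain
        // them without awaiting each item individually.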
1484        let mut ready_chunks = handler.ready_chunks(10);
1485
1486        let _ = ready_chunks.next().await.unwrap();
1487
1488        let new_assignment = vec![
1489            SplitImpl::Datagen(DatagenSplit {
1490                split_index: 0,
1491                split_num: 3,
1492                start_offset: None,
1493            }),
1494            SplitImpl::Datagen(DatagenSplit {
1495                split_index: 1,
1496                split_num: 3,
1497                start_offset: None,
1498            }),
1499            SplitImpl::Datagen(DatagenSplit {
1500                split_index: 2,
1501                split_num: 3,
1502                start_offset: None,
1503            }),
1504        ];
1505
1506        epoch.inc_epoch();
1507        let change_split_mutation =
1508            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
1509                ActorId::default() => new_assignment.clone()
1510            }));
1511
1512        barrier_tx.send(change_split_mutation).unwrap();
1513
1514        let _ = ready_chunks.next().await.unwrap(); // barrier
1515
1516        epoch.inc_epoch();
1517        let barrier = Barrier::new_test_barrier(epoch);
1518        barrier_tx.send(barrier).unwrap();
1519
1520        ready_chunks.next().await.unwrap(); // barrier
1521
1522        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
1523            &default_source_internal_table(0x2333),
1524            mem_state_store.clone(),
1525        )
1526        .await;
1527
1528        // state must exist for the newly added partition
1529        source_state_handler
1530            .init_epoch(EpochPair::new_test_epoch(epoch))
1531            .await
1532            .unwrap();
1533        source_state_handler
1534            .get(&new_assignment[1].id())
1535            .await
1536            .unwrap()
1537            .unwrap();
1538
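        // Give the readers of the newly added splits a moment to produce data before polling again.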
1539        tokio::time::sleep(Duration::from_millis(100)).await;
1540
1541        let _ = ready_chunks.next().await.unwrap();
1542
1543        epoch.inc_epoch();
1544        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
1545        barrier_tx.send(barrier).unwrap();
1546
1547        epoch.inc_epoch();
1548        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
1549        barrier_tx.send(barrier).unwrap();
1550
1551        // receive all
1552        ready_chunks.next().await.unwrap();
1553
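        // Shrink the assignment to only the last split; the states of the dropped splits are
        // expected to be trimmed from the state table (checked below).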
1554        let prev_assignment = new_assignment;
1555        let new_assignment = vec![prev_assignment[2].clone()];
1556
1557        epoch.inc_epoch();
1558        let drop_split_mutation =
1559            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
1560                ActorId::default() => new_assignment.clone()
1561            }));
1562
1563        barrier_tx.send(drop_split_mutation).unwrap();
1564
1565        ready_chunks.next().await.unwrap(); // barrier
1566
1567        epoch.inc_epoch();
1568        let barrier = Barrier::new_test_barrier(epoch);
1569        barrier_tx.send(barrier).unwrap();
1570
1571        ready_chunks.next().await.unwrap(); // barrier
1572
1573        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
1574            &default_source_internal_table(0x2333),
1575            mem_state_store.clone(),
1576        )
1577        .await;
1578
1579        let new_epoch = EpochPair::new_test_epoch(epoch);
1580        source_state_handler.init_epoch(new_epoch).await.unwrap();
1581
1582        let committed_reader = source_state_handler
1583            .new_committed_reader(new_epoch)
1584            .await
1585            .unwrap();
1586        assert!(
1587            committed_reader
1588                .try_recover_from_state_store(&prev_assignment[0])
1589                .await
1590                .unwrap()
1591                .is_none()
1592        );
1593
1594        assert!(
1595            committed_reader
1596                .try_recover_from_state_store(&prev_assignment[1])
1597                .await
1598                .unwrap()
1599                .is_none()
1600        );
1601
1602        assert!(
1603            committed_reader
1604                .try_recover_from_state_store(&prev_assignment[2])
1605                .await
1606                .unwrap()
1607                .is_some()
1608        );
1609    }
1610}