risingwave_stream/executor/source/
source_executor.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::types::JsonbVal;
28use risingwave_common::util::epoch::{Epoch, EpochPair};
29use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
30use risingwave_connector::source::cdc::split::{
31    extract_postgres_lsn_from_offset_str, extract_sql_server_commit_lsn_from_offset_str,
32};
33use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
34use risingwave_connector::source::reader::reader::SourceReader;
35use risingwave_connector::source::{
36    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
37    StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
38};
39use risingwave_hummock_sdk::HummockReadEpoch;
40use risingwave_pb::common::ThrottleType;
41use risingwave_pb::id::SourceId;
42use risingwave_storage::store::TryWaitEpochOptions;
43use thiserror_ext::AsReport;
44use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
45use tokio::sync::{mpsc, oneshot};
46use tokio::time::Instant;
47
48use super::executor_core::StreamSourceCore;
49use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
50use crate::common::rate_limit::limited_chunk_size;
51use crate::executor::UpdateMutation;
52use crate::executor::prelude::*;
53use crate::executor::source::reader_stream::StreamReaderBuilder;
54use crate::executor::stream_reader::StreamReaderWithPause;
55use crate::task::LocalBarrierManager;
56
/// Multiplier applied to the configured barrier interval when computing the
/// maximum time the source may emit data without observing a barrier. The
/// slack accounts for latencies in the network and processing cost in meta.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
60
/// Saturating conversion of a `u128` LSN into `i64` range for metrics
/// reporting: values above `i64::MAX` clamp to `i64::MAX`.
fn lsn_u128_to_i64(lsn: u128) -> i64 {
    i64::try_from(lsn).unwrap_or(i64::MAX)
}
64
/// Executor that reads from an external (non-backfill) streaming source,
/// maintaining per-split offsets in a state table and reacting to barrier
/// mutations (split changes, pause/resume, throttling, offset injection).
pub struct SourceExecutor<S: StateStore> {
    /// Actor context (ids, config, meta client) shared with the runtime.
    actor_ctx: ActorContextRef,

    /// Streaming source for external
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitor.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read barrier interval
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    /// Whether this is a shared source that is not CDC; used when the actor is
    /// newly created to fetch the latest splits before building the reader
    /// (see `execute_inner`). NOTE(review): exact sharing semantics defined
    /// elsewhere — confirm against the planner.
    is_shared_non_cdc: bool,

    /// Local barrier manager for reporting source load finished events
    barrier_manager: LocalBarrierManager,
}
88
89impl<S: StateStore> SourceExecutor<S> {
90    #[expect(clippy::too_many_arguments)]
91    pub fn new(
92        actor_ctx: ActorContextRef,
93        stream_source_core: StreamSourceCore<S>,
94        metrics: Arc<StreamingMetrics>,
95        barrier_receiver: UnboundedReceiver<Barrier>,
96        system_params: SystemParamsReaderRef,
97        rate_limit_rps: Option<u32>,
98        is_shared_non_cdc: bool,
99        barrier_manager: LocalBarrierManager,
100    ) -> Self {
101        Self {
102            actor_ctx,
103            stream_source_core,
104            metrics,
105            barrier_receiver: Some(barrier_receiver),
106            system_params,
107            rate_limit_rps,
108            is_shared_non_cdc,
109            barrier_manager,
110        }
111    }
112
113    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
114        StreamReaderBuilder {
115            source_desc,
116            rate_limit: self.rate_limit_rps,
117            source_id: self.stream_source_core.source_id,
118            source_name: self.stream_source_core.source_name.clone(),
119            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
120            actor_ctx: self.actor_ctx.clone(),
121            reader_stream: None,
122        }
123    }
124
125    async fn spawn_wait_checkpoint_worker(
126        core: &StreamSourceCore<S>,
127        source_reader: SourceReader,
128        metrics: Arc<StreamingMetrics>,
129    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
130        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
131            return Ok(None);
132        };
133        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
134        let wait_checkpoint_worker = WaitCheckpointWorker {
135            wait_checkpoint_rx,
136            state_store: core.split_state_store.state_table().state_store().clone(),
137            table_id: core.split_state_store.state_table().table_id(),
138            metrics,
139        };
140        tokio::spawn(wait_checkpoint_worker.run());
141        Ok(Some(WaitCheckpointTaskBuilder {
142            wait_checkpoint_tx,
143            source_reader,
144            building_task: initial_task,
145        }))
146    }
147
148    /// build the source column ids and the source context which will be used to build the source stream
149    pub fn prepare_source_stream_build(
150        &self,
151        source_desc: &SourceDesc,
152    ) -> (Vec<ColumnId>, SourceContext) {
153        let column_ids = source_desc
154            .columns
155            .iter()
156            .map(|column_desc| column_desc.column_id)
157            .collect_vec();
158
159        let (schema_change_tx, mut schema_change_rx) =
160            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
161        let schema_change_tx = if self.is_auto_schema_change_enable() {
162            let meta_client = self.actor_ctx.meta_client.clone();
163            // spawn a task to handle schema change event from source parser
164            let _join_handle = tokio::task::spawn(async move {
165                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
166                    let table_ids = schema_change.table_ids();
167                    tracing::info!(
168                        target: "auto_schema_change",
169                        "recv a schema change event for tables: {:?}", table_ids);
170                    // TODO: retry on rpc error
171                    if let Some(ref meta_client) = meta_client {
172                        match meta_client
173                            .auto_schema_change(schema_change.to_protobuf())
174                            .await
175                        {
176                            Ok(_) => {
177                                tracing::info!(
178                                    target: "auto_schema_change",
179                                    "schema change success for tables: {:?}", table_ids);
180                                finish_tx.send(()).unwrap();
181                            }
182                            Err(e) => {
183                                tracing::error!(
184                                    target: "auto_schema_change",
185                                    error = ?e.as_report(), "schema change error");
186                                finish_tx.send(()).unwrap();
187                            }
188                        }
189                    }
190                }
191            });
192            Some(schema_change_tx)
193        } else {
194            info!("auto schema change is disabled in config");
195            None
196        };
197        let source_ctx = SourceContext::new(
198            self.actor_ctx.id,
199            self.stream_source_core.source_id,
200            self.actor_ctx.fragment_id,
201            self.stream_source_core.source_name.clone(),
202            source_desc.metrics.clone(),
203            SourceCtrlOpts {
204                chunk_size: limited_chunk_size(self.rate_limit_rps),
205                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
206            },
207            source_desc.source.config.clone(),
208            schema_change_tx,
209        );
210
211        (column_ids, source_ctx)
212    }
213
    /// Whether auto schema change is enabled in this actor's developer config.
    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx.config.developer.enable_auto_schema_change
    }
217
218    /// `source_id | source_name | actor_id | fragment_id`
219    #[inline]
220    fn get_metric_labels(&self) -> [String; 4] {
221        [
222            self.stream_source_core.source_id.to_string(),
223            self.stream_source_core.source_name.clone(),
224            self.actor_ctx.id.to_string(),
225            self.actor_ctx.fragment_id.to_string(),
226        ]
227    }
228
229    /// - `should_trim_state`: whether to trim state for dropped splits.
230    ///
231    ///   For scaling, the connector splits can be migrated to other actors, but
232    ///   won't be added or removed. Actors should not trim states for splits that
233    ///   are moved to other actors.
234    ///
235    ///   For source split change, split will not be migrated and we can trim states
236    ///   for deleted splits.
237    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
238        &mut self,
239        barrier_epoch: EpochPair,
240        source_desc: &SourceDesc,
241        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
242        apply_mutation: ApplyMutationAfterBarrier<'_>,
243    ) -> StreamExecutorResult<()> {
244        {
245            let mut should_rebuild_stream = false;
246            match apply_mutation {
247                ApplyMutationAfterBarrier::SplitChange {
248                    target_splits,
249                    should_trim_state,
250                    split_change_count,
251                } => {
252                    split_change_count.inc();
253                    if self
254                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
255                        .await?
256                    {
257                        should_rebuild_stream = true;
258                    }
259                }
260                ApplyMutationAfterBarrier::ConnectorPropsChange => {
261                    should_rebuild_stream = true;
262                }
263            }
264
265            if should_rebuild_stream {
266                self.rebuild_stream_reader(source_desc, stream)?;
267            }
268        }
269
270        Ok(())
271    }
272
    /// Reconciles the actor's split state with a new assignment from meta.
    /// Returns `true` if split changed (splits added or dropped). Otherwise `false`.
    ///
    /// New splits are initialized from state committed at `barrier_epoch` when a
    /// persisted offset exists, otherwise from the assignment's initial offset.
    /// Dropped splits are removed from the in-memory cache and, when
    /// `should_trim_state` is set, trimmed from the state table as well.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        // Index the assignment by split id for O(1) lookups below.
        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        // Reader over the state committed at `barrier_epoch`, used to recover
        // offsets for splits this actor has not seen in memory.
        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Checks added splits
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // write new assigned split to state cache. snapshot is base on cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                // Ensure the new split is persisted at the next checkpoint even
                // if it receives no messages before then.
                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Checks dropped splits
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            // Drop pending epoch updates for splits no longer assigned to us.
            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // trim dropped splits' state
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }
358
359    /// Rebuild stream if there is a err in stream
360    fn rebuild_stream_reader_from_error<const BIASED: bool>(
361        &mut self,
362        source_desc: &SourceDesc,
363        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
364        e: StreamExecutorError,
365    ) -> StreamExecutorResult<()> {
366        let core = &mut self.stream_source_core;
367        tracing::error!(
368            error = ?e.as_report(),
369            actor_id = %self.actor_ctx.id,
370            source_id = %core.source_id,
371            "stream source reader error",
372        );
373        GLOBAL_ERROR_METRICS.user_source_error.report([
374            e.variant_name().to_owned(),
375            core.source_id.to_string(),
376            core.source_name.clone(),
377            self.actor_ctx.fragment_id.to_string(),
378        ]);
379
380        self.rebuild_stream_reader(source_desc, stream)
381    }
382
383    fn rebuild_stream_reader<const BIASED: bool>(
384        &mut self,
385        source_desc: &SourceDesc,
386        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
387    ) -> StreamExecutorResult<()> {
388        let core = &mut self.stream_source_core;
389        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
390
391        tracing::info!(
392            "actor {:?} apply source split change to {:?}",
393            self.actor_ctx.id,
394            target_state
395        );
396
397        // Replace the source reader with a new one of the new state.
398        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
399        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);
400
401        stream.replace_data_stream(reader_stream);
402
403        Ok(())
404    }
405
    /// Handle `InjectSourceOffsets` mutation by updating split offsets in place.
    /// Returns `true` if any offsets were injected and a stream rebuild is needed.
    ///
    /// This is an UNSAFE operation that can cause data duplication or loss
    /// depending on the correctness of the provided offsets.
    ///
    /// Offsets for splits this actor does not own are silently ignored; only
    /// owned splits are mutated and persisted.
    async fn handle_inject_source_offsets(
        &mut self,
        split_offsets: &HashMap<String, String>,
    ) -> StreamExecutorResult<bool> {
        tracing::warn!(
            actor_id = %self.actor_ctx.id,
            source_id = %self.stream_source_core.source_id,
            num_offsets = split_offsets.len(),
            "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
        );

        // First, filter to only offsets for splits this actor owns
        let offsets_to_inject: Vec<(String, String)> = {
            let owned_splits: HashSet<&str> = self
                .stream_source_core
                .latest_split_info
                .keys()
                .map(|s| s.as_ref())
                .collect();
            split_offsets
                .iter()
                .filter(|(split_id, _)| owned_splits.contains(split_id.as_str()))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect()
        };

        // Update splits and prepare JSON states for persistence
        let mut json_states: Vec<(String, JsonbVal)> = Vec::new();
        let mut failed_splits: Vec<String> = Vec::new();

        // NOTE(review): updates are applied split-by-split; if a later split
        // fails, earlier splits have already been mutated in `latest_split_info`
        // and `updated_splits_in_epoch` even though we return `Err` below —
        // confirm this partial-update behavior is intended.
        for (split_id, offset) in &offsets_to_inject {
            if let Some(split) = self
                .stream_source_core
                .latest_split_info
                .get_mut(split_id.as_str())
            {
                tracing::info!(
                    actor_id = %self.actor_ctx.id,
                    split_id = %split_id,
                    offset = %offset,
                    "Injecting offset for owned split"
                );
                // Update the split's offset in place
                if let Err(e) = split.update_in_place(offset.clone()) {
                    tracing::error!(
                        actor_id = %self.actor_ctx.id,
                        split_id = %split_id,
                        error = ?e.as_report(),
                        "Failed to update split offset"
                    );
                    failed_splits.push(split_id.clone());
                    continue;
                }
                // Mark this split as updated for persistence
                self.stream_source_core
                    .updated_splits_in_epoch
                    .insert(split_id.clone().into(), split.clone());
                // Parse the offset as JSON and store it; non-JSON offsets are
                // wrapped as `{"offset": <raw string>}`.
                let json_value: serde_json::Value = serde_json::from_str(offset)
                    .unwrap_or_else(|_| serde_json::json!({ "offset": offset }));
                json_states.push((split_id.clone(), JsonbVal::from(json_value)));
            }
        }

        // Fail the whole mutation if any owned split rejected its new offset.
        if !failed_splits.is_empty() {
            return Err(StreamExecutorError::connector_error(anyhow!(
                "failed to inject offsets for splits: {:?}",
                failed_splits
            )));
        }

        let num_injected = json_states.len();
        if num_injected > 0 {
            // Store the injected offsets as JSON in the state table
            self.stream_source_core
                .split_state_store
                .set_states_json(json_states)
                .await?;

            tracing::info!(
                actor_id = %self.actor_ctx.id,
                source_id = %self.stream_source_core.source_id,
                num_injected = num_injected,
                "Offset injection completed for owned splits, triggering rebuild"
            );
            Ok(true)
        } else {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                source_id = %self.stream_source_core.source_id,
                "No owned splits to inject offsets for"
            );
            Ok(false)
        }
    }
506
    /// Persists the splits updated in this epoch to the state table, commits
    /// the epoch, and returns (then clears) the per-epoch update cache.
    ///
    /// For CDC splits, also publishes connector-specific offset gauges
    /// (Postgres LSN, MySQL binlog file/position, SQL Server change/commit LSN)
    /// before persisting.
    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");

            // Record metrics for CDC sources before moving cache
            let source_id = core.source_id.to_string();
            for split_impl in &cache {
                // Extract and record CDC-specific metrics based on split type
                match split_impl {
                    SplitImpl::PostgresCdc(pg_split) => {
                        if let Some(lsn_value) = pg_split.pg_lsn() {
                            // NOTE(review): `lsn_value as i64` wraps for values
                            // above i64::MAX, unlike the saturating
                            // `lsn_u128_to_i64` used for SQL Server below —
                            // confirm the pg LSN range makes this safe.
                            self.metrics
                                .pg_cdc_state_table_lsn
                                .with_guarded_label_values(&[&source_id])
                                .set(lsn_value as i64);
                        }
                    }
                    SplitImpl::MysqlCdc(mysql_split) => {
                        if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
                            self.metrics
                                .mysql_cdc_state_binlog_file_seq
                                .with_guarded_label_values(&[&source_id])
                                .set(file_seq as i64);

                            self.metrics
                                .mysql_cdc_state_binlog_position
                                .with_guarded_label_values(&[&source_id])
                                .set(position as i64);
                        }
                    }
                    SplitImpl::SqlServerCdc(sqlserver_split) => {
                        if let Some(lsn) = sqlserver_split.sql_server_change_lsn() {
                            self.metrics
                                .sqlserver_cdc_state_change_lsn
                                .with_guarded_label_values(&[&source_id])
                                .set(lsn_u128_to_i64(lsn));
                        }
                        if let Some(lsn) = sqlserver_split.sql_server_commit_lsn() {
                            self.metrics
                                .sqlserver_cdc_state_commit_lsn
                                .with_guarded_label_values(&[&source_id])
                                .set(lsn_u128_to_i64(lsn));
                        }
                    }
                    // Non-CDC splits carry no offset gauges.
                    _ => {}
                }
            }

            core.split_state_store.set_states(cache).await?;
        }

        // commit anyway, even if no message saved
        core.split_state_store.commit(epoch).await?;

        // Hand the epoch's updates back to the caller, then reset the cache
        // for the next epoch.
        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }
578
579    /// try mem table spill
580    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
581        let core = &mut self.stream_source_core;
582        core.split_state_store.try_flush().await?;
583
584        Ok(())
585    }
586
587    /// Report CDC source offset updated only the first time.
588    fn maybe_report_cdc_source_offset(
589        &self,
590        updated_splits: &HashMap<SplitId, SplitImpl>,
591        epoch: EpochPair,
592        source_id: SourceId,
593        must_report_cdc_offset_once: &mut bool,
594        must_wait_cdc_offset_before_report: bool,
595    ) {
596        // Report CDC source offset updated only the first time
597        if *must_report_cdc_offset_once
598            && (!must_wait_cdc_offset_before_report
599                || updated_splits
600                    .values()
601                    .any(|split| split.is_cdc_split() && !split.get_cdc_split_offset().is_empty()))
602        {
603            // Report only once, then ignore all subsequent offset updates
604            self.barrier_manager.report_cdc_source_offset_updated(
605                epoch,
606                self.actor_ctx.id,
607                source_id,
608            );
609            tracing::info!(
610                actor_id = %self.actor_ctx.id,
611                source_id = %source_id,
612                epoch = ?epoch,
613                "Reported CDC source offset updated to meta (first time only)"
614            );
615            // Mark as reported to prevent any future reports, even if offset changes
616            *must_report_cdc_offset_once = false;
617        }
618    }
619
620    /// A source executor with a stream source receives:
621    /// 1. Barrier messages
622    /// 2. Data from external source
623    /// and acts accordingly.
624    #[try_stream(ok = Message, error = StreamExecutorError)]
625    async fn execute_inner(mut self) {
626        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
627        let first_barrier = barrier_receiver
628            .recv()
629            .instrument_await("source_recv_first_barrier")
630            .await
631            .ok_or_else(|| {
632                anyhow!(
633                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
634                    self.actor_ctx.id,
635                    self.stream_source_core.source_id
636                )
637            })?;
638        let first_epoch = first_barrier.epoch;
639        // must_report_cdc_offset is true if and only if the source is a CDC source.
640        // must_wait_cdc_offset_before_report is true if and only if the source is a MySQL or SQL Server CDC source.
641        let (mut boot_state, mut must_report_cdc_offset_once, must_wait_cdc_offset_before_report) =
642            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
643                // CDC source must reach this branch.
644                tracing::debug!(?splits, "boot with splits");
645                // Skip report for non-CDC.
646                let must_report_cdc_offset_once = splits.iter().any(|split| split.is_cdc_split());
647                // Only for MySQL and SQL Server CDC, we need to wait for the offset to be non-empty before reporting.
648                let must_wait_cdc_offset_before_report = must_report_cdc_offset_once
649                    && splits.iter().any(|split| {
650                        matches!(split, SplitImpl::MysqlCdc(_) | SplitImpl::SqlServerCdc(_))
651                    });
652                (
653                    splits.to_vec(),
654                    must_report_cdc_offset_once,
655                    must_wait_cdc_offset_before_report,
656                )
657            } else {
658                (Vec::default(), true, true)
659            };
660        let is_pause_on_startup = first_barrier.is_pause_on_startup();
661        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);
662
663        yield Message::Barrier(first_barrier);
664
665        let mut core = self.stream_source_core;
666        let source_id = core.source_id;
667
668        // Build source description from the builder.
669        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
670        let mut source_desc = source_desc_builder
671            .build()
672            .map_err(StreamExecutorError::connector_error)?;
673
674        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
675            &core,
676            source_desc.source.clone(),
677            self.metrics.clone(),
678        )
679        .await?;
680
681        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
682            get_split_offset_col_idx(&source_desc.columns)
683        else {
684            unreachable!("Partition and offset columns must be set.");
685        };
686
687        core.split_state_store.init_epoch(first_epoch).await?;
688        {
689            let committed_reader = core
690                .split_state_store
691                .new_committed_reader(first_epoch)
692                .await?;
693            for ele in &mut boot_state {
694                if let Some(recover_state) =
695                    committed_reader.try_recover_from_state_store(ele).await?
696                {
697                    *ele = recover_state;
698                    // if state store is non-empty, we consider it's initialized.
699                    is_uninitialized = false;
700                } else {
701                    // This is a new split, not in state table.
702                    // make sure it is written to state table later.
703                    // Then even it receives no messages, we can observe it in state table.
704                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
705                }
706            }
707        }
708
709        // init in-memory split states with persisted state if any
710        core.init_split_state(boot_state.clone());
711
712        // Return the ownership of `stream_source_core` to the source executor.
713        self.stream_source_core = core;
714
715        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
716        tracing::debug!(state = ?recover_state, "start with state");
717
718        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
719        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
720        let mut latest_splits = None;
721        // Build the source stream reader.
722        if is_uninitialized {
723            let create_split_reader_result = reader_stream_builder
724                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
725                .await?;
726            latest_splits = create_split_reader_result.latest_splits;
727        }
728
729        if let Some(latest_splits) = latest_splits {
730            // make sure it is written to state table later.
731            // Then even it receives no messages, we can observe it in state table.
732            self.stream_source_core
733                .updated_splits_in_epoch
734                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
735        }
736        // Merge the chunks from source and the barriers into a single stream. We prioritize
737        // barriers over source data chunks here.
738        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
739            barrier_stream,
740            reader_stream_builder
741                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
742        );
743        let mut command_paused = false;
744
745        // - If the first barrier requires us to pause on startup, pause the stream.
746        if is_pause_on_startup {
747            tracing::info!("source paused on startup");
748            stream.pause_stream();
749            command_paused = true;
750        }
751
752        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
753        // milliseconds, considering some other latencies like network and cost in Meta.
754        let mut max_wait_barrier_time_ms =
755            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
756        let mut last_barrier_time = Instant::now();
757        let mut self_paused = false;
758
759        let source_output_row_count = self
760            .metrics
761            .source_output_row_count
762            .with_guarded_label_values(&self.get_metric_labels());
763
764        let source_split_change_count = self
765            .metrics
766            .source_split_change_count
767            .with_guarded_label_values(&self.get_metric_labels());
768
769        while let Some(msg) = stream.next().await {
770            let Ok(msg) = msg else {
771                tokio::time::sleep(Duration::from_millis(1000)).await;
772                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
773                continue;
774            };
775
776            match msg {
777                // This branch will be preferred.
778                Either::Left(Message::Barrier(barrier)) => {
779                    last_barrier_time = Instant::now();
780
781                    if self_paused {
782                        self_paused = false;
783                        // command_paused has a higher priority.
784                        if !command_paused {
785                            stream.resume_stream();
786                        }
787                    }
788
789                    let epoch = barrier.epoch;
790                    let mut split_change = None;
791
792                    if let Some(mutation) = barrier.mutation.as_deref() {
793                        match mutation {
794                            Mutation::Pause => {
795                                command_paused = true;
796                                stream.pause_stream()
797                            }
798                            Mutation::Resume => {
799                                command_paused = false;
800                                stream.resume_stream()
801                            }
802                            Mutation::SourceChangeSplit(actor_splits) => {
803                                tracing::info!(
804                                    actor_id = %self.actor_ctx.id,
805                                    actor_splits = ?actor_splits,
806                                    "source change split received"
807                                );
808
809                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
810                                    |target_splits| {
811                                        (
812                                            &source_desc,
813                                            &mut stream,
814                                            ApplyMutationAfterBarrier::SplitChange {
815                                                target_splits,
816                                                should_trim_state: true,
817                                                split_change_count: &source_split_change_count,
818                                            },
819                                        )
820                                    },
821                                );
822                            }
823
824                            Mutation::ConnectorPropsChange(maybe_mutation) => {
825                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
826                                {
827                                    // rebuild the stream reader with new props
828                                    tracing::info!(
829                                        actor_id = %self.actor_ctx.id,
830                                        source_id = %source_id,
831                                        "updating source connector properties",
832                                    );
833                                    source_desc.update_reader(new_props.clone())?;
834                                    // suppose the connector props change will not involve state change
835                                    split_change = Some((
836                                        &source_desc,
837                                        &mut stream,
838                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
839                                    ));
840                                }
841                            }
842
843                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
844                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
845                                    |target_splits| {
846                                        (
847                                            &source_desc,
848                                            &mut stream,
849                                            ApplyMutationAfterBarrier::SplitChange {
850                                                target_splits,
851                                                should_trim_state: false,
852                                                split_change_count: &source_split_change_count,
853                                            },
854                                        )
855                                    },
856                                );
857                            }
858                            Mutation::Throttle(fragment_to_apply) => {
859                                if let Some(entry) =
860                                    fragment_to_apply.get(&self.actor_ctx.fragment_id)
861                                    && entry.throttle_type() == ThrottleType::Source
862                                    && entry.rate_limit != self.rate_limit_rps
863                                {
864                                    tracing::info!(
865                                        "updating rate limit from {:?} to {:?}",
866                                        self.rate_limit_rps,
867                                        entry.rate_limit
868                                    );
869                                    self.rate_limit_rps = entry.rate_limit;
870                                    // recreate from latest_split_info
871                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
872                                }
873                            }
874                            Mutation::ResetSource { source_id } => {
875                                // Note: RESET SOURCE only clears the offset, does NOT pause the source.
876                                // When offset is None, after recovery/restart, Debezium will automatically
877                                // enter recovery mode and fetch the latest offset from upstream.
878                                if *source_id == self.stream_source_core.source_id {
879                                    tracing::info!(
880                                        actor_id = %self.actor_ctx.id,
881                                        source_id = source_id.as_raw_id(),
882                                        "Resetting CDC source: clearing offset (set to None)"
883                                    );
884
885                                    // Step 1: Collect all current splits and clear their offsets
886                                    let splits_with_cleared_offset: Vec<SplitImpl> = self.stream_source_core
887                                        .latest_split_info
888                                        .values()
889                                        .map(|split| {
890                                            // Clone the split and clear its offset
891                                            let mut new_split = split.clone();
892                                            match &mut new_split {
893                                                SplitImpl::MysqlCdc(debezium_split) => {
894                                                    if let Some(mysql_split) = debezium_split.mysql_split.as_mut() {
895                                                        tracing::info!(
896                                                            split_id = ?mysql_split.inner.split_id,
897                                                            old_offset = ?mysql_split.inner.start_offset,
898                                                            "Clearing MySQL CDC offset"
899                                                        );
900                                                        mysql_split.inner.start_offset = None;
901                                                    }
902                                                }
903                                                SplitImpl::PostgresCdc(debezium_split) => {
904                                                    if let Some(pg_split) = debezium_split.postgres_split.as_mut() {
905                                                        tracing::info!(
906                                                            split_id = ?pg_split.inner.split_id,
907                                                            old_offset = ?pg_split.inner.start_offset,
908                                                            "Clearing PostgreSQL CDC offset"
909                                                        );
910                                                        pg_split.inner.start_offset = None;
911                                                    }
912                                                }
913                                                SplitImpl::MongodbCdc(debezium_split) => {
914                                                    if let Some(mongo_split) = debezium_split.mongodb_split.as_mut() {
915                                                        tracing::info!(
916                                                            split_id = ?mongo_split.inner.split_id,
917                                                            old_offset = ?mongo_split.inner.start_offset,
918                                                            "Clearing MongoDB CDC offset"
919                                                        );
920                                                        mongo_split.inner.start_offset = None;
921                                                    }
922                                                }
923                                                SplitImpl::CitusCdc(debezium_split) => {
924                                                    if let Some(citus_split) = debezium_split.citus_split.as_mut() {
925                                                        tracing::info!(
926                                                            split_id = ?citus_split.inner.split_id,
927                                                            old_offset = ?citus_split.inner.start_offset,
928                                                            "Clearing Citus CDC offset"
929                                                        );
930                                                        citus_split.inner.start_offset = None;
931                                                    }
932                                                }
933                                                SplitImpl::SqlServerCdc(debezium_split) => {
934                                                    if let Some(sqlserver_split) = debezium_split.sql_server_split.as_mut() {
935                                                        tracing::info!(
936                                                            split_id = ?sqlserver_split.inner.split_id,
937                                                            old_offset = ?sqlserver_split.inner.start_offset,
938                                                            "Clearing SQL Server CDC offset"
939                                                        );
940                                                        sqlserver_split.inner.start_offset = None;
941                                                    }
942                                                }
943                                                _ => {
944                                                    tracing::warn!(
945                                                        "RESET SOURCE called on non-CDC split type"
946                                                    );
947                                                }
948                                            }
949                                            new_split
950                                        })
951                                        .collect();
952
953                                    if !splits_with_cleared_offset.is_empty() {
954                                        tracing::info!(
955                                            actor_id = %self.actor_ctx.id,
956                                            split_count = splits_with_cleared_offset.len(),
957                                            "Updating state table with cleared offsets"
958                                        );
959
960                                        // Step 2: Write splits back to state table with offset = None
961                                        self.stream_source_core
962                                            .split_state_store
963                                            .set_states(splits_with_cleared_offset.clone())
964                                            .await?;
965
966                                        // Step 3: Update in-memory split info with cleared offsets
967                                        for split in splits_with_cleared_offset {
968                                            self.stream_source_core
969                                                .latest_split_info
970                                                .insert(split.id(), split.clone());
971                                            self.stream_source_core
972                                                .updated_splits_in_epoch
973                                                .insert(split.id(), split);
974                                        }
975
976                                        tracing::info!(
977                                            actor_id = %self.actor_ctx.id,
978                                            source_id = source_id.as_raw_id(),
979                                            "RESET SOURCE completed: offset cleared (set to None). \
980                                             Trigger recovery/restart to fetch latest offset from upstream."
981                                        );
982                                    } else {
983                                        tracing::warn!(
984                                            actor_id = %self.actor_ctx.id,
985                                            "No splits found to reset - source may not be initialized yet"
986                                        );
987                                    }
988                                } else {
989                                    tracing::debug!(
990                                        actor_id = %self.actor_ctx.id,
991                                        target_source_id = source_id.as_raw_id(),
992                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
993                                        "ResetSource mutation for different source, ignoring"
994                                    );
995                                }
996                            }
997
998                            Mutation::InjectSourceOffsets {
999                                source_id,
1000                                split_offsets,
1001                            } => {
1002                                if *source_id == self.stream_source_core.source_id {
1003                                    if self.handle_inject_source_offsets(split_offsets).await? {
1004                                        // Trigger a rebuild to apply the new offsets
1005                                        split_change = Some((
1006                                            &source_desc,
1007                                            &mut stream,
1008                                            ApplyMutationAfterBarrier::ConnectorPropsChange,
1009                                        ));
1010                                    }
1011                                } else {
1012                                    tracing::debug!(
1013                                        actor_id = %self.actor_ctx.id,
1014                                        target_source_id = source_id.as_raw_id(),
1015                                        current_source_id = self.stream_source_core.source_id.as_raw_id(),
1016                                        "InjectSourceOffsets mutation for different source, ignoring"
1017                                    );
1018                                }
1019                            }
1020
1021                            _ => {}
1022                        }
1023                    }
1024
1025                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;
1026
1027                    self.maybe_report_cdc_source_offset(
1028                        &updated_splits,
1029                        epoch,
1030                        source_id,
1031                        &mut must_report_cdc_offset_once,
1032                        must_wait_cdc_offset_before_report,
1033                    );
1034
1035                    // when handle a checkpoint barrier, spawn a task to wait for epoch commit notification
1036                    if barrier.kind.is_checkpoint()
1037                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
1038                    {
1039                        task_builder.update_task_on_checkpoint(updated_splits);
1040
1041                        tracing::debug!("epoch to wait {:?}", epoch);
1042                        task_builder.send(Epoch(epoch.prev)).await?
1043                    }
1044
1045                    let barrier_epoch = barrier.epoch;
1046                    yield Message::Barrier(barrier);
1047
1048                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
1049                        self.apply_split_change_after_yield_barrier(
1050                            barrier_epoch,
1051                            source_desc,
1052                            stream,
1053                            to_apply_mutation,
1054                        )
1055                        .await?;
1056                    }
1057                }
1058                Either::Left(_) => {
1059                    // For the source executor, the message we receive from this arm
1060                    // should always be barrier message.
1061                    unreachable!();
1062                }
1063
1064                Either::Right((chunk, latest_state)) => {
1065                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
1066                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1067                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
1068                            task_builder.update_task_on_chunk(
1069                                source_id,
1070                                &latest_state,
1071                                pulsar_message_id_col.clone(),
1072                            );
1073                        } else {
1074                            let offset_col = chunk.column_at(offset_idx);
1075                            task_builder.update_task_on_chunk(
1076                                source_id,
1077                                &latest_state,
1078                                offset_col.clone(),
1079                            );
1080                        }
1081                    }
1082                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
1083                        // Exceeds the max wait barrier time, the source will be paused.
1084                        // Currently we can guarantee the
1085                        // source is not paused since it received stream
1086                        // chunks.
1087                        self_paused = true;
1088                        tracing::warn!(
1089                            "source paused, wait barrier for {:?}",
1090                            last_barrier_time.elapsed()
1091                        );
1092                        stream.pause_stream();
1093
1094                        // Only update `max_wait_barrier_time_ms` to capture
1095                        // `barrier_interval_ms`
1096                        // changes here to avoid frequently accessing the shared
1097                        // `system_params`.
1098                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
1099                            as u128
1100                            * WAIT_BARRIER_MULTIPLE_TIMES;
1101                    }
1102
1103                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
1104                        if let Some(split_impl) =
1105                            self.stream_source_core.latest_split_info.get_mut(split_id)
1106                        {
1107                            *split_impl = new_split_impl.clone();
1108                        }
1109                    });
1110
1111                    self.stream_source_core
1112                        .updated_splits_in_epoch
1113                        .extend(latest_state);
1114
1115                    let card = chunk.cardinality();
1116                    if card == 0 {
1117                        continue;
1118                    }
1119                    source_output_row_count.inc_by(card as u64);
1120                    let to_remove_col_indices =
1121                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
1122                            vec![split_idx, offset_idx, pulsar_message_id_idx]
1123                        } else {
1124                            vec![split_idx, offset_idx]
1125                        };
1126                    let chunk =
1127                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
1128                    yield Message::Chunk(chunk);
1129                    self.try_flush_data().await?;
1130                }
1131            }
1132        }
1133
1134        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
1135        tracing::error!(
1136            actor_id = %self.actor_ctx.id,
1137            "source executor exited unexpectedly"
1138        )
1139    }
1140}
1141
/// A mutation-derived action that must be applied to the reader stream only
/// *after* the barrier has been yielded downstream (see
/// `apply_split_change_after_yield_barrier`), so that state persisted for the
/// barrier's epoch is not interleaved with the reader rebuild.
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    /// Apply a split (re)assignment for this actor.
    SplitChange {
        /// The set of splits this actor should own after the change.
        target_splits: Vec<SplitImpl>,
        /// Whether to trim state of splits no longer assigned: `true` for
        /// `SourceChangeSplit`, `false` for scaling `Update` mutations.
        should_trim_state: bool,
        /// Counter bumped when the split change is applied.
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    /// Rebuild the stream reader after connector properties changed (assumed
    /// not to involve split-state changes); also reused to force a rebuild
    /// after `InjectSourceOffsets`.
    ConnectorPropsChange,
}
1151
impl<S: StateStore> Execute for SourceExecutor<S> {
    /// Entry point of the executor: boxes the message stream produced by
    /// `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
1157
1158impl<S: StateStore> Debug for SourceExecutor<S> {
1159    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1160        f.debug_struct("SourceExecutor")
1161            .field("source_id", &self.stream_source_core.source_id)
1162            .field("column_ids", &self.stream_source_core.column_ids)
1163            .finish()
1164    }
1165}
1166
/// Accumulates per-chunk ack/offset data between checkpoint barriers and, on
/// each checkpoint, ships the accumulated `WaitCheckpointTask` to the
/// `WaitCheckpointWorker` together with the epoch to wait for.
struct WaitCheckpointTaskBuilder {
    // Channel to the wait-checkpoint worker: (epoch to wait for, task to run
    // once that epoch is committed).
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    // Used to create a fresh empty task after the current one is sent.
    source_reader: SourceReader,
    // The task being accumulated for the in-progress epoch.
    building_task: WaitCheckpointTask,
}
1172
1173impl WaitCheckpointTaskBuilder {
1174    fn update_task_on_chunk(
1175        &mut self,
1176        source_id: SourceId,
1177        latest_state: &HashMap<SplitId, SplitImpl>,
1178        offset_col: ArrayRef,
1179    ) {
1180        match &mut self.building_task {
1181            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
1182                arrays.push(offset_col);
1183            }
1184            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
1185                arrays.push(offset_col);
1186            }
1187            WaitCheckpointTask::AckPulsarMessage(arrays) => {
1188                // each pulsar chunk will only contain one split
1189                let split_id = latest_state.keys().next().unwrap();
1190                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
1191                arrays.push((pulsar_ack_channel_id, offset_col));
1192            }
1193            WaitCheckpointTask::CommitCdcOffset(_) => {}
1194        }
1195    }
1196
1197    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
1198        #[expect(clippy::single_match)]
1199        match &mut self.building_task {
1200            WaitCheckpointTask::CommitCdcOffset(offsets) => {
1201                if !updated_splits.is_empty() {
1202                    // cdc source only has one split
1203                    assert_eq!(1, updated_splits.len());
1204                    for (split_id, split_impl) in updated_splits {
1205                        if split_impl.is_cdc_split() {
1206                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
1207                        } else {
1208                            unreachable!()
1209                        }
1210                    }
1211                }
1212            }
1213            _ => {}
1214        }
1215    }
1216
1217    /// Send and reset the building task to a new one.
1218    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
1219        let new_task = self
1220            .source_reader
1221            .create_wait_checkpoint_task()
1222            .await?
1223            .expect("wait checkpoint task should be created");
1224        self.wait_checkpoint_tx
1225            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
1226            .expect("wait_checkpoint_tx send should succeed");
1227        Ok(())
1228    }
1229}
1230
/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Usage Cases
///
/// Typically there are 2 issues related with ack on checkpoint:
///
/// 1. Correctness (at-least-once), or don't let upstream clean uncommitted data.
///    For message queueing semantics (delete after ack), we should ack to avoid redelivery,
///    and only ack after checkpoint to avoid data loss.
///
/// 2. Allow upstream to clean data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit last consumed offset to upstream DB, so that old data can be discarded.
///
/// ## Google Pub/Sub
///
/// Due to queueing semantics.
/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
/// it's quite limited unlike Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
struct WaitCheckpointWorker<S: StateStore> {
    // Receives (epoch, task) pairs from `WaitCheckpointTaskBuilder::send`;
    // the task runs only after the epoch is committed.
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    // State store used to wait for epoch commit via `try_wait_epoch`.
    state_store: S,
    // Table id passed to `TryWaitEpochOptions` — presumably the source's
    // state table; confirm at the construction site.
    table_id: TableId,
    // Used to record committed CDC offset LSN metrics after a successful commit.
    metrics: Arc<StreamingMetrics>,
}
1262
1263impl<S: StateStore> WaitCheckpointWorker<S> {
1264    pub async fn run(mut self) {
1265        tracing::debug!("wait epoch worker start success");
1266        loop {
1267            // poll the rx and wait for the epoch commit
1268            match self.wait_checkpoint_rx.recv().await {
1269                Some((epoch, task)) => {
1270                    tracing::debug!("start to wait epoch {}", epoch.0);
1271                    let ret = self
1272                        .state_store
1273                        .try_wait_epoch(
1274                            HummockReadEpoch::Committed(epoch.0),
1275                            TryWaitEpochOptions {
1276                                table_id: self.table_id,
1277                            },
1278                        )
1279                        .await;
1280                    if self.wait_checkpoint_rx.is_closed() {
1281                        // The task must not be committed since the associated epoch may not succeed.
1282                        // The wait_checkpoint_rx lifetime is tied to the lifecycle of the source executor actor.
1283                        // The old actor must be dropped before any subsequent recovery can succeed; see PartialGraphState::abort_and_wait_actors
1284                        tracing::debug!(epoch = epoch.0, "Drop stale wait checkpoint task.");
1285                        break;
1286                    }
1287                    match ret {
1288                        Ok(()) => {
1289                            tracing::debug!(epoch = epoch.0, "wait epoch success");
1290
1291                            // Run task with callback to record LSN after successful commit
1292                            task.run_with_on_commit_success(|source_id: u64, offset| {
1293                                if let Some(lsn_value) =
1294                                    extract_postgres_lsn_from_offset_str(offset)
1295                                {
1296                                    self.metrics
1297                                        .pg_cdc_jni_commit_offset_lsn
1298                                        .with_guarded_label_values(&[&source_id.to_string()])
1299                                        .set(lsn_value as i64);
1300                                }
1301                                if let Some(lsn_value) =
1302                                    extract_sql_server_commit_lsn_from_offset_str(offset)
1303                                {
1304                                    self.metrics
1305                                        .sqlserver_cdc_jni_commit_offset_lsn
1306                                        .with_guarded_label_values(&[&source_id.to_string()])
1307                                        .set(lsn_u128_to_i64(lsn_value));
1308                                }
1309                            })
1310                            .await;
1311                        }
1312                        Err(e) => {
1313                            tracing::error!(
1314                            error = %e.as_report(),
1315                            "wait epoch {} failed", epoch.0
1316                            );
1317                        }
1318                    }
1319                }
1320                None => {
1321                    tracing::error!("wait epoch rx closed");
1322                    break;
1323                }
1324            }
1325        }
1326    }
1327}
1328
1329#[cfg(test)]
1330mod tests {
1331    use maplit::{btreemap, convert_args, hashmap};
1332    use risingwave_common::catalog::{ColumnId, Field};
1333    use risingwave_common::id::SourceId;
1334    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
1335    use risingwave_common::test_prelude::StreamChunkTestExt;
1336    use risingwave_common::util::epoch::{EpochExt, test_epoch};
1337    use risingwave_connector::source::datagen::DatagenSplit;
1338    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
1339    use risingwave_pb::catalog::StreamSourceInfo;
1340    use risingwave_pb::plan_common::PbRowFormatType;
1341    use risingwave_storage::memory::MemoryStateStore;
1342    use tokio::sync::mpsc::unbounded_channel;
1343    use tracing_test::traced_test;
1344
1345    use super::*;
1346    use crate::executor::AddMutation;
1347    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
1348    use crate::task::LocalBarrierManager;
1349
    /// Source name shared by every mocked `StreamSourceCore` in these tests.
    const MOCK_SOURCE_NAME: &str = "mock_source";
1351
1352    #[tokio::test]
1353    async fn test_source_executor() {
1354        let source_id = 0.into();
1355        let schema = Schema {
1356            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
1357        };
1358        let row_id_index = None;
1359        let source_info = StreamSourceInfo {
1360            row_format: PbRowFormatType::Native as i32,
1361            ..Default::default()
1362        };
1363        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
1364        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();
1365
1366        // This datagen will generate 3 rows at one time.
1367        let properties = convert_args!(btreemap!(
1368            "connector" => "datagen",
1369            "datagen.rows.per.second" => "3",
1370            "fields.sequence_int.kind" => "sequence",
1371            "fields.sequence_int.start" => "11",
1372            "fields.sequence_int.end" => "11111",
1373        ));
1374        let source_desc_builder =
1375            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
1376        let split_state_store = SourceStateTableHandler::from_table_catalog(
1377            &default_source_internal_table(0x2333),
1378            MemoryStateStore::new(),
1379        )
1380        .await;
1381        let core = StreamSourceCore::<MemoryStateStore> {
1382            source_id,
1383            column_ids,
1384            source_desc_builder: Some(source_desc_builder),
1385            latest_split_info: HashMap::new(),
1386            split_state_store,
1387            updated_splits_in_epoch: HashMap::new(),
1388            source_name: MOCK_SOURCE_NAME.to_owned(),
1389        };
1390
1391        let system_params_manager = LocalSystemParamsManager::for_test();
1392
1393        let executor = SourceExecutor::new(
1394            ActorContext::for_test(0),
1395            core,
1396            Arc::new(StreamingMetrics::unused()),
1397            barrier_rx,
1398            system_params_manager.get_params(),
1399            None,
1400            false,
1401            LocalBarrierManager::for_test(),
1402        );
1403        let mut executor = executor.boxed().execute();
1404
1405        let init_barrier =
1406            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
1407                splits: hashmap! {
1408                    ActorId::default() => vec![
1409                        SplitImpl::Datagen(DatagenSplit {
1410                            split_index: 0,
1411                            split_num: 1,
1412                            start_offset: None,
1413                        }),
1414                    ],
1415                },
1416                ..Default::default()
1417            }));
1418        barrier_tx.send(init_barrier).unwrap();
1419
1420        // Consume barrier.
1421        executor.next().await.unwrap().unwrap();
1422
1423        // Consume data chunk.
1424        let msg = executor.next().await.unwrap().unwrap();
1425
1426        // Row id will not be filled here.
1427        assert_eq!(
1428            msg.into_chunk().unwrap(),
1429            StreamChunk::from_pretty(
1430                " i
1431                + 11
1432                + 12
1433                + 13"
1434            )
1435        );
1436    }
1437
    /// Exercises the `SourceChangeSplit` mutation path: an executor that
    /// starts with one of three datagen splits is given all three, a
    /// checkpoint persists the new split state, and a later mutation that
    /// keeps only one split must remove the other splits' state from the
    /// state table.
    ///
    /// NOTE(review): the interleaving below is load-bearing — every barrier
    /// sent through `barrier_tx` is matched by a poll on the executor stream
    /// before the next barrier is injected.
    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        // Shared in-memory store: the test re-opens the same internal table
        // later to verify what the executor persisted.
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        // Initial assignment: split 0 of 3.
        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        // Batch up to 10 ready messages per poll so data chunks interleaved
        // with barriers can be drained in one call.
        let mut ready_chunks = handler.ready_chunks(10);

        // Drain whatever data was produced from the initial split.
        let _ = ready_chunks.next().await.unwrap();

        // The new assignment covers all three splits, including the one the
        // executor already owns.
        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        // A plain barrier commits (checkpoints) the split-state writes made
        // while handling the split-change mutation.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        // Re-open the same internal table to inspect persisted state.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // there must exist state for new add partition
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        // Give the reader some wall-clock time to produce rows from the
        // newly added splits before draining again.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        // Pause and immediately resume the source; both barriers are drained
        // together by the next `ready_chunks` poll.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // receive all
        ready_chunks.next().await.unwrap();

        // Drop splits 0 and 1: the new assignment keeps only split 2.
        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        // Checkpoint again so the dropped splits' state removal is committed.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        // Splits 0 and 1 were dropped, so their state must be gone...
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        // ...while the retained split 2 still has recoverable state.
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
1644}