risingwave_stream/executor/source/source_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::TryWaitEpochOptions;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

/// A multiplier applied to the barrier interval when calculating the maximum time to
/// wait for a barrier, accounting for network latency and overhead in the meta service.
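///
/// A rough sketch of how this is used (assuming a `barrier_interval_ms` of 1000 ms
/// for illustration; the actual interval is a system parameter):
///
/// ```ignore
/// // 1000 ms * 5 = 5000 ms without a barrier triggers a self-pause of the source.
/// let max_wait_barrier_time_ms =
///     system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
/// ```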
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source core for the external system.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    is_shared_non_cdc: bool,

    /// Local barrier manager for reporting source load finished events.
    barrier_manager: LocalBarrierManager,
}

impl<S: StateStore> SourceExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            barrier_manager,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id().into(),
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

    /// Build the source column ids and the source context used to build the source stream.
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            // Spawn a task to handle schema change events from the source parser.
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };

        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    /// Check if this is a batch refreshable source.
    fn is_batch_source(&self) -> bool {
        self.stream_source_core.is_batch_source
    }

    /// Refresh the splits of a batch source.
    fn refresh_batch_splits(&mut self) -> StreamExecutorResult<Vec<SplitImpl>> {
        debug_assert!(self.is_batch_source());
        let core = &self.stream_source_core;
        let mut split = core.get_batch_split();
        split.refresh();
        Ok(vec![split.into()])
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx
            .streaming_config
            .developer
            .enable_auto_schema_change
    }

    /// `source_id | source_name | actor_id | fragment_id`
    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

    /// - `should_trim_state`: whether to trim state for dropped splits.
    ///
    ///   For scaling, the connector splits can be migrated to other actors, but
    ///   won't be added or removed. Actors should not trim states for splits that
    ///   are moved to other actors.
    ///
    ///   For source split change, splits will not be migrated and we can trim states
    ///   for deleted splits.
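    ///
    ///   As a reference, this is how the two callers in `execute_inner` set the flag:
    ///
    ///   ```text
    ///   Mutation::SourceChangeSplit => should_trim_state = true   (splits deleted for good)
    ///   Mutation::Update (scaling)  => should_trim_state = false  (splits migrated elsewhere)
    ///   ```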
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::RefreshBatchSplits(splits) => {
                    // Just override the latest split info with the refreshed splits. No need to check.
                    self.stream_source_core.latest_split_info =
                        splits.into_iter().map(|s| (s.id(), s)).collect();
                    should_rebuild_stream = true;
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

    /// Returns `true` if the splits changed. Otherwise `false`.
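    ///
    /// A minimal sketch of the diff this method performs, with plain strings standing
    /// in for `SplitImpl` (illustrative only; the real code recovers offsets from the
    /// state store):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let latest: HashMap<&str, &str> =
    ///     HashMap::from([("s1", "offset=42"), ("s2", "offset=7")]);
    /// let target = ["s1", "s3"]; // from meta, with initial offsets
    ///
    /// // "s1": existing; keep the cached offset from `latest`.
    /// // "s3": added; recover from the state store, or fall back to the initial offset.
    /// // "s2": dropped; trim its state iff `should_trim_state`.
    /// ```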
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Checks added splits.
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // Write the newly assigned split to the state cache. The snapshot is based on the cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Checks dropped splits.
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // Trim the dropped splits' state.
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

    /// Rebuild the stream reader when an error occurs in the stream.
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::warn!(
            error = ?e.as_report(),
            actor_id = self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.to_owned(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        // Replace the source reader with a new one built from the new state.
        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream =
            reader_stream_builder.into_retry_stream(Some(target_state.clone()), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");
            core.split_state_store.set_states(cache).await?;
        }

        // Commit anyway, even if no message was saved.
        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    /// Try to flush the state table, which may spill the mem-table to the state store.
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        core.split_state_store.try_flush().await?;

        Ok(())
    }

    /// A source executor with a stream source receives:
    /// 1. Barrier messages
    /// 2. Data from the external source
    ///
    /// and acts accordingly.
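    ///
    /// A rough sketch of the main loop (illustrative, not the actual control flow):
    ///
    /// ```text
    /// loop {
    ///     match merged_stream.next() {  // barriers are prioritized over data
    ///         Barrier => apply mutations, persist split states, yield the barrier
    ///         Chunk   => update split offsets in cache, yield the pruned chunk
    ///     }
    ///     // If no barrier arrives within WAIT_BARRIER_MULTIPLE_TIMES *
    ///     // barrier_interval_ms, the source pauses itself until the next barrier.
    /// }
    /// ```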
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        // Build the source description from the builder.
        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder =
            Self::spawn_wait_checkpoint_worker(&core, source_desc.source.clone()).await?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    // If the state store is non-empty, we consider it initialized.
                    is_uninitialized = false;
                } else {
                    // This is a new split, not in the state table.
                    // Make sure it is written to the state table later.
                    // Then even if it receives no messages, we can observe it in the state table.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        // Init in-memory split states with the persisted state, if any.
        core.init_split_state(boot_state.clone());

        // Return the ownership of `stream_source_core` to the source executor.
        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        // Build the source stream reader.
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Make sure they are written to the state table later.
            // Then even if they receive no messages, we can observe them in the state table.
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the chunks from the source and the barriers into a single stream. We prioritize
        // barriers over source data chunks here.
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        // If the first barrier requires us to pause on startup, pause the stream.
        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
        // milliseconds, considering some other latencies like network and cost in Meta.
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        let mut is_refreshing = false;

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                // This branch will be preferred.
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // `command_paused` has a higher priority.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;

                    // NOTE: We rely on `CompleteBarrierTask`, which exists only for checkpoint
                    // barriers, so we wait for a checkpoint barrier here.
                    if barrier.is_checkpoint() && self.is_batch_source() && is_refreshing {
                        let batch_split = self.stream_source_core.get_batch_split();
                        if batch_split.finished() {
                            tracing::info!(?epoch, "emitting load finish");
                            self.barrier_manager.report_source_load_finished(
                                epoch,
                                self.actor_ctx.id,
                                source_id.table_id(),
                                source_id.table_id(),
                            );
                            is_refreshing = false;
                        }
                    }

                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.table_id()) {
                                    // Rebuild the stream reader with the new props.
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    // We assume the connector props change does not involve a state change.
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // Recreate the reader from `latest_split_info`.
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            Mutation::RefreshStart {
                                table_id: _,
                                associated_source_id,
                            } if *associated_source_id == source_id => {
                                debug_assert!(self.is_batch_source());
                                is_refreshing = true;

                                // Similar to a split change, we need to update the split info and
                                // rebuild the source reader.

                                // For batch sources, trigger re-enumeration of splits to detect file changes.
                                if let Ok(new_splits) = self.refresh_batch_splits() {
                                    tracing::info!(
                                        actor_id = self.actor_ctx.id,
                                        %associated_source_id,
                                        new_splits_count = new_splits.len(),
                                        "RefreshStart triggered split re-enumeration"
                                    );
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::RefreshBatchSplits(new_splits),
                                    ));
                                } else {
                                    tracing::warn!(
                                        actor_id = self.actor_ctx.id,
                                        %associated_source_id,
                                        "Failed to refresh splits during RefreshStart"
                                    );
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    // When handling a checkpoint barrier, spawn a task to wait for the epoch commit notification.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // For the source executor, the message we receive from this arm
                    // should always be a barrier message.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        let offset_col = chunk.column_at(offset_idx);
                        task_builder.update_task_on_chunk(offset_col.clone());
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // The maximum wait time for a barrier is exceeded, so pause the source.
                        // Currently we can guarantee the source is not already paused, since it
                        // just received stream chunks.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Only update `max_wait_barrier_time_ms` here, to capture changes of
                        // `barrier_interval_ms` while avoiding frequent accesses to the shared
                        // `system_params`.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    source_output_row_count.inc_by(card as u64);
                    let chunk =
                        prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
        tracing::error!(
            actor_id = self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
}

#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    RefreshBatchSplits(Vec<SplitImpl>),
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}

struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    // A CDC source only has one split.
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Send the building task and reset it to a new one.
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Usage Cases
///
/// Typically there are 2 issues related to acking on checkpoint:
///
/// 1. Correctness (at-least-once), i.e., don't let the upstream clean uncommitted data.
///    For message queueing semantics (delete after ack), we should ack to avoid redelivery,
///    and only ack after checkpoint to avoid data loss.
///
/// 2. Allow the upstream to clean data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit the last consumed offset to the upstream DB, so that old data can be discarded.
///
/// ## Google Pub/Sub
///
/// Due to queueing semantics.
/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
/// it's quite limited, unlike Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
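///
/// A sketch of the flow between `SourceExecutor` and this worker (illustrative):
///
/// ```text
/// executor, on a checkpoint barrier of `epoch`:
///     wait_checkpoint_tx.send((Epoch(epoch.prev), task))
/// worker:
///     try_wait_epoch(HummockReadEpoch::Committed(epoch.0))  // wait for the commit
///     task.run()                                            // then ack / commit offsets upstream
/// ```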
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker started successfully");
        loop {
            // Poll the rx and wait for the epoch commit.
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");
                            task.run().await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field, TableId};
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // This datagen will generate 3 rows at one time.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
            is_batch_source: false,
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
                backfill_nodes_to_pause: Default::default(),
                actor_cdc_table_snapshot_splits: Default::default(),
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        executor.next().await.unwrap().unwrap();

        // Consume data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // Row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
            is_batch_source: false,
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
                backfill_nodes_to_pause: Default::default(),
                actor_cdc_table_snapshot_splits: Default::default(),
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // There must exist state for the newly added partition.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // Receive all.
        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}
1304}