risingwave_stream/executor/source/source_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::TryWaitEpochOptions;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;

/// A multiplier applied to the barrier interval when calculating the maximum time to
/// wait for a barrier, to account for network latency and processing cost in meta.
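///
/// # Example (illustrative)
///
/// A minimal sketch of how the wait budget is derived; the interval below is a
/// hypothetical setting, not a default taken from this codebase:
///
/// ```ignore
/// let barrier_interval_ms: u128 = 1000; // hypothetical system-parameter value
/// let max_wait_barrier_time_ms = barrier_interval_ms * WAIT_BARRIER_MULTIPLE_TIMES; // = 5000 ms
/// ```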
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source for external data. `None` if the executor has no stream
    /// source and only forwards barriers.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    is_shared_non_cdc: bool,
}

impl<S: StateStore> SourceExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: Option<StreamSourceCore<S>>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.as_ref().unwrap().source_id,
            source_name: self
                .stream_source_core
                .as_ref()
                .unwrap()
                .source_name
                .clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id().into(),
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

    /// Builds the source column ids and the source context that will be used to build
    /// the source stream.
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            // Spawn a task to handle schema change events from the source parser.
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };

        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.as_ref().unwrap().source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core
                .as_ref()
                .unwrap()
                .source_name
                .clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // When rate limiting, we may split a txn.
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx
            .streaming_config
            .developer
            .enable_auto_schema_change
    }

    /// `source_id | source_name | actor_id | fragment_id`
    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core
                .as_ref()
                .unwrap()
                .source_id
                .to_string(),
            self.stream_source_core
                .as_ref()
                .unwrap()
                .source_name
                .clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

    /// - `should_trim_state`: whether to trim state for dropped splits.
    ///
    ///   For scaling, the connector splits can be migrated to other actors, but
    ///   won't be added or removed. Actors should not trim state for splits that
    ///   are moved to other actors.
    ///
    ///   For a source split change, splits will not be migrated, so we can trim
    ///   state for deleted splits.
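    ///
    ///   As a concrete example from this file: `Mutation::SourceChangeSplit` is
    ///   handled with `should_trim_state = true`, while `Mutation::Update` (used for
    ///   scaling) is handled with `false`.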
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        source_split_change_count_metrics: &LabelGuardedIntCounter<4>,
    ) -> StreamExecutorResult<()> {
        {
            source_split_change_count_metrics.inc();
            if self
                .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                .await?
            {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

    /// Returns `true` if the splits changed, otherwise `false`.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = self.stream_source_core.as_mut().unwrap();

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Check added splits.
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // Write the newly assigned split to the state cache; the snapshot is
                // based on the cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Check dropped splits.
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // Trim the dropped splits' state.
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

    /// Rebuilds the stream when an error occurs in it.
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = self.stream_source_core.as_mut().unwrap();
        tracing::warn!(
            error = ?e.as_report(),
            actor_id = self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.to_owned(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = self.stream_source_core.as_mut().unwrap();
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        // Replace the source reader with a new one built from the new state.
        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream =
            reader_stream_builder.into_retry_stream(Some(target_state.clone()), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = self.stream_source_core.as_mut().unwrap();

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");
            core.split_state_store.set_states(cache).await?;
        }

        // Commit anyway, even if no messages were saved.
        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    /// Tries to spill the mem-table.
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = self.stream_source_core.as_mut().unwrap();
        core.split_state_store.try_flush().await?;

        Ok(())
    }

    /// A source executor with a stream source receives two kinds of input and acts
    /// accordingly:
    /// 1. Barrier messages
    /// 2. Data from the external source
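    ///
    /// Both inputs are merged into a single stream, with barriers prioritized over
    /// source chunks (see the `StreamReaderWithPause::<true, _>` construction below).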
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_with_stream_source(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.as_ref().unwrap().source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core.unwrap();

        // Build the source description from the builder.
        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder =
            Self::spawn_wait_checkpoint_worker(&core, source_desc.source.clone()).await?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    // If the state store is non-empty, we consider it initialized.
                    is_uninitialized = false;
                } else {
                    // This is a new split, not yet in the state table.
                    // Make sure it is written to the state table later; then even if it
                    // receives no messages, we can observe it in the state table.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        // Initialize in-memory split states with the persisted state, if any.
        core.init_split_state(boot_state.clone());

        // Return the ownership of `stream_source_core` to the source executor.
        self.stream_source_core = Some(core);

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        // Build the source stream reader.
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Make sure the latest splits are written to the state table later; then
            // even if they receive no messages, we can observe them in the state table.
            self.stream_source_core
                .as_mut()
                .unwrap()
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the chunks from the source and the barriers into a single stream. We
        // prioritize barriers over source data chunks here.
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        // If the first barrier requires us to pause on startup, pause the stream.
        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `barrier_interval_ms`
        // milliseconds, considering other latencies like network and cost in meta.
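        // For example, with a hypothetical `barrier_interval_ms` of 1000 ms, data may
        // flow for up to 5000 ms without a barrier before the source pauses itself.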
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels().each_ref().map(AsRef::as_ref));

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels().each_ref().map(AsRef::as_ref));

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                // This branch will be preferred.
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // `command_paused` has a higher priority.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;

                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            target_splits,
                                            true,
                                            &source_split_change_count,
                                        )
                                    },
                                );
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            target_splits,
                                            false,
                                            &source_split_change_count,
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // Recreate the reader from `latest_split_info`.
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    // When handling a checkpoint barrier, send a task to the worker to
                    // wait for the epoch commit notification.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((
                        source_desc,
                        stream,
                        target_splits,
                        should_trim_state,
                        source_split_change_count,
                    )) = split_change
                    {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            target_splits,
                            should_trim_state,
                            source_split_change_count,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // For the source executor, the message we receive from this arm
                    // should always be a barrier message.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        let offset_col = chunk.column_at(offset_idx);
                        task_builder.update_task_on_chunk(offset_col.clone());
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // The max wait barrier time is exceeded, so the source will be
                        // paused. Currently we can guarantee the source is not already
                        // paused, since it just received stream chunks.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Only update `max_wait_barrier_time_ms` to capture
                        // `barrier_interval_ms` changes here, to avoid frequently
                        // accessing the shared `system_params`.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) = self
                            .stream_source_core
                            .as_mut()
                            .unwrap()
                            .latest_split_info
                            .get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .as_mut()
                        .unwrap()
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    source_output_row_count.inc_by(chunk.cardinality() as u64);
                    let chunk =
                        prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The source executor should only be stopped by the actor when it finds a
        // `Stop` mutation.
        tracing::error!(
            actor_id = self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }

    /// A source executor without a stream source only receives barrier messages and
    /// forwards them to the downstream executor.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_without_stream_source(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?} with no stream source",
                    self.actor_ctx.id
                )
            })?;
        yield Message::Barrier(barrier);

        while let Some(barrier) = barrier_receiver.recv().await {
            yield Message::Barrier(barrier);
        }
    }
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        if self.stream_source_core.is_some() {
            self.execute_with_stream_source().boxed()
        } else {
            self.execute_without_stream_source().boxed()
        }
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("SourceExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("SourceExecutor").finish()
        }
    }
}

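/// Accumulates the next [`WaitCheckpointTask`] between checkpoints: in the executor
/// loop above, `update_task_on_chunk` is called for every data chunk, and on each
/// checkpoint barrier the task is finalized via `update_task_on_checkpoint` and
/// handed to the [`WaitCheckpointWorker`] by `send`.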
struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    // A CDC source only has one split.
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Sends the building task and resets it to a new one.
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Usage Cases
///
/// Typically there are two concerns related to acking on checkpoint:
///
/// 1. Correctness (at-least-once): don't let the upstream clean uncommitted data.
///    For message-queueing semantics (delete after ack), we should ack to avoid
///    redelivery, and only ack after the checkpoint to avoid data loss.
///
/// 2. Allowing the upstream to clean data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit the last consumed offset to the upstream DB, so that old data can be discarded.
///
/// ## Google Pub/Sub
///
/// Due to queueing semantics.
/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
/// it's quite limited, unlike Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
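///
/// # Wiring (illustrative)
///
/// A minimal sketch of how this worker is set up, mirroring
/// `spawn_wait_checkpoint_worker` above; `state_store` and `table_id` stand in for
/// the values taken from the split state table:
///
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<(Epoch, WaitCheckpointTask)>();
/// let worker = WaitCheckpointWorker { wait_checkpoint_rx: rx, state_store, table_id };
/// tokio::spawn(worker.run()); // tasks run after their epoch is committed
/// // `tx` is held by `WaitCheckpointTaskBuilder` to submit (epoch, task) pairs.
/// ```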
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker started");
        loop {
            // Poll the rx and wait for the epoch commit.
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");
                            task.run().await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field, TableId};
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // This datagen will generate 3 rows at one time.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            Some(core),
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        executor.next().await.unwrap().unwrap();

        // Consume data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // Row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            Some(core),
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // There must exist state for the newly added partition.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // Receive all pending chunks.
        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}