risingwave_stream/executor/source/source_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::TryWaitEpochOptions;
use serde_json;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

/// A multiplier applied when calculating the maximum time to wait for a barrier,
/// to account for latencies such as the network and processing cost in the meta service.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
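
// For example, with a `barrier_interval_ms` of 1000 ms (a typical default), the executor
// self-pauses after seeing no barrier for 5000 ms; see how `max_wait_barrier_time_ms` is
// derived from this constant in `execute_inner` below.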

/// Extract the offset value from a CDC split.
///
/// For Postgres CDC, the offset is the LSN.
fn extract_split_offset(split: &SplitImpl) -> Option<u64> {
    match split {
        SplitImpl::PostgresCdc(pg_split) => {
            let offset_str = pg_split.start_offset().as_ref()?;
            extract_pg_cdc_lsn_from_offset(offset_str)
        }
        _ => None,
    }
}

/// Parse the offset JSON and extract the LSN value from the `sourceOffset.lsn` field.
/// Returns `Some(lsn)` if the LSN is found and can be parsed as a `u64`, and `None` otherwise.
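///
/// As a reference sketch, an accepted offset looks like
/// `{"sourceOffset": {"lsn": 123456789}}` (illustrative value); any additional fields
/// in the payload are ignored.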
fn extract_pg_cdc_lsn_from_offset(offset_str: &str) -> Option<u64> {
    let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
    let source_offset = offset.get("sourceOffset")?;
    let lsn = source_offset.get("lsn")?;
    lsn.as_u64()
}

pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source for external data.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    is_shared_non_cdc: bool,

    /// Local barrier manager for reporting source load-finished events.
    barrier_manager: LocalBarrierManager,
}

impl<S: StateStore> SourceExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            barrier_manager,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }
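
    /// Spawn a background worker that performs post-commit work (e.g., acking consumed
    /// messages or committing CDC offsets) after each checkpoint epoch is committed.
    /// Returns `None` if the connector defines no wait-checkpoint task.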
    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id().into(),
            metrics,
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

    /// Build the source column IDs and the source context that will be used to build the
    /// source stream.
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            // Spawn a task to handle schema change events from the source parser.
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // When rate limiting, we may split a txn.
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    /// Check if this is a batch-refreshable source.
    fn is_batch_source(&self) -> bool {
        self.stream_source_core.is_batch_source
    }

    /// Refresh the splits of a batch source by re-enumerating them.
    fn refresh_batch_splits(&mut self) -> StreamExecutorResult<Vec<SplitImpl>> {
        debug_assert!(self.is_batch_source());
        let core = &self.stream_source_core;
        let mut split = core.get_batch_split();
        split.refresh();
        Ok(vec![split.into()])
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx
            .streaming_config
            .developer
            .enable_auto_schema_change
    }

    /// `source_id | source_name | actor_id | fragment_id`
    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

    /// - `should_trim_state`: whether to trim state for dropped splits.
    ///
    ///   For scaling, the connector splits can be migrated to other actors, but
    ///   won't be added or removed. Actors should not trim states for splits that
    ///   are moved to other actors.
    ///
    ///   For a source split change, splits will not be migrated and we can trim states
    ///   for deleted splits.
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::RefreshBatchSplits(splits) => {
                    // Just override the latest split info with the refreshed splits. No need to check.
                    self.stream_source_core.latest_split_info =
                        splits.into_iter().map(|s| (s.id(), s)).collect();
                    should_rebuild_stream = true;
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

    /// Returns `true` if the splits changed, otherwise `false`.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Check added splits.
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // Write the newly assigned split to the state cache. The snapshot is based on
                // the cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Check dropped splits.
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // Trim the dropped splits' state.
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

    /// Rebuild the stream if an error occurs in the stream.
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::error!(
            error = ?e.as_report(),
            actor_id = self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.to_owned(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        // Replace the source reader with a new one built from the new state.
        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream =
            reader_stream_builder.into_retry_stream(Some(target_state.clone()), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }
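
    /// Persist the split states updated in the current epoch and clear the in-memory
    /// cache, returning the splits that were updated in this epoch.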
    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");

            // Record LSN metrics for PostgreSQL CDC sources before moving the cache.
            let source_id = core.source_id.to_string();
            for split_impl in &cache {
                // Extract the offset for CDC using type-safe matching.
                if let Some(state_table_lsn_value) = extract_split_offset(split_impl) {
                    self.metrics
                        .pg_cdc_state_table_lsn
                        .with_guarded_label_values(&[&source_id])
                        .set(state_table_lsn_value as i64);
                }
            }

            core.split_state_store.set_states(cache).await?;
        }

        // Commit anyway, even if no messages were saved.
        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    /// Try to spill the mem-table.
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        core.split_state_store.try_flush().await?;

        Ok(())
    }

    /// A source executor with a stream source receives:
    /// 1. Barrier messages
    /// 2. Data from the external source,
    /// and acts accordingly.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        // Build the source description from the builder.
        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
            &core,
            source_desc.source.clone(),
            self.metrics.clone(),
        )
        .await?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    // If the state store is non-empty, we consider it initialized.
                    is_uninitialized = false;
                } else {
                    // This is a new split, not yet in the state table.
                    // Make sure it is written to the state table later; then even if it
                    // receives no messages, we can observe it in the state table.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        // Init in-memory split states with the persisted states, if any.
        core.init_split_state(boot_state.clone());

        // Return the ownership of `stream_source_core` to the source executor.
        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        // Build the source stream reader.
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Make sure they are written to the state table later; then even if they
            // receive no messages, we can observe them in the state table.
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the chunks from the source and the barriers into a single stream. We prioritize
        // barriers over source data chunks here.
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        // If the first barrier requires us to pause on startup, pause the stream.
        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
        // milliseconds, accounting for other latencies such as the network and processing cost
        // in the meta service.
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        let mut is_refreshing = false;

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                // This branch will be preferred.
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // `command_paused` has a higher priority.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;

                    // NOTE: We rely on `CompleteBarrierTask`, which exists only for checkpoint
                    // barriers, so we wait for a checkpoint barrier here.
                    if barrier.is_checkpoint() && self.is_batch_source() && is_refreshing {
                        let batch_split = self.stream_source_core.get_batch_split();
                        if batch_split.finished() {
                            tracing::info!(?epoch, "emitting load finish");
                            self.barrier_manager.report_source_load_finished(
                                epoch,
                                self.actor_ctx.id,
                                source_id.table_id(),
                                source_id.table_id(),
                            );
                            is_refreshing = false;
                        }
                    }

                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.table_id()) {
                                    // Rebuild the stream reader with the new props.
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    // We assume the connector props change does not involve a
                                    // state change.
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // Recreate the reader from `latest_split_info`.
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            Mutation::RefreshStart {
                                table_id: _,
                                associated_source_id,
                            } if *associated_source_id == source_id => {
                                debug_assert!(self.is_batch_source());
                                is_refreshing = true;

                                // Similar to a split change, we need to update the split info and
                                // rebuild the source reader.

                                // For batch sources, trigger re-enumeration of splits to detect
                                // file changes.
                                if let Ok(new_splits) = self.refresh_batch_splits() {
                                    tracing::info!(
                                        actor_id = self.actor_ctx.id,
                                        %associated_source_id,
                                        new_splits_count = new_splits.len(),
                                        "RefreshStart triggered split re-enumeration"
                                    );
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::RefreshBatchSplits(new_splits),
                                    ));
                                } else {
                                    tracing::warn!(
                                        actor_id = self.actor_ctx.id,
                                        %associated_source_id,
                                        "Failed to refresh splits during RefreshStart"
                                    );
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    // When handling a checkpoint barrier, spawn a task to wait for the epoch
                    // commit notification.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // For the source executor, the message we receive from this arm
                    // should always be a barrier message.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        let offset_col = chunk.column_at(offset_idx);
                        task_builder.update_task_on_chunk(offset_col.clone());
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // The maximum wait time for a barrier is exceeded, so the source will be
                        // paused. We can guarantee the source is not already paused, since it
                        // has just received stream chunks.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Only update `max_wait_barrier_time_ms` here, to capture
                        // `barrier_interval_ms` changes while avoiding frequent accesses to the
                        // shared `system_params`.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    source_output_row_count.inc_by(card as u64);
                    let chunk =
                        prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
        tracing::error!(
            actor_id = self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
}
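
/// Mutation effects to apply after the corresponding barrier has been yielded downstream.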
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    RefreshBatchSplits(Vec<SplitImpl>),
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}

struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    // A CDC source only has one split.
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Send the current task and reset `building_task` to a new one.
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Usage Cases
///
/// Typically there are two concerns related to acking on checkpoint:
///
/// 1. Correctness (at-least-once): don't let the upstream clean up data that has not
///    been committed. With message-queue semantics (delete after ack), we should ack
///    to avoid redelivery, and only ack after a checkpoint to avoid data loss.
///
/// 2. Allowing the upstream to clean up data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit the last consumed offset to the upstream DB, so that old data can be discarded.
///
/// ## Google Pub/Sub
///
/// Acking is required due to its queueing semantics.
/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
/// they are quite limited compared to Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
    metrics: Arc<StreamingMetrics>,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker started successfully");
        loop {
            // Poll the rx and wait for the epoch to be committed.
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");

                            // Run the task with a callback to record the LSN after a
                            // successful commit.
                            task.run_with_on_commit_success(|source_id: u64, offset| {
                                if let Some(lsn_value) = extract_pg_cdc_lsn_from_offset(offset) {
                                    self.metrics
                                        .pg_cdc_jni_commit_offset_lsn
                                        .with_guarded_label_values(&[&source_id.to_string()])
                                        .set(lsn_value as i64);
                                }
                            })
                            .await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field, TableId};
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // This datagen will generate 3 rows at a time.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
            is_batch_source: false,
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume the barrier.
        executor.next().await.unwrap().unwrap();

        // Consume a data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // The row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
            is_batch_source: false,
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume the barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // There must exist state for the newly added split.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // Receive all pending messages.
        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
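
    // A minimal sketch of a unit test for the LSN-extraction helper above, using the
    // `sourceOffset.lsn` JSON shape that `extract_pg_cdc_lsn_from_offset` expects.
    #[test]
    fn test_extract_pg_cdc_lsn_from_offset() {
        // A well-formed offset yields the LSN as a `u64`.
        assert_eq!(
            extract_pg_cdc_lsn_from_offset(r#"{"sourceOffset": {"lsn": 123456789}}"#),
            Some(123456789)
        );
        // A missing `lsn` field or malformed JSON yields `None`.
        assert_eq!(extract_pg_cdc_lsn_from_offset(r#"{"sourceOffset": {}}"#), None);
        assert_eq!(extract_pg_cdc_lsn_from_offset("not json"), None);
    }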
}