risingwave_stream/executor/source/source_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::TryWaitEpochOptions;
use serde_json;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;
/// A multiplier applied to the barrier interval when calculating the maximum time to
/// wait for a barrier. This accounts for latencies in the network and costs in meta.
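///
/// For example, with `barrier_interval_ms = 1000` (an illustrative value), data may flow
/// for up to `1000 ms * 5 = 5000 ms` without a barrier before the source pauses itself;
/// see `max_wait_barrier_time_ms` in `execute_inner`.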
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

/// Extracts the offset value from a CDC split.
///
/// For Postgres CDC, the offset is the LSN.
fn extract_split_offset(split: &SplitImpl) -> Option<u64> {
    match split {
        SplitImpl::PostgresCdc(pg_split) => {
            let offset_str = pg_split.start_offset().as_ref()?;
            extract_pg_cdc_lsn_from_offset(offset_str)
        }
        _ => None,
    }
}

/// Parses the offset JSON and extracts the LSN value from the `sourceOffset.lsn` field.
/// Returns `Some(lsn)` if the LSN is found and can be parsed as a `u64`, `None` otherwise.
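///
/// For instance, an offset like `{"sourceOffset": {"lsn": 32517416}}` would yield
/// `Some(32517416)`; the sample value is illustrative.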
fn extract_pg_cdc_lsn_from_offset(offset_str: &str) -> Option<u64> {
    let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
    let source_offset = offset.get("sourceOffset")?;
    let lsn = source_offset.get("lsn")?;
    lsn.as_u64()
}

pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source state for the external connector.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    is_shared_non_cdc: bool,

    /// Local barrier manager for reporting source load finished events.
    _barrier_manager: LocalBarrierManager,
}

impl<S: StateStore> SourceExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            _barrier_manager: barrier_manager,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id().into(),
            metrics,
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

    /// Builds the source column ids and the source context that will be used to build
    /// the source stream.
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            // spawn a task to handle schema change events from the source parser
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split a txn
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx
            .streaming_config
            .developer
            .enable_auto_schema_change
    }

    /// `source_id | source_name | actor_id | fragment_id`
    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

    /// - `should_trim_state`: whether to trim state for dropped splits.
    ///
    ///   For scaling, the connector splits can be migrated to other actors, but
    ///   won't be added or removed. Actors should not trim state for splits that
    ///   are moved to other actors.
    ///
    ///   For a source split change, splits will not be migrated and we can trim state
    ///   for deleted splits.
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

    /// Returns `true` if the splits changed, `false` otherwise.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Check for added splits.
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // Write the newly assigned split to the state cache; snapshots are based
                // on the cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Check for dropped splits.
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // Trim the dropped splits' state.
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

    /// Rebuilds the stream when an error occurs in the stream.
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::error!(
            error = ?e.as_report(),
            actor_id = self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.clone(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        // Replace the source reader with a new one built from the new state.
        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");

            // Record LSN metrics for PostgreSQL CDC sources before moving the cache.
            let source_id = core.source_id.to_string();
            for split_impl in &cache {
                // Extract the offset for CDC using type-safe matching.
                if let Some(state_table_lsn_value) = extract_split_offset(split_impl) {
                    self.metrics
                        .pg_cdc_state_table_lsn
                        .with_guarded_label_values(&[&source_id])
                        .set(state_table_lsn_value as i64);
                }
            }

            core.split_state_store.set_states(cache).await?;
        }

        // Commit anyway, even if no messages were saved.
        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    /// Try to spill the mem-table.
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        core.split_state_store.try_flush().await?;

        Ok(())
    }

    /// A source executor with a stream source receives:
    /// 1. Barrier messages
    /// 2. Data from the external source
    ///
    /// and acts accordingly.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        // Build the source description from the builder.
        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
            &core,
            source_desc.source.clone(),
            self.metrics.clone(),
        )
        .await?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    // If the state store is non-empty, we consider it initialized.
                    is_uninitialized = false;
                } else {
                    // This is a new split, not in the state table.
                    // Make sure it is written to the state table later.
                    // Then even if it receives no messages, we can observe it in the state table.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        // Init in-memory split states with the persisted state, if any.
        core.init_split_state(boot_state.clone());

        // Return the ownership of `stream_source_core` to the source executor.
        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        // Build the source stream reader.
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Make sure it is written to the state table later.
            // Then even if it receives no messages, we can observe it in the state table.
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the chunks from the source and the barriers into a single stream. We
        // prioritize barriers over source data chunks here.
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        // If the first barrier requires us to pause on startup, pause the stream.
        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
        // milliseconds, considering some other latencies like network and cost in Meta.
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;
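        // Below, `self_paused` tracks pauses this executor triggers itself when barriers
        // are overdue, while `command_paused` tracks pauses requested via `Mutation::Pause`;
        // a command pause takes priority when deciding whether to resume.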

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                // This branch will be preferred.
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // `command_paused` has a higher priority.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;
                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.table_id()) {
                                    // Rebuild the stream reader with the new props.
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    // We assume a connector props change does not involve a
                                    // state change.
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // Recreate the reader from `latest_split_info`.
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    // When handling a checkpoint barrier, spawn a task to wait for the
                    // epoch commit notification.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // For the source executor, the message we receive from this arm
                    // should always be a barrier message.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        let offset_col = chunk.column_at(offset_idx);
                        task_builder.update_task_on_chunk(offset_col.clone());
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // We exceeded the max wait barrier time, so the source will be
                        // paused. Currently we can guarantee the source is not already
                        // paused, since it just received stream chunks.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Only update `max_wait_barrier_time_ms` here to capture
                        // `barrier_interval_ms` changes, avoiding frequent accesses to
                        // the shared `system_params`.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    source_output_row_count.inc_by(card as u64);
                    let chunk =
                        prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
        tracing::error!(
            actor_id = self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
}

#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}

struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    // A CDC source only has one split.
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Sends the building task and resets it to a new one.
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Usage Cases
///
/// Typically there are 2 issues related to acking on checkpoint:
///
/// 1. Correctness (at-least-once): don't let the upstream clean up uncommitted data.
///    For message-queueing semantics (delete after ack), we should ack to avoid redelivery,
///    and only ack after a checkpoint to avoid data loss.
///
/// 2. Allow the upstream to clean up data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit the last consumed offset to the upstream DB, so that old data can be discarded.
///
/// ## Google Pub/Sub
///
/// Due to its queueing semantics.
/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
/// it's quite limited, unlike Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
    metrics: Arc<StreamingMetrics>,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker started successfully");
        loop {
            // Poll the rx and wait for the epoch commit.
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");

                            // Run the task with a callback to record the LSN after a
                            // successful commit.
                            task.run_with_on_commit_success(|source_id: u64, offset| {
                                if let Some(lsn_value) = extract_pg_cdc_lsn_from_offset(offset) {
                                    self.metrics
                                        .pg_cdc_jni_commit_offset_lsn
                                        .with_guarded_label_values(&[&source_id.to_string()])
                                        .set(lsn_value as i64);
                                }
                            })
                            .await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field, TableId};
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

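    // A minimal sketch exercising `extract_pg_cdc_lsn_from_offset` and
    // `extract_split_offset`: only the `sourceOffset.lsn` field is read, and
    // non-Postgres splits yield no offset. The sample JSON values are illustrative.
    #[test]
    fn test_extract_pg_cdc_lsn_from_offset() {
        // A well-formed offset with a numeric `sourceOffset.lsn`.
        let offset = r#"{"sourceOffset": {"lsn": 32517416}}"#;
        assert_eq!(extract_pg_cdc_lsn_from_offset(offset), Some(32517416));

        // A missing `lsn` field or malformed JSON yields `None`.
        assert_eq!(extract_pg_cdc_lsn_from_offset(r#"{"sourceOffset": {}}"#), None);
        assert_eq!(extract_pg_cdc_lsn_from_offset("not json"), None);

        // Non-Postgres splits carry no extractable LSN.
        let datagen_split = SplitImpl::Datagen(DatagenSplit {
            split_index: 0,
            split_num: 1,
            start_offset: None,
        });
        assert_eq!(extract_split_offset(&datagen_split), None);
    }
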
    #[tokio::test]
    async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // This datagen will generate 3 rows at one time.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        executor.next().await.unwrap().unwrap();

        // Consume data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // Row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // State must exist for the newly added partition.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // Receive all.
        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}