risingwave_stream/executor/source/source_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::cdc::split::extract_postgres_lsn_from_offset_str;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::id::SourceId;
use risingwave_storage::store::TryWaitEpochOptions;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

/// A multiplier applied when calculating the maximum time to wait for a barrier,
/// to account for latencies such as the network and processing cost in the meta service.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
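// For illustration: assuming the default `barrier_interval_ms` of 1000, the
// executor waits up to 1000 ms * 5 = 5000 ms for a barrier before pausing
// itself (see how `max_wait_barrier_time_ms` is computed in `execute_inner`).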

pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source core for reading from an external connector.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader used to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    is_shared_non_cdc: bool,

    /// Local barrier manager for reporting source load finished events.
    _barrier_manager: LocalBarrierManager,
}

impl<S: StateStore> SourceExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            _barrier_manager: barrier_manager,
        }
    }

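    /// Builds a [`StreamReaderBuilder`] carrying the executor's current rate
    /// limit and actor context, used to (re)build the source reader stream.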
    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

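    /// Spawns a [`WaitCheckpointWorker`] if the connector needs to do work after
    /// each checkpoint commit (e.g. ack messages or commit CDC offsets), and
    /// returns a [`WaitCheckpointTaskBuilder`] used to accumulate such tasks.
    /// Returns `None` if the connector does not require it.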
    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id(),
            metrics,
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

    /// Builds the source column ids and the source context used to build the source stream.
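    ///
    /// A hypothetical usage sketch (the surrounding setup and the stream-building
    /// step are assumptions for illustration, not taken from this file):
    /// ```ignore
    /// let (column_ids, source_ctx) = executor.prepare_source_stream_build(&source_desc);
    /// // The pair is then used when constructing the actual chunk stream,
    /// // e.g. by the reader built via `StreamReaderBuilder`.
    /// ```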
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            // Spawn a task to handle schema change events from the source parser.
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // When rate limiting, we may split a txn.
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx.config.developer.enable_auto_schema_change
    }

    /// `source_id | source_name | actor_id | fragment_id`
    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

    /// - `should_trim_state`: whether to trim state for dropped splits.
    ///
    ///   For scaling, the connector splits can be migrated to other actors, but
    ///   won't be added or removed. Actors should not trim states for splits that
    ///   are moved to other actors.
    ///
    ///   For a source split change, splits will not be migrated, so we can trim the
    ///   states of deleted splits.
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

    /// Returns `true` if the splits changed; otherwise `false`.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Check added splits.
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // Write the newly assigned split to the state cache; the snapshot is
                // based on the cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Check dropped splits.
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // Trim the state of the dropped splits.
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

    /// Rebuilds the stream when an error occurs in the stream.
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::error!(
            error = ?e.as_report(),
            actor_id = %self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.clone(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        // Replace the source reader with a new one built from the new state.
        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

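    /// Persists the splits updated within this epoch to the state table, records
    /// CDC progress metrics, commits the epoch (even if nothing was updated), and
    /// returns the updated splits after clearing the per-epoch cache.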
    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");

            // Record metrics for CDC sources before moving the cache.
            let source_id = core.source_id.to_string();
            for split_impl in &cache {
                // Extract and record CDC-specific metrics based on the split type.
                match split_impl {
                    SplitImpl::PostgresCdc(pg_split) => {
                        if let Some(lsn_value) = pg_split.pg_lsn() {
                            self.metrics
                                .pg_cdc_state_table_lsn
                                .with_guarded_label_values(&[&source_id])
                                .set(lsn_value as i64);
                        }
                    }
                    SplitImpl::MysqlCdc(mysql_split) => {
                        if let Some((file_seq, position)) = mysql_split.mysql_binlog_offset() {
                            self.metrics
                                .mysql_cdc_state_binlog_file_seq
                                .with_guarded_label_values(&[&source_id])
                                .set(file_seq as i64);

                            self.metrics
                                .mysql_cdc_state_binlog_position
                                .with_guarded_label_values(&[&source_id])
                                .set(position as i64);
                        }
                    }
                    _ => {}
                }
            }

            core.split_state_store.set_states(cache).await?;
        }

        // Commit anyway, even if no messages were saved.
        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    /// Tries to flush data to the state store (mem-table spill).
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        core.split_state_store.try_flush().await?;

        Ok(())
    }

    /// A source executor with a stream source receives:
    /// 1. Barrier messages
    /// 2. Data from the external source
    ///
    /// and acts accordingly.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        // Build the source description from the builder.
        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
            &core,
            source_desc.source.clone(),
            self.metrics.clone(),
        )
        .await?;

        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
            get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    // If the state store is non-empty, we consider it initialized.
                    is_uninitialized = false;
                } else {
                    // This is a new split, not yet in the state table. Make sure it is
                    // written to the state table later, so that even if it receives no
                    // messages, we can observe it in the state table.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        // Initialize the in-memory split states with the persisted state, if any.
        core.init_split_state(boot_state.clone());

        // Return the ownership of `stream_source_core` to the source executor.
        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        // Build the source stream reader.
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Make sure the latest splits are written to the state table later, so that
            // even if they receive no messages, we can observe them in the state table.
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the chunks from the source and the barriers into a single stream. We
        // prioritize barriers over source data chunks here.
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        // If the first barrier requires us to pause on startup, pause the stream.
        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
        // milliseconds, accounting for other latencies such as the network and cost in Meta.
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                // This branch will be preferred.
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // `command_paused` has a higher priority.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;
                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = %self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
                                {
                                    // Rebuild the stream reader with the new props.
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    // Assume the connector props change does not involve a
                                    // state change.
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // Recreate the reader from `latest_split_info`.
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    // When handling a checkpoint barrier, spawn a task to wait for the
                    // epoch commit notification.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // For the source executor, the message we receive from this arm
                    // should always be a barrier message.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                pulsar_message_id_col.clone(),
                            );
                        } else {
                            let offset_col = chunk.column_at(offset_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                offset_col.clone(),
                            );
                        }
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // The max wait barrier time is exceeded, so the source will be
                        // paused. We can guarantee the source is currently not paused,
                        // since it just received stream chunks.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Only update `max_wait_barrier_time_ms` to capture
                        // `barrier_interval_ms` changes here, to avoid frequently
                        // accessing the shared `system_params`.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    if card == 0 {
                        continue;
                    }
                    source_output_row_count.inc_by(card as u64);
                    let to_remove_col_indices =
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            vec![split_idx, offset_idx, pulsar_message_id_idx]
                        } else {
                            vec![split_idx, offset_idx]
                        };
                    let chunk =
                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
        tracing::error!(
            actor_id = %self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
}

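/// A mutation whose side effects are applied only after the corresponding
/// barrier has been yielded downstream.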
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}

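/// Accumulates offsets from data chunks and checkpoints into a
/// [`WaitCheckpointTask`], which is shipped to the [`WaitCheckpointWorker`]
/// whenever a checkpoint barrier is handled.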
struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
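    /// Appends the chunk's offset column to the task under construction. For
    /// Pulsar, the column is tagged with an ack channel id derived from the
    /// source id and split id.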
    fn update_task_on_chunk(
        &mut self,
        source_id: SourceId,
        latest_state: &HashMap<SplitId, SplitImpl>,
        offset_col: ArrayRef,
    ) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckPulsarMessage(arrays) => {
                // Each Pulsar chunk only contains one split.
                let split_id = latest_state.keys().next().unwrap();
                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
                arrays.push((pulsar_ack_channel_id, offset_col));
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    // A CDC source only has one split.
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Sends the building task and resets it to a new one.
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Use Cases
///
/// Typically there are two issues related to acking on checkpoint:
///
/// 1. Correctness (at-least-once), i.e., don't let the upstream clean up
///    uncommitted data. For message-queueing semantics (delete after ack), we
///    should ack to avoid redelivery, and only ack after checkpoint to avoid
///    data loss.
///
/// 2. Allow the upstream to clean up data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit the last consumed offset to the upstream DB, so that old data can be
/// discarded.
///
/// ## Google Pub/Sub
///
/// Due to queueing semantics. Although Pub/Sub supports `retain_acked_messages`
/// and `seek` functionality, it's quite limited, unlike Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
    metrics: Arc<StreamingMetrics>,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker start success");
        loop {
            // Poll the rx and wait for the epoch commit.
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");

                            // Run the task with a callback to record the LSN after a
                            // successful commit.
                            task.run_with_on_commit_success(|source_id: u64, offset| {
                                if let Some(lsn_value) =
                                    extract_postgres_lsn_from_offset_str(offset)
                                {
                                    self.metrics
                                        .pg_cdc_jni_commit_offset_lsn
                                        .with_guarded_label_values(&[&source_id.to_string()])
                                        .set(lsn_value as i64);
                                }
                            })
                            .await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field};
    use risingwave_common::id::SourceId;
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let source_id = 0.into();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // This datagen source generates 3 rows at a time.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume the barrier.
        executor.next().await.unwrap().unwrap();

        // Consume the data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // The row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume the barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // There must exist state for the newly added partition.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // Receive all pending chunks.
        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}