risingwave_stream/executor/source/
source_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::id::SourceId;
use risingwave_storage::store::TryWaitEpochOptions;
use serde_json;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

/// A constant to multiply when calculating the maximum time to wait for a barrier,
/// accounting for network latency and processing cost in the meta node.
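///
/// For example, if `barrier_interval_ms` is 1000 ms (an illustrative value, not a
/// guaranteed default), a source self-pauses after roughly 5000 ms without a barrier.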
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

/// Extracts the offset value from a CDC split.
///
/// For Postgres CDC, the offset is the LSN.
fn extract_split_offset(split: &SplitImpl) -> Option<u64> {
    match split {
        SplitImpl::PostgresCdc(pg_split) => {
            let offset_str = pg_split.start_offset().as_ref()?;
            extract_pg_cdc_lsn_from_offset(offset_str)
        }
        _ => None,
    }
}

/// Parses the offset JSON and extracts the LSN value from the `sourceOffset.lsn` field.
/// Returns `Some(lsn)` if the LSN is found and can be parsed as `u64`, `None` otherwise.
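///
/// The offset is assumed to be a Debezium-style JSON blob shaped roughly as below
/// (field values are illustrative only):
///
/// ```text
/// {"sourcePartition": {...}, "sourceOffset": {"lsn": 37094504, "txId": 730}}
/// ```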
fn extract_pg_cdc_lsn_from_offset(offset_str: &str) -> Option<u64> {
    let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
    let source_offset = offset.get("sourceOffset")?;
    let lsn = source_offset.get("lsn")?;
    lsn.as_u64()
}

pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source for external systems.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    is_shared_non_cdc: bool,

    /// Local barrier manager for reporting source-load-finished events.
    _barrier_manager: LocalBarrierManager,
}

impl<S: StateStore> SourceExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            _barrier_manager: barrier_manager,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id(),
            metrics,
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

    /// Builds the source column IDs and the `SourceContext` that will be used to build the source stream.
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            // spawn a task to handle schema change events from the source parser
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx.config.developer.enable_auto_schema_change
    }

    /// `source_id | source_name | actor_id | fragment_id`
    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

    /// - `should_trim_state`: whether to trim state for dropped splits.
    ///
    ///   For scaling, the connector splits can be migrated to other actors, but
    ///   won't be added or removed. Actors should not trim states for splits that
    ///   are moved to other actors.
    ///
    ///   For a source split change, splits will not be migrated and we can trim
    ///   state for deleted splits.
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

    /// Returns `true` if the splits changed, otherwise `false`.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Checks added splits
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // Write the newly assigned split to the state cache. The snapshot is based on the cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Checks dropped splits
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // trim dropped splits' state
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

    /// Rebuilds the stream reader if an error occurs in the stream.
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::error!(
            error = ?e.as_report(),
            actor_id = %self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.clone(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        // Replace the source reader with a new one built from the new state.
        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");

            // Record LSN metrics for PostgreSQL CDC sources before moving cache
            let source_id = core.source_id.to_string();
            for split_impl in &cache {
                // Extract offset for CDC using type-safe matching
                if let Some(state_table_lsn_value) = extract_split_offset(split_impl) {
                    self.metrics
                        .pg_cdc_state_table_lsn
                        .with_guarded_label_values(&[&source_id])
                        .set(state_table_lsn_value as i64);
                }
            }

            core.split_state_store.set_states(cache).await?;
        }

        // commit anyway, even if no message is saved
        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    /// Tries to flush the state table, spilling the mem-table if needed.
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        core.split_state_store.try_flush().await?;

        Ok(())
    }

    /// A source executor with a stream source receives:
    /// 1. barrier messages, and
    /// 2. data from the external source,
    ///
    /// and acts accordingly.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        // Build source description from the builder.
        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
            &core,
            source_desc.source.clone(),
            self.metrics.clone(),
        )
        .await?;

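        // Locate the hidden split and offset columns (always present for a source), plus
        // the optional Pulsar message-id column used for ack-on-checkpoint.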
        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
            get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    // If the state store is non-empty, we consider it initialized.
                    is_uninitialized = false;
                } else {
                    // This is a new split, not yet in the state table.
                    // Make sure it is written to the state table later.
                    // Then even if it receives no messages, we can observe it in the state table.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        // init in-memory split states with persisted state if any
        core.init_split_state(boot_state.clone());

        // Return the ownership of `stream_source_core` to the source executor.
        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        // Build the source stream reader.
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Make sure it is written to the state table later.
            // Then even if it receives no messages, we can observe it in the state table.
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the chunks from source and the barriers into a single stream. We prioritize
        // barriers over source data chunks here.
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        // - If the first barrier requires us to pause on startup, pause the stream.
        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `barrier_interval_ms`
        // milliseconds, considering other latencies like network and cost in Meta.
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;
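        // `self_paused` tracks a pause initiated by this executor when barriers lag behind,
        // while `command_paused` tracks a pause requested via a barrier mutation; a command
        // pause takes priority when deciding whether to resume.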

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                // This branch will be preferred.
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // command_paused has a higher priority.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;
                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = %self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
                                {
                                    // rebuild the stream reader with new props
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    // We assume a connector props change does not involve a state change.
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // recreate from latest_split_info
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    // When handling a checkpoint barrier, spawn a task to wait for the epoch commit notification.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // For the source executor, the messages we receive from this arm
                    // should always be barrier messages.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                pulsar_message_id_col.clone(),
                            );
                        } else {
                            let offset_col = chunk.column_at(offset_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                offset_col.clone(),
                            );
                        }
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // The max wait barrier time is exceeded, so the source will be paused.
                        // We can guarantee the source is not already paused, since it has just
                        // received stream chunks.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Only update `max_wait_barrier_time_ms` here, to capture
                        // `barrier_interval_ms` changes while avoiding frequent accesses to
                        // the shared `system_params`.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    source_output_row_count.inc_by(card as u64);
                    let to_remove_col_indices =
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            vec![split_idx, offset_idx, pulsar_message_id_idx]
                        } else {
                            vec![split_idx, offset_idx]
                        };
                    let chunk =
                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
        tracing::error!(
            actor_id = %self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
}

#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}

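/// Accumulates ack information (offset columns / message ids) from each chunk and, on each
/// checkpoint barrier, sends the accumulated `WaitCheckpointTask` to the
/// `WaitCheckpointWorker`, which runs it once the epoch is committed.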
struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(
        &mut self,
        source_id: SourceId,
        latest_state: &HashMap<SplitId, SplitImpl>,
        offset_col: ArrayRef,
    ) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckPulsarMessage(arrays) => {
                // Each Pulsar chunk contains only one split.
                let split_id = latest_state.keys().next().unwrap();
                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
                arrays.push((pulsar_ack_channel_id, offset_col));
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    // A CDC source has only one split.
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Sends the building task and resets it to a new one.
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Usage Cases
///
/// Typically there are 2 issues related to acking on checkpoint:
///
/// 1. Correctness (at-least-once): don't let the upstream clean up uncommitted data.
///    For message queueing semantics (delete after ack), we should ack to avoid redelivery,
///    and only ack after checkpoint to avoid data loss.
///
/// 2. Allow the upstream to clean up data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit the last consumed offset to the upstream DB, so that old data can be discarded.
///
/// ## Google Pub/Sub
///
/// Due to queueing semantics.
/// Although Pub/Sub supports `retain_acked_messages` and `seek`, they are quite
/// limited compared with Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
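///
/// The overall flow, roughly (names refer to the types in this file):
///
/// ```text
/// SourceExecutor --(epoch, WaitCheckpointTask)--> WaitCheckpointWorker
///                                                     |  wait until the epoch is committed
///                                                     v
///                                             run the task (ack / commit offset)
/// ```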
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
    metrics: Arc<StreamingMetrics>,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker started successfully");
        loop {
            // poll the rx and wait for the epoch commit
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");

                            // Run task with callback to record LSN after successful commit
                            task.run_with_on_commit_success(|source_id: u64, offset| {
                                if let Some(lsn_value) = extract_pg_cdc_lsn_from_offset(offset) {
                                    self.metrics
                                        .pg_cdc_jni_commit_offset_lsn
                                        .with_guarded_label_values(&[&source_id.to_string()])
                                        .set(lsn_value as i64);
                                }
                            })
                            .await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field};
    use risingwave_common::id::SourceId;
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let source_id = 0.into();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // This datagen will generate 3 rows at one time.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        executor.next().await.unwrap().unwrap();

        // Consume data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // Row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // there must exist state for the newly added partition
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // receive all
        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}
1303}