risingwave_stream/executor/source/
source_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::TryWaitEpochOptions;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;

/// A multiplier applied to the barrier interval when calculating the maximum time to wait for a
/// barrier, accounting for network latency and processing cost in the meta node.
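///
/// For example, with a barrier interval of 1000 ms, data may flow for up to 5000 ms
/// without a barrier before the source pauses itself.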
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source for external systems.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    is_shared_non_cdc: bool,
}

impl<S: StateStore> SourceExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: Option<StreamSourceCore<S>>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.as_ref().unwrap().source_id,
            source_name: self
                .stream_source_core
                .as_ref()
                .unwrap()
                .source_name
                .clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

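    /// Spawn a background worker that waits for each checkpoint epoch to be committed and then
    /// runs post-commit work (e.g. acking messages or committing offsets upstream). Returns
    /// `None` if the connector has no such task.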
    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id().into(),
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

    /// Build the source column IDs and the `SourceContext` that will be used to build the source stream.
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            // spawn a task to handle schema change events from the source parser
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    // TODO: retry on rpc error
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };

        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.as_ref().unwrap().source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core
                .as_ref()
                .unwrap()
                .source_name
                .clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx
            .streaming_config
            .developer
            .enable_auto_schema_change
    }

    /// `source_id | source_name | actor_id | fragment_id`
    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core
                .as_ref()
                .unwrap()
                .source_id
                .to_string(),
            self.stream_source_core
                .as_ref()
                .unwrap()
                .source_name
                .clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

    /// - `should_trim_state`: whether to trim state for dropped splits.
    ///
    ///   For scaling, the connector splits can be migrated to other actors, but
    ///   won't be added or removed. Actors should not trim states for splits that
    ///   are moved to other actors.
    ///
    ///   For a source split change, splits will not be migrated, so we can trim states
    ///   for deleted splits.
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

    /// Returns `true` if the splits changed, otherwise `false`.
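    ///
    /// Newly assigned splits are also recorded in the per-epoch update cache, so they are
    /// persisted to the state table at the next checkpoint even if they receive no messages.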
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = self.stream_source_core.as_mut().unwrap();

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Checks added splits
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // For existing splits, we should use the latest offset from the cache.
                // `target_splits` is from meta and contains the initial offset.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // Write the newly assigned split to the state cache; the snapshot is based on this cache.

                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Checks dropped splits
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                // trim dropped splits' state
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

    /// Rebuild the stream reader when the stream encounters an error.
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = self.stream_source_core.as_mut().unwrap();
        tracing::warn!(
            error = ?e.as_report(),
            actor_id = self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.to_owned(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = self.stream_source_core.as_mut().unwrap();
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        // Replace the source reader with a new one of the new state.
        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream =
            reader_stream_builder.into_retry_stream(Some(target_state.clone()), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

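    /// Persist the splits updated in this epoch to the state table, commit the epoch, and
    /// return (then clear) the set of updated splits.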
    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = self.stream_source_core.as_mut().unwrap();

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");
            core.split_state_store.set_states(cache).await?;
        }

        // Commit anyway, even if no messages were saved.
        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    /// Try to spill the in-memory state (mem-table) of the state table to the state store.
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = self.stream_source_core.as_mut().unwrap();
        core.split_state_store.try_flush().await?;

        Ok(())
    }

    /// A source executor with a stream source receives:
    /// 1. Barrier messages
    /// 2. Data from the external source
    ///
    /// and acts accordingly.
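    ///
    /// The stream can be paused in two ways: by a `Pause` mutation (command pause), or by the
    /// executor itself when no barrier arrives within the expected window (self pause). A
    /// command pause takes priority: a self pause never resumes a command-paused stream.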
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_with_stream_source(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.as_ref().unwrap().source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core.unwrap();
        let source_id = core.source_id;

        // Build source description from the builder.
        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder =
            Self::spawn_wait_checkpoint_worker(&core, source_desc.source.clone()).await?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    // If the state store is non-empty, we consider it initialized.
                    is_uninitialized = false;
                } else {
                    // This is a new split, not yet in the state table.
                    // Make sure it is written to the state table later.
                    // Then even if it receives no messages, we can observe it in the state table.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        // init in-memory split states with persisted state if any
        core.init_split_state(boot_state.clone());

        // Return the ownership of `stream_source_core` to the source executor.
        self.stream_source_core = Some(core);

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        // Build the source stream reader.
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Make sure it is written to the state table later.
            // Then even if it receives no messages, we can observe it in the state table.
            self.stream_source_core
                .as_mut()
                .unwrap()
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the chunks from source and the barriers into a single stream. We prioritize
        // barriers over source data chunks here.
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        // If the first barrier requires us to pause on startup, pause the stream.
        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `barrier_interval_ms`
        // milliseconds, considering some other latencies like network and cost in Meta.
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

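        // Main loop: barriers are handled with priority over data chunks. On a reader error,
        // back off briefly and rebuild the reader from the latest split info.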
        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                // This branch will be preferred.
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        // command_paused has a higher priority.
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;

                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.table_id()) {
                                    // rebuild the stream reader with new props
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    // We assume a connector-properties change does not involve a state change.
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // recreate from latest_split_info
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    // When handling a checkpoint barrier, spawn a task to wait for the epoch commit notification.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // For the source executor, the message we receive from this arm
                    // should always be a barrier message.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        let offset_col = chunk.column_at(offset_idx);
                        task_builder.update_task_on_chunk(offset_col.clone());
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // The max wait time for a barrier has been exceeded, so the source
                        // will be paused. We can guarantee the source is not already paused,
                        // since it has just received stream chunks.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Only update `max_wait_barrier_time_ms` here, to capture
                        // `barrier_interval_ms` changes while avoiding frequent access
                        // to the shared `system_params`.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) = self
                            .stream_source_core
                            .as_mut()
                            .unwrap()
                            .latest_split_info
                            .get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .as_mut()
                        .unwrap()
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    source_output_row_count.inc_by(chunk.cardinality() as u64);
                    let chunk =
                        prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The source executor should only be stopped by the actor when finding a `Stop` mutation.
        tracing::error!(
            actor_id = self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }

    /// A source executor without a stream source only receives barrier messages and sends them to
    /// the downstream executor.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_without_stream_source(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?} with no stream source",
                    self.actor_ctx.id
                )
            })?;
        yield Message::Barrier(barrier);

        while let Some(barrier) = barrier_receiver.recv().await {
            yield Message::Barrier(barrier);
        }
    }
}

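/// Action derived from a barrier mutation, applied only after the barrier has been yielded
/// downstream, so that state updates and stream rebuilds never delay barrier propagation.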
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        if self.stream_source_core.is_some() {
            self.execute_with_stream_source().boxed()
        } else {
            self.execute_without_stream_source().boxed()
        }
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("SourceExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("SourceExecutor").finish()
        }
    }
}

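/// Accumulates per-chunk offsets and per-checkpoint split updates into the in-progress
/// `WaitCheckpointTask`, and ships the task to the `WaitCheckpointWorker` on each checkpoint.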
struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    // cdc source only has one split
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Send and reset the building task to a new one.
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

/// A worker used to do some work after each checkpoint epoch is committed.
///
/// # Usage Cases
///
/// Typically there are 2 issues related to ack on checkpoint:
///
/// 1. Correctness (at-least-once), i.e., don't let the upstream clean uncommitted data.
///    For message-queueing semantics (delete after ack), we should ack to avoid redelivery,
///    and only ack after checkpoint to avoid data loss.
///
/// 2. Allow the upstream to clean data after commit.
///
/// See also <https://github.com/risingwavelabs/risingwave/issues/16736#issuecomment-2109379790>
///
/// ## CDC
///
/// Commit the last consumed offset to the upstream DB, so that old data can be discarded.
///
/// ## Google Pub/Sub
///
/// Due to queueing semantics.
/// Although Pub/Sub supports `retain_acked_messages` and `seek` functionality,
/// it's quite limited, unlike Kafka.
///
/// See also <https://cloud.google.com/pubsub/docs/subscribe-best-practices#process-messages>
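///
/// # Flow (sketch)
///
/// A rough sketch of how the executor and this worker interact, with simplified
/// control flow rather than the exact signatures:
///
/// ```ignore
/// // On each checkpoint barrier, the executor ships the accumulated task:
/// wait_checkpoint_tx.send((epoch, task))?;
///
/// // In `WaitCheckpointWorker::run`, wait until the epoch is committed in the
/// // state store, then run the task (ack messages / commit offsets):
/// state_store
///     .try_wait_epoch(HummockReadEpoch::Committed(epoch.0), options)
///     .await?;
/// task.run().await;
/// ```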
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker started successfully");
        loop {
            // Poll the rx and wait for the epoch to be committed.
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");
                            task.run().await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field, TableId};
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // This datagen will generate 3 rows at one time.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            Some(core),
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
                backfill_nodes_to_pause: Default::default(),
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        executor.next().await.unwrap().unwrap();

        // Consume data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        // Row id will not be filled here.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            Some(core),
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::new(),
                added_actors: HashSet::new(),
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                pause: false,
                subscriptions_to_add: vec![],
                backfill_nodes_to_pause: Default::default(),
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // State must exist for the newly added partition.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        // receive all
        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap(); // barrier

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}