1use std::collections::HashMap;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::util::epoch::{Epoch, EpochPair};
28use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
29use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
30use risingwave_connector::source::reader::reader::SourceReader;
31use risingwave_connector::source::{
32 ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
33 StreamChunkWithState, WaitCheckpointTask,
34};
35use risingwave_hummock_sdk::HummockReadEpoch;
36use risingwave_storage::store::TryWaitEpochOptions;
37use serde_json;
38use thiserror_ext::AsReport;
39use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
40use tokio::sync::{mpsc, oneshot};
41use tokio::time::Instant;
42
43use super::executor_core::StreamSourceCore;
44use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
45use crate::common::rate_limit::limited_chunk_size;
46use crate::executor::UpdateMutation;
47use crate::executor::prelude::*;
48use crate::executor::source::reader_stream::StreamReaderBuilder;
49use crate::executor::stream_reader::StreamReaderWithPause;
50use crate::task::LocalBarrierManager;
51
/// Multiplier applied to the `barrier_interval_ms` system parameter to obtain the longest time
/// (in milliseconds) the source keeps emitting chunks without seeing a barrier before it pauses
/// its own data stream.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
55
56fn extract_split_offset(split: &SplitImpl) -> Option<u64> {
61 match split {
62 SplitImpl::PostgresCdc(pg_split) => {
63 let offset_str = pg_split.start_offset().as_ref()?;
64 extract_pg_cdc_lsn_from_offset(offset_str)
65 }
66 _ => None,
67 }
68}
69
70fn extract_pg_cdc_lsn_from_offset(offset_str: &str) -> Option<u64> {
73 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
74 let source_offset = offset.get("sourceOffset")?;
75 let lsn = source_offset.get("lsn")?;
76 lsn.as_u64()
77}
78
/// Stream executor that reads from an external source and emits the data as stream messages,
/// interleaved with the barriers it receives.
pub struct SourceExecutor<S: StateStore> {
    /// Context of the owning actor; supplies the actor id, fragment id, meta client and
    /// streaming config used throughout the executor.
    actor_ctx: ActorContextRef,

    /// Source identity (id, name), the source description builder, the split state store and
    /// the in-memory split bookkeeping.
    stream_source_core: StreamSourceCore<S>,

    /// Streaming metrics registry used for the source counters and the Postgres CDC LSN gauges.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel. Wrapped in `Option` so that `execute_inner` can move
    /// it out with `take`.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader; the executor reads `barrier_interval_ms` from it.
    system_params: SystemParamsReaderRef,

    /// Optional rate limit handed to the reader builder; replaced when a `Throttle` mutation
    /// targets this actor.
    rate_limit_rps: Option<u32>,

    /// Flag forwarded to `fetch_latest_splits` / `into_retry_stream` when the executor starts
    /// uninitialized.
    is_shared_non_cdc: bool,

    /// Local barrier manager. Stored but not read by the code in this file.
    _barrier_manager: LocalBarrierManager,
}
102
103impl<S: StateStore> SourceExecutor<S> {
104 #[expect(clippy::too_many_arguments)]
105 pub fn new(
106 actor_ctx: ActorContextRef,
107 stream_source_core: StreamSourceCore<S>,
108 metrics: Arc<StreamingMetrics>,
109 barrier_receiver: UnboundedReceiver<Barrier>,
110 system_params: SystemParamsReaderRef,
111 rate_limit_rps: Option<u32>,
112 is_shared_non_cdc: bool,
113 barrier_manager: LocalBarrierManager,
114 ) -> Self {
115 Self {
116 actor_ctx,
117 stream_source_core,
118 metrics,
119 barrier_receiver: Some(barrier_receiver),
120 system_params,
121 rate_limit_rps,
122 is_shared_non_cdc,
123 _barrier_manager: barrier_manager,
124 }
125 }
126
127 fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
128 StreamReaderBuilder {
129 source_desc,
130 rate_limit: self.rate_limit_rps,
131 source_id: self.stream_source_core.source_id,
132 source_name: self.stream_source_core.source_name.clone(),
133 is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
134 actor_ctx: self.actor_ctx.clone(),
135 reader_stream: None,
136 }
137 }
138
139 async fn spawn_wait_checkpoint_worker(
140 core: &StreamSourceCore<S>,
141 source_reader: SourceReader,
142 metrics: Arc<StreamingMetrics>,
143 ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
144 let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
145 return Ok(None);
146 };
147 let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
148 let wait_checkpoint_worker = WaitCheckpointWorker {
149 wait_checkpoint_rx,
150 state_store: core.split_state_store.state_table().state_store().clone(),
151 table_id: core.split_state_store.state_table().table_id().into(),
152 metrics,
153 };
154 tokio::spawn(wait_checkpoint_worker.run());
155 Ok(Some(WaitCheckpointTaskBuilder {
156 wait_checkpoint_tx,
157 source_reader,
158 building_task: initial_task,
159 }))
160 }
161
162 pub fn prepare_source_stream_build(
164 &self,
165 source_desc: &SourceDesc,
166 ) -> (Vec<ColumnId>, SourceContext) {
167 let column_ids = source_desc
168 .columns
169 .iter()
170 .map(|column_desc| column_desc.column_id)
171 .collect_vec();
172
173 let (schema_change_tx, mut schema_change_rx) =
174 mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
175 let schema_change_tx = if self.is_auto_schema_change_enable() {
176 let meta_client = self.actor_ctx.meta_client.clone();
177 let _join_handle = tokio::task::spawn(async move {
179 while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
180 let table_ids = schema_change.table_ids();
181 tracing::info!(
182 target: "auto_schema_change",
183 "recv a schema change event for tables: {:?}", table_ids);
184 if let Some(ref meta_client) = meta_client {
186 match meta_client
187 .auto_schema_change(schema_change.to_protobuf())
188 .await
189 {
190 Ok(_) => {
191 tracing::info!(
192 target: "auto_schema_change",
193 "schema change success for tables: {:?}", table_ids);
194 finish_tx.send(()).unwrap();
195 }
196 Err(e) => {
197 tracing::error!(
198 target: "auto_schema_change",
199 error = ?e.as_report(), "schema change error");
200 finish_tx.send(()).unwrap();
201 }
202 }
203 }
204 }
205 });
206 Some(schema_change_tx)
207 } else {
208 info!("auto schema change is disabled in config");
209 None
210 };
211 let source_ctx = SourceContext::new(
212 self.actor_ctx.id,
213 self.stream_source_core.source_id,
214 self.actor_ctx.fragment_id,
215 self.stream_source_core.source_name.clone(),
216 source_desc.metrics.clone(),
217 SourceCtrlOpts {
218 chunk_size: limited_chunk_size(self.rate_limit_rps),
219 split_txn: self.rate_limit_rps.is_some(), },
221 source_desc.source.config.clone(),
222 schema_change_tx,
223 );
224
225 (column_ids, source_ctx)
226 }
227
228 fn is_auto_schema_change_enable(&self) -> bool {
229 self.actor_ctx
230 .streaming_config
231 .developer
232 .enable_auto_schema_change
233 }
234
235 #[inline]
237 fn get_metric_labels(&self) -> [String; 4] {
238 [
239 self.stream_source_core.source_id.to_string(),
240 self.stream_source_core.source_name.clone(),
241 self.actor_ctx.id.to_string(),
242 self.actor_ctx.fragment_id.to_string(),
243 ]
244 }
245
246 async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
255 &mut self,
256 barrier_epoch: EpochPair,
257 source_desc: &SourceDesc,
258 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
259 apply_mutation: ApplyMutationAfterBarrier<'_>,
260 ) -> StreamExecutorResult<()> {
261 {
262 let mut should_rebuild_stream = false;
263 match apply_mutation {
264 ApplyMutationAfterBarrier::SplitChange {
265 target_splits,
266 should_trim_state,
267 split_change_count,
268 } => {
269 split_change_count.inc();
270 if self
271 .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
272 .await?
273 {
274 should_rebuild_stream = true;
275 }
276 }
277 ApplyMutationAfterBarrier::ConnectorPropsChange => {
278 should_rebuild_stream = true;
279 }
280 }
281
282 if should_rebuild_stream {
283 self.rebuild_stream_reader(source_desc, stream)?;
284 }
285 }
286
287 Ok(())
288 }
289
    /// Reconciles the in-memory split assignment with `target_splits`.
    ///
    /// Returns `Ok(true)` when the assignment differs from `latest_split_info` (a split was
    /// added or dropped), in which case `latest_split_info` is replaced by the new assignment.
    /// Returns `Ok(false)` and leaves the state untouched otherwise.
    ///
    /// `should_trim_state` controls whether the persisted state of dropped splits is removed
    /// from the split state store.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        // Index the requested assignment by split id.
        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Detect added splits.
        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                // Already assigned: keep the in-memory state rather than the incoming split.
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                // Newly assigned: prefer a state recovered from the state store, falling back
                // to the split as provided by the caller.
                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                // Record the split as updated in this epoch unless an entry already exists.
                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        // Detect dropped splits.
        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            // Forget pending updates of splits that are no longer assigned.
            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            // Move the dropped splits out of `latest_split_info`.
            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }
375
376 fn rebuild_stream_reader_from_error<const BIASED: bool>(
378 &mut self,
379 source_desc: &SourceDesc,
380 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
381 e: StreamExecutorError,
382 ) -> StreamExecutorResult<()> {
383 let core = &mut self.stream_source_core;
384 tracing::error!(
385 error = ?e.as_report(),
386 actor_id = self.actor_ctx.id,
387 source_id = %core.source_id,
388 "stream source reader error",
389 );
390 GLOBAL_ERROR_METRICS.user_source_error.report([
391 e.variant_name().to_owned(),
392 core.source_id.to_string(),
393 core.source_name.clone(),
394 self.actor_ctx.fragment_id.to_string(),
395 ]);
396
397 self.rebuild_stream_reader(source_desc, stream)
398 }
399
400 fn rebuild_stream_reader<const BIASED: bool>(
401 &mut self,
402 source_desc: &SourceDesc,
403 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
404 ) -> StreamExecutorResult<()> {
405 let core = &mut self.stream_source_core;
406 let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
407
408 tracing::info!(
409 "actor {:?} apply source split change to {:?}",
410 self.actor_ctx.id,
411 target_state
412 );
413
414 let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
416 let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);
417
418 stream.replace_data_stream(reader_stream);
419
420 Ok(())
421 }
422
423 async fn persist_state_and_clear_cache(
424 &mut self,
425 epoch: EpochPair,
426 ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
427 let core = &mut self.stream_source_core;
428
429 let cache = core
430 .updated_splits_in_epoch
431 .values()
432 .map(|split_impl| split_impl.to_owned())
433 .collect_vec();
434
435 if !cache.is_empty() {
436 tracing::debug!(state = ?cache, "take snapshot");
437
438 let source_id = core.source_id.to_string();
440 for split_impl in &cache {
441 if let Some(state_table_lsn_value) = extract_split_offset(split_impl) {
443 self.metrics
444 .pg_cdc_state_table_lsn
445 .with_guarded_label_values(&[&source_id])
446 .set(state_table_lsn_value as i64);
447 }
448 }
449
450 core.split_state_store.set_states(cache).await?;
451 }
452
453 core.split_state_store.commit(epoch).await?;
455
456 let updated_splits = core.updated_splits_in_epoch.clone();
457
458 core.updated_splits_in_epoch.clear();
459
460 Ok(updated_splits)
461 }
462
463 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
465 let core = &mut self.stream_source_core;
466 core.split_state_store.try_flush().await?;
467
468 Ok(())
469 }
470
    /// Main loop of the source executor, exposed as a stream of `Message`s.
    ///
    /// Phases:
    /// 1. Wait for the first barrier, read the initial split assignment and pause flag from
    ///    it, and yield it.
    /// 2. Build the source description, optionally spawn the wait-checkpoint worker, and
    ///    initialise the split state (recovering per-split state from the state store).
    /// 3. Build the combined barrier/data stream and loop over it: barriers apply mutations,
    ///    persist split state and are yielded; data chunks update the split bookkeeping and
    ///    are yielded after the split/offset columns are pruned.
    ///
    /// Reader errors do not terminate the stream: the reader is rebuilt after a delay.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        // Splits assigned to this actor by the first barrier, if any.
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        // Temporarily move the core out of `self`; it is put back once initialised.
        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        // Build the source description.
        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // `None` when the reader does not need a wait-checkpoint task.
        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
            &core,
            source_desc.source.clone(),
            self.metrics.clone(),
        )
        .await?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    // A persisted state exists: resume from it, and treat the executor as
                    // already initialised.
                    *ele = recover_state;
                    is_uninitialized = false;
                } else {
                    // No persisted state: record the split so that it is written at the next
                    // barrier.
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        core.init_split_state(boot_state.clone());

        // Put the initialised core back into `self`.
        self.stream_source_core = core;

        // `None` when there is no split to read from.
        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        if is_uninitialized {
            // Only an executor without any recovered split state fetches the latest splits.
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            // Record the fetched splits so that they are persisted at the next barrier.
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        // Merge the barrier stream and the data stream.
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        // Tracks a pause requested through a barrier (startup flag or `Pause` mutation).
        let mut command_paused = false;

        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        // Longest tolerated gap between barriers before the source pauses itself.
        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        // Tracks a pause the executor imposed on itself because barriers stopped arriving.
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                // Reader error: wait a second, then rebuild the reader and keep going.
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                // This branch is preferred by the biased stream (`BIASED = true`).
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    // A barrier arrived: lift a self-imposed pause, unless a command pause is
                    // also in effect.
                    if self_paused {
                        self_paused = false;
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;
                    // Mutation to apply after the barrier has been yielded, if any.
                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                // Deferred; the state of dropped splits is trimmed.
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.table_id()) {
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    // The description is updated now; the reader rebuild is
                                    // deferred until after the barrier has been yielded.
                                    source_desc.update_reader(new_props.clone())?;
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                // Deferred; unlike `SourceChangeSplit`, the state of dropped
                                // splits is kept.
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    // Applied immediately: rebuild the reader with the new
                                    // rate limit.
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    // On checkpoint barriers, hand the task built so far to the worker.
                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    // Apply the deferred mutation now that the barrier has been yielded.
                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // The left side of the stream is expected to carry barriers only.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        let offset_col = chunk.column_at(offset_idx);
                        task_builder.update_task_on_chunk(offset_col.clone());
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        // No barrier for too long: pause the data stream until the next
                        // barrier arrives.
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        // Re-read the system parameter so that a changed barrier interval is
                        // picked up.
                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    // Refresh the in-memory state of splits that are currently assigned.
                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    // Remember the new states so that they are persisted at the next barrier.
                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    source_output_row_count.inc_by(card as u64);
                    let chunk =
                        prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The merged stream ended, which is logged as an unexpected exit.
        tracing::error!(
            actor_id = self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
796}
797
/// A barrier mutation whose application is postponed until the barrier carrying it has been
/// yielded downstream.
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    /// Replace the split assignment of this actor.
    SplitChange {
        /// The complete new assignment.
        target_splits: Vec<SplitImpl>,
        /// Whether the persisted state of dropped splits should be removed.
        should_trim_state: bool,
        /// Counter incremented each time a split change is applied.
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    /// The connector properties changed; the reader has to be rebuilt.
    ConnectorPropsChange,
}
807
808impl<S: StateStore> Execute for SourceExecutor<S> {
809 fn execute(self: Box<Self>) -> BoxedMessageStream {
810 self.execute_inner().boxed()
811 }
812}
813
814impl<S: StateStore> Debug for SourceExecutor<S> {
815 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
816 f.debug_struct("SourceExecutor")
817 .field("source_id", &self.stream_source_core.source_id)
818 .field("column_ids", &self.stream_source_core.column_ids)
819 .finish()
820 }
821}
822
/// Accumulates a `WaitCheckpointTask` between checkpoints and hands finished tasks to the
/// `WaitCheckpointWorker`.
struct WaitCheckpointTaskBuilder {
    /// Sending half of the channel to the worker; each item pairs a task with the epoch the
    /// worker waits for before running it.
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    /// Reader used to create the empty task for the next round.
    source_reader: SourceReader,
    /// The task currently being filled in.
    building_task: WaitCheckpointTask,
}
828
829impl WaitCheckpointTaskBuilder {
830 fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
831 match &mut self.building_task {
832 WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
833 arrays.push(offset_col);
834 }
835 WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
836 arrays.push(offset_col);
837 }
838 WaitCheckpointTask::CommitCdcOffset(_) => {}
839 }
840 }
841
842 fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
843 #[expect(clippy::single_match)]
844 match &mut self.building_task {
845 WaitCheckpointTask::CommitCdcOffset(offsets) => {
846 if !updated_splits.is_empty() {
847 assert_eq!(1, updated_splits.len());
849 for (split_id, split_impl) in updated_splits {
850 if split_impl.is_cdc_split() {
851 *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
852 } else {
853 unreachable!()
854 }
855 }
856 }
857 }
858 _ => {}
859 }
860 }
861
862 async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
864 let new_task = self
865 .source_reader
866 .create_wait_checkpoint_task()
867 .await?
868 .expect("wait checkpoint task should be created");
869 self.wait_checkpoint_tx
870 .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
871 .expect("wait_checkpoint_tx send should succeed");
872 Ok(())
873 }
874}
875
/// Background worker that, for each received task, waits until the paired epoch is committed
/// in the state store and then runs the task.
struct WaitCheckpointWorker<S: StateStore> {
    /// Receiving half of the task channel; the worker stops once it is closed.
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    /// State store on which the worker waits for epochs.
    state_store: S,
    /// Table id passed along with each wait request.
    table_id: TableId,
    /// Metrics registry used to publish the committed Postgres CDC LSN.
    metrics: Arc<StreamingMetrics>,
}
907
908impl<S: StateStore> WaitCheckpointWorker<S> {
909 pub async fn run(mut self) {
910 tracing::debug!("wait epoch worker start success");
911 loop {
912 match self.wait_checkpoint_rx.recv().await {
914 Some((epoch, task)) => {
915 tracing::debug!("start to wait epoch {}", epoch.0);
916 let ret = self
917 .state_store
918 .try_wait_epoch(
919 HummockReadEpoch::Committed(epoch.0),
920 TryWaitEpochOptions {
921 table_id: self.table_id,
922 },
923 )
924 .await;
925
926 match ret {
927 Ok(()) => {
928 tracing::debug!(epoch = epoch.0, "wait epoch success");
929
930 task.run_with_on_commit_success(|source_id: u64, offset| {
932 if let Some(lsn_value) = extract_pg_cdc_lsn_from_offset(offset) {
933 self.metrics
934 .pg_cdc_jni_commit_offset_lsn
935 .with_guarded_label_values(&[&source_id.to_string()])
936 .set(lsn_value as i64);
937 }
938 })
939 .await;
940 }
941 Err(e) => {
942 tracing::error!(
943 error = %e.as_report(),
944 "wait epoch {} failed", epoch.0
945 );
946 }
947 }
948 }
949 None => {
950 tracing::error!("wait epoch rx closed");
951 break;
952 }
953 }
954 }
955 }
956}
957
958#[cfg(test)]
959mod tests {
960 use maplit::{btreemap, convert_args, hashmap};
961 use risingwave_common::catalog::{ColumnId, Field, TableId};
962 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
963 use risingwave_common::test_prelude::StreamChunkTestExt;
964 use risingwave_common::util::epoch::{EpochExt, test_epoch};
965 use risingwave_connector::source::datagen::DatagenSplit;
966 use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
967 use risingwave_pb::catalog::StreamSourceInfo;
968 use risingwave_pb::plan_common::PbRowFormatType;
969 use risingwave_storage::memory::MemoryStateStore;
970 use tokio::sync::mpsc::unbounded_channel;
971 use tracing_test::traced_test;
972
973 use super::*;
974 use crate::executor::AddMutation;
975 use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
976 use crate::task::LocalBarrierManager;
977
    /// Source name given to the `StreamSourceCore` built by the tests in this module.
    const MOCK_SOURCE_NAME: &str = "mock_source";
979
    /// Boots a `SourceExecutor` over a datagen source with a single split and checks that the
    /// first data chunk contains the first three values of the configured sequence.
    #[tokio::test]
    async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // Datagen source producing a sequence starting at 11, three rows per second.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        // The first barrier carries the initial split assignment for the actor.
        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume the barrier.
        executor.next().await.unwrap().unwrap();

        // Consume the first data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }
1065
    /// Drives a `SourceExecutor` through a split addition, a pause/resume pair and a split
    /// removal, and inspects the shared in-memory state store after each split change:
    /// a newly added split must have persisted state, and dropped splits must no longer be
    /// recoverable while the remaining split still is.
    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        // Kept outside the executor so the test can open its own handlers on the same store.
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        // Start with split 0 of 3 only.
        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // The first message must be the initial barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        // From here on, messages are consumed in batches of whatever is ready (up to 10).
        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        // Assign all three splits.
        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap();

        // A plain barrier follows the split change.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        // Open a second handler on the same store to inspect what the executor persisted.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        // The state of the newly added split 1 must be present.
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        // Pause, then resume, the source through barrier mutations.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        // Shrink the assignment to split 2 only, dropping splits 0 and 1.
        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap();

        // A plain barrier follows the split drop.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        // Inspect the persisted state again through a fresh handler.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        // The dropped splits 0 and 1 must no longer be recoverable.
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        // The remaining split 2 must still be recoverable.
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
1272}