use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::id::SourceId;
use risingwave_storage::store::TryWaitEpochOptions;
use serde_json;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

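/// If the source executor has not received a barrier within
/// `barrier_interval_ms * WAIT_BARRIER_MULTIPLE_TIMES`, it pauses itself until
/// the next barrier arrives (see the data branch of `execute_inner`).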
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

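/// Extracts a numeric offset from a split for metrics reporting.
/// Currently only Postgres CDC splits are handled: the LSN is parsed out of the
/// split's start offset. All other split types yield `None`.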
fn extract_split_offset(split: &SplitImpl) -> Option<u64> {
    match split {
        SplitImpl::PostgresCdc(pg_split) => {
            let offset_str = pg_split.start_offset().as_ref()?;
            extract_pg_cdc_lsn_from_offset(offset_str)
        }
        _ => None,
    }
}

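/// Parses a CDC offset JSON string and returns the numeric `lsn` field nested
/// under `sourceOffset`, returning `None` if the string is not valid JSON or
/// the field is missing or non-numeric.
///
/// A minimal sketch of the expected shape (illustrative, not a verbatim
/// connector payload):
///
/// ```ignore
/// assert_eq!(
///     extract_pg_cdc_lsn_from_offset(r#"{"sourceOffset": {"lsn": 123456}}"#),
///     Some(123456)
/// );
/// ```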
fn extract_pg_cdc_lsn_from_offset(offset_str: &str) -> Option<u64> {
    let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
    let source_offset = offset.get("sourceOffset")?;
    let lsn = source_offset.get("lsn")?;
    lsn.as_u64()
}

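/// The executor that reads data from an external source and emits it as stream
/// chunks, persisting split/offset state to the state table on each barrier so
/// that it can resume from the committed offsets after recovery.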
pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    stream_source_core: StreamSourceCore<S>,

    metrics: Arc<StreamingMetrics>,

    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    system_params: SystemParamsReaderRef,

    rate_limit_rps: Option<u32>,

    is_shared_non_cdc: bool,

    _barrier_manager: LocalBarrierManager,
}

impl<S: StateStore> SourceExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            _barrier_manager: barrier_manager,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

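    /// Spawns a background [`WaitCheckpointWorker`] when the connector needs to
    /// acknowledge or commit offsets only after a checkpoint is committed, i.e.
    /// when `create_wait_checkpoint_task` returns `Some`. Returns a
    /// [`WaitCheckpointTaskBuilder`] used to accumulate per-epoch tasks, or
    /// `None` if the connector does not need this mechanism.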
    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id(),
            metrics,
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

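    /// Collects the output column ids and builds the [`SourceContext`] used to
    /// create the source stream. If auto schema change is enabled, this also
    /// spawns a task that forwards schema change events received from the
    /// connector to the meta service.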
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx.config.developer.enable_auto_schema_change
    }

    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

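    /// Applies a split assignment or connector-properties change after the
    /// corresponding barrier has been yielded downstream. Rebuilds the source
    /// reader stream if the local split state actually changed (or always, for
    /// connector property changes).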
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

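    /// Reconciles the current split state with `target_splits`. Returns `true`
    /// (i.e. the reader should be rebuilt) if any split was added or dropped.
    /// Newly assigned splits are initialized from the committed state store
    /// when possible; dropped splits are optionally trimmed from the state table.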
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

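    /// Logs and reports a reader error to the error metrics, then rebuilds the
    /// source reader stream from the latest known split state.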
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::error!(
            error = ?e.as_report(),
            actor_id = %self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.clone(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

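    /// Persists the splits updated in the current epoch to the state table,
    /// commits it with the given epoch, and returns the updated splits while
    /// clearing the in-memory cache for the next epoch.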
    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");

            let source_id = core.source_id.to_string();
            for split_impl in &cache {
                if let Some(state_table_lsn_value) = extract_split_offset(split_impl) {
                    self.metrics
                        .pg_cdc_state_table_lsn
                        .with_guarded_label_values(&[&source_id])
                        .set(state_table_lsn_value as i64);
                }
            }

            core.split_state_store.set_states(cache).await?;
        }

        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        core.split_state_store.try_flush().await?;

        Ok(())
    }

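    /// The main loop of the source executor:
    /// 1. Wait for the first barrier, recover split assignments from the state
    ///    store, and build the (pausable) source reader stream.
    /// 2. On each barrier: handle pause/resume, split-change, connector-props,
    ///    and throttle mutations; persist and commit split state; then yield the
    ///    barrier and apply any pending split change.
    /// 3. On each chunk: track the latest per-split offsets, self-pause if no
    ///    barrier arrived for too long, prune internal columns, and emit the chunk.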
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
            &core,
            source_desc.source.clone(),
            self.metrics.clone(),
        )
        .await?;

        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
            get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    is_uninitialized = false;
                } else {
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        core.init_split_state(boot_state.clone());

        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;
                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = %self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
                                {
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                pulsar_message_id_col.clone(),
                            );
                        } else {
                            let offset_col = chunk.column_at(offset_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                offset_col.clone(),
                            );
                        }
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    source_output_row_count.inc_by(card as u64);
                    let to_remove_col_indices =
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            vec![split_idx, offset_idx, pulsar_message_id_idx]
                        } else {
                            vec![split_idx, offset_idx]
                        };
                    let chunk =
                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        tracing::error!(
            actor_id = %self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
}

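/// The mutation to apply to the source reader after the corresponding barrier
/// has been yielded downstream: either a split (re)assignment or a connector
/// properties change.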
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}

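/// Accumulates a [`WaitCheckpointTask`] for the current epoch (per-chunk
/// offsets to acknowledge, or the CDC offset to commit) and hands it to the
/// [`WaitCheckpointWorker`] once the corresponding checkpoint barrier is seen.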
struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(
        &mut self,
        source_id: SourceId,
        latest_state: &HashMap<SplitId, SplitImpl>,
        offset_col: ArrayRef,
    ) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckPulsarMessage(arrays) => {
                let split_id = latest_state.keys().next().unwrap();
                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
                arrays.push((pulsar_ack_channel_id, offset_col));
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

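    /// Sends the task built for the current epoch to the wait-checkpoint worker
    /// and starts a fresh task for the next epoch.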
    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

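/// Background worker that waits for each checkpoint epoch to be committed to
/// the state store and then runs the corresponding [`WaitCheckpointTask`],
/// e.g. acknowledging source messages or committing CDC offsets upstream.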
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
    metrics: Arc<StreamingMetrics>,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker start success");
        loop {
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");

                            task.run_with_on_commit_success(|source_id: u64, offset| {
                                if let Some(lsn_value) = extract_pg_cdc_lsn_from_offset(offset) {
                                    self.metrics
                                        .pg_cdc_jni_commit_offset_lsn
                                        .with_guarded_label_values(&[&source_id.to_string()])
                                        .set(lsn_value as i64);
                                }
                            })
                            .await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field};
    use risingwave_common::id::SourceId;
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let source_id = 0.into();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        executor.next().await.unwrap().unwrap();

        let msg = executor.next().await.unwrap().unwrap();

        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}