use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::ArrayRef;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
    StreamChunkWithState, WaitCheckpointTask, build_pulsar_ack_channel_id,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::id::SourceId;
use risingwave_storage::store::TryWaitEpochOptions;
use serde_json;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

use super::executor_core::StreamSourceCore;
use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::reader_stream::StreamReaderBuilder;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

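/// If the source executor has not received a barrier within
/// `barrier_interval_ms * WAIT_BARRIER_MULTIPLE_TIMES`, it pauses itself until the next
/// barrier arrives, so that unconsumed data does not keep piling up in memory.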
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

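/// Extracts a numeric offset from a split for metrics reporting. Only Postgres CDC splits are
/// handled: their start offset is parsed for the confirmed LSN. Other split types yield `None`.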
fn extract_split_offset(split: &SplitImpl) -> Option<u64> {
    match split {
        SplitImpl::PostgresCdc(pg_split) => {
            let offset_str = pg_split.start_offset().as_ref()?;
            extract_pg_cdc_lsn_from_offset(offset_str)
        }
        _ => None,
    }
}

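/// Parses the `lsn` field out of a Debezium-style offset JSON string, e.g. a payload shaped
/// like `{"sourceOffset": {"lsn": 123456}}` (illustrative only; the real payload carries
/// additional fields). Returns `None` if the string is not valid JSON or the field is missing.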
fn extract_pg_cdc_lsn_from_offset(offset_str: &str) -> Option<u64> {
    let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
    let source_offset = offset.get("sourceOffset")?;
    let lsn = source_offset.get("lsn")?;
    lsn.as_u64()
}

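/// [`SourceExecutor`] reads data from an external source, emits it downstream as stream chunks,
/// and persists the split/offset state to the state table on every barrier.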
pub struct SourceExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source state for the executor, including the split state table.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring the streaming job.
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader, used to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    /// Whether this is a shared, non-CDC source.
    is_shared_non_cdc: bool,

    _barrier_manager: LocalBarrierManager,
}

impl<S: StateStore> SourceExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        is_shared_non_cdc: bool,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            is_shared_non_cdc,
            _barrier_manager: barrier_manager,
        }
    }

    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
        StreamReaderBuilder {
            source_desc,
            rate_limit: self.rate_limit_rps,
            source_id: self.stream_source_core.source_id,
            source_name: self.stream_source_core.source_name.clone(),
            is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
            actor_ctx: self.actor_ctx.clone(),
            reader_stream: None,
        }
    }

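    /// Spawns a background [`WaitCheckpointWorker`] for connectors that may only acknowledge
    /// messages or commit offsets after the data has been checkpointed. Returns `None` if the
    /// connector does not need such a task.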
    async fn spawn_wait_checkpoint_worker(
        core: &StreamSourceCore<S>,
        source_reader: SourceReader,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
        let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
            return Ok(None);
        };
        let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
        let wait_checkpoint_worker = WaitCheckpointWorker {
            wait_checkpoint_rx,
            state_store: core.split_state_store.state_table().state_store().clone(),
            table_id: core.split_state_store.state_table().table_id(),
            metrics,
        };
        tokio::spawn(wait_checkpoint_worker.run());
        Ok(Some(WaitCheckpointTaskBuilder {
            wait_checkpoint_tx,
            source_reader,
            building_task: initial_task,
        }))
    }

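    /// Collects the column ids to read and builds the [`SourceContext`] used to create the
    /// source stream. If auto schema change is enabled, also spawns a task that forwards schema
    /// change events received from the connector to the meta service.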
    pub fn prepare_source_stream_build(
        &self,
        source_desc: &SourceDesc,
    ) -> (Vec<ColumnId>, SourceContext) {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();

        let (schema_change_tx, mut schema_change_rx) =
            mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
        let schema_change_tx = if self.is_auto_schema_change_enable() {
            let meta_client = self.actor_ctx.meta_client.clone();
            let _join_handle = tokio::task::spawn(async move {
                while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
                    let table_ids = schema_change.table_ids();
                    tracing::info!(
                        target: "auto_schema_change",
                        "recv a schema change event for tables: {:?}", table_ids);
                    if let Some(ref meta_client) = meta_client {
                        match meta_client
                            .auto_schema_change(schema_change.to_protobuf())
                            .await
                        {
                            Ok(_) => {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    "schema change success for tables: {:?}", table_ids);
                                finish_tx.send(()).unwrap();
                            }
                            Err(e) => {
                                tracing::error!(
                                    target: "auto_schema_change",
                                    error = ?e.as_report(), "schema change error");
                                finish_tx.send(()).unwrap();
                            }
                        }
                    }
                }
            });
            Some(schema_change_tx)
        } else {
            info!("auto schema change is disabled in config");
            None
        };
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.stream_source_core.source_id,
            self.actor_ctx.fragment_id,
            self.stream_source_core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            schema_change_tx,
        );

        (column_ids, source_ctx)
    }

    fn is_auto_schema_change_enable(&self) -> bool {
        self.actor_ctx.config.developer.enable_auto_schema_change
    }

    #[inline]
    fn get_metric_labels(&self) -> [String; 4] {
        [
            self.stream_source_core.source_id.to_string(),
            self.stream_source_core.source_name.clone(),
            self.actor_ctx.id.to_string(),
            self.actor_ctx.fragment_id.to_string(),
        ]
    }

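    /// Applies a split change or connector-properties change after the corresponding barrier has
    /// been yielded downstream, rebuilding the reader stream if the effective assignment or the
    /// connector properties changed.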
    async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
        &mut self,
        barrier_epoch: EpochPair,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        apply_mutation: ApplyMutationAfterBarrier<'_>,
    ) -> StreamExecutorResult<()> {
        {
            let mut should_rebuild_stream = false;
            match apply_mutation {
                ApplyMutationAfterBarrier::SplitChange {
                    target_splits,
                    should_trim_state,
                    split_change_count,
                } => {
                    split_change_count.inc();
                    if self
                        .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
                        .await?
                    {
                        should_rebuild_stream = true;
                    }
                }
                ApplyMutationAfterBarrier::ConnectorPropsChange => {
                    should_rebuild_stream = true;
                }
            }

            if should_rebuild_stream {
                self.rebuild_stream_reader(source_desc, stream)?;
            }
        }

        Ok(())
    }

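    /// Reconciles `target_splits` with the splits currently owned by this actor, recovering
    /// offsets from the state store for newly assigned splits. Returns `true` if the assignment
    /// actually changed (splits added or dropped), in which case the reader stream should be
    /// rebuilt.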
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let core = &mut self.stream_source_core;

        let target_splits: HashMap<_, _> = target_splits
            .into_iter()
            .map(|split| (split.id(), split))
            .collect();

        let mut target_state: HashMap<SplitId, SplitImpl> =
            HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;

        let committed_reader = core
            .split_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        for (split_id, split) in target_splits {
            if let Some(s) = core.latest_split_info.get(&split_id) {
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;
                let initial_state = if let Some(recover_state) = committed_reader
                    .try_recover_from_state_store(&split)
                    .await?
                {
                    recover_state
                } else {
                    split
                };

                core.updated_splits_in_epoch
                    .entry(split_id.clone())
                    .or_insert_with(|| initial_state.clone());

                target_state.insert(split_id, initial_state);
            }
        }

        for existing_split_id in core.latest_split_info.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                actor_id = %self.actor_ctx.id,
                state = ?target_state,
                "apply split change"
            );

            core.updated_splits_in_epoch
                .retain(|split_id, _| target_state.contains_key(split_id));

            let dropped_splits = core
                .latest_split_info
                .extract_if(|split_id, _| !target_state.contains_key(split_id))
                .map(|(_, split)| split)
                .collect_vec();

            if should_trim_state && !dropped_splits.is_empty() {
                core.split_state_store.trim_state(&dropped_splits).await?;
            }

            core.latest_split_info = target_state;
        }

        Ok(split_changed)
    }

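    /// Logs a reader error, reports it to the error metrics, and then rebuilds the reader stream
    /// from the latest split info so that consumption can resume.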
    fn rebuild_stream_reader_from_error<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
        e: StreamExecutorError,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        tracing::error!(
            error = ?e.as_report(),
            actor_id = %self.actor_ctx.id,
            source_id = %core.source_id,
            "stream source reader error",
        );
        GLOBAL_ERROR_METRICS.user_source_error.report([
            e.variant_name().to_owned(),
            core.source_id.to_string(),
            core.source_name.clone(),
            self.actor_ctx.fragment_id.to_string(),
        ]);

        self.rebuild_stream_reader(source_desc, stream)
    }

    fn rebuild_stream_reader<const BIASED: bool>(
        &mut self,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
    ) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

        tracing::info!(
            "actor {:?} apply source split change to {:?}",
            self.actor_ctx.id,
            target_state
        );

        let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let reader_stream = reader_stream_builder.into_retry_stream(Some(target_state), false);

        stream.replace_data_stream(reader_stream);

        Ok(())
    }

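    /// Persists the splits updated in the current epoch to the state table, commits the state
    /// table for `epoch`, and returns the persisted splits while clearing the in-memory cache.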
    async fn persist_state_and_clear_cache(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
        let core = &mut self.stream_source_core;

        let cache = core
            .updated_splits_in_epoch
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec();

        if !cache.is_empty() {
            tracing::debug!(state = ?cache, "take snapshot");

            let source_id = core.source_id.to_string();
            for split_impl in &cache {
                if let Some(state_table_lsn_value) = extract_split_offset(split_impl) {
                    self.metrics
                        .pg_cdc_state_table_lsn
                        .with_guarded_label_values(&[&source_id])
                        .set(state_table_lsn_value as i64);
                }
            }

            core.split_state_store.set_states(cache).await?;
        }

        core.split_state_store.commit(epoch).await?;

        let updated_splits = core.updated_splits_in_epoch.clone();

        core.updated_splits_in_epoch.clear();

        Ok(updated_splits)
    }

    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        let core = &mut self.stream_source_core;
        core.split_state_store.try_flush().await?;

        Ok(())
    }

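    /// The main loop of the source executor: receive the first barrier, recover split state,
    /// build the reader stream, then multiplex barriers and data chunks. Split state is
    /// persisted on every barrier, and the reader stream is rebuilt on errors and split changes.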
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;
        let mut boot_state =
            if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
                tracing::debug!(?splits, "boot with splits");
                splits.to_vec()
            } else {
                Vec::default()
            };
        let is_pause_on_startup = first_barrier.is_pause_on_startup();
        let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

        yield Message::Barrier(first_barrier);

        let mut core = self.stream_source_core;
        let source_id = core.source_id;

        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
            &core,
            source_desc.source.clone(),
            self.metrics.clone(),
        )
        .await?;

        let (Some(split_idx), Some(offset_idx), pulsar_message_id_idx) =
            get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        core.split_state_store.init_epoch(first_epoch).await?;
        {
            let committed_reader = core
                .split_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for ele in &mut boot_state {
                if let Some(recover_state) =
                    committed_reader.try_recover_from_state_store(ele).await?
                {
                    *ele = recover_state;
                    is_uninitialized = false;
                } else {
                    core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
                }
            }
        }

        core.init_split_state(boot_state.clone());

        self.stream_source_core = core;

        let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
        tracing::debug!(state = ?recover_state, "start with state");

        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
        let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
        let mut latest_splits = None;
        if is_uninitialized {
            let create_split_reader_result = reader_stream_builder
                .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
                .await?;
            latest_splits = create_split_reader_result.latest_splits;
        }

        if let Some(latest_splits) = latest_splits {
            self.stream_source_core
                .updated_splits_in_epoch
                .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
        }
        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
            barrier_stream,
            reader_stream_builder
                .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
        );
        let mut command_paused = false;

        if is_pause_on_startup {
            tracing::info!("source paused on startup");
            stream.pause_stream();
            command_paused = true;
        }

        let mut max_wait_barrier_time_ms =
            self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
        let mut last_barrier_time = Instant::now();
        let mut self_paused = false;

        let source_output_row_count = self
            .metrics
            .source_output_row_count
            .with_guarded_label_values(&self.get_metric_labels());

        let source_split_change_count = self
            .metrics
            .source_split_change_count
            .with_guarded_label_values(&self.get_metric_labels());

        while let Some(msg) = stream.next().await {
            let Ok(msg) = msg else {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
                continue;
            };

            match msg {
                Either::Left(Message::Barrier(barrier)) => {
                    last_barrier_time = Instant::now();

                    if self_paused {
                        self_paused = false;
                        if !command_paused {
                            stream.resume_stream();
                        }
                    }

                    let epoch = barrier.epoch;
                    let mut split_change = None;

                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                command_paused = true;
                                stream.pause_stream()
                            }
                            Mutation::Resume => {
                                command_paused = false;
                                stream.resume_stream()
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_id = %self.actor_ctx.id,
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );

                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: true,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }

                            Mutation::ConnectorPropsChange(maybe_mutation) => {
                                if let Some(new_props) = maybe_mutation.get(&source_id.as_raw_id())
                                {
                                    tracing::info!(
                                        "updating source properties from {:?} to {:?}",
                                        source_desc.source.config,
                                        new_props
                                    );
                                    source_desc.update_reader(new_props.clone())?;
                                    split_change = Some((
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::ConnectorPropsChange,
                                    ));
                                }
                            }

                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                    |target_splits| {
                                        (
                                            &source_desc,
                                            &mut stream,
                                            ApplyMutationAfterBarrier::SplitChange {
                                                target_splits,
                                                should_trim_state: false,
                                                split_change_count: &source_split_change_count,
                                            },
                                        )
                                    },
                                );
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                    && *new_rate_limit != self.rate_limit_rps
                                {
                                    tracing::info!(
                                        "updating rate limit from {:?} to {:?}",
                                        self.rate_limit_rps,
                                        *new_rate_limit
                                    );
                                    self.rate_limit_rps = *new_rate_limit;
                                    self.rebuild_stream_reader(&source_desc, &mut stream)?;
                                }
                            }
                            _ => {}
                        }
                    }

                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                    if barrier.kind.is_checkpoint()
                        && let Some(task_builder) = &mut wait_checkpoint_task_builder
                    {
                        task_builder.update_task_on_checkpoint(updated_splits);

                        tracing::debug!("epoch to wait {:?}", epoch);
                        task_builder.send(Epoch(epoch.prev)).await?
                    }

                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                        self.apply_split_change_after_yield_barrier(
                            barrier_epoch,
                            source_desc,
                            stream,
                            to_apply_mutation,
                        )
                        .await?;
                    }
                }
                Either::Left(_) => {
                    // Only barriers can come from the barrier stream.
                    unreachable!();
                }

                Either::Right((chunk, latest_state)) => {
                    if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            let pulsar_message_id_col = chunk.column_at(pulsar_message_id_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                pulsar_message_id_col.clone(),
                            );
                        } else {
                            let offset_col = chunk.column_at(offset_idx);
                            task_builder.update_task_on_chunk(
                                source_id,
                                &latest_state,
                                offset_col.clone(),
                            );
                        }
                    }
                    if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                        self_paused = true;
                        tracing::warn!(
                            "source paused, wait barrier for {:?}",
                            last_barrier_time.elapsed()
                        );
                        stream.pause_stream();

                        max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                            as u128
                            * WAIT_BARRIER_MULTIPLE_TIMES;
                    }

                    latest_state.iter().for_each(|(split_id, new_split_impl)| {
                        if let Some(split_impl) =
                            self.stream_source_core.latest_split_info.get_mut(split_id)
                        {
                            *split_impl = new_split_impl.clone();
                        }
                    });

                    self.stream_source_core
                        .updated_splits_in_epoch
                        .extend(latest_state);

                    let card = chunk.cardinality();
                    if card == 0 {
                        continue;
                    }
                    source_output_row_count.inc_by(card as u64);
                    let to_remove_col_indices =
                        if let Some(pulsar_message_id_idx) = pulsar_message_id_idx {
                            vec![split_idx, offset_idx, pulsar_message_id_idx]
                        } else {
                            vec![split_idx, offset_idx]
                        };
                    let chunk =
                        prune_additional_cols(&chunk, &to_remove_col_indices, &source_desc.columns);
                    yield Message::Chunk(chunk);
                    self.try_flush_data().await?;
                }
            }
        }

        // The source executor should not exit on its own; reaching here means the barrier
        // stream has ended unexpectedly.
        tracing::error!(
            actor_id = %self.actor_ctx.id,
            "source executor exited unexpectedly"
        )
    }
}

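/// A mutation-derived action that must only be applied after the triggering barrier has been
/// yielded downstream, since applying it may rebuild the reader stream.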
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    SplitChange {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    ConnectorPropsChange,
}

impl<S: StateStore> Execute for SourceExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> Debug for SourceExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}

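/// Accumulates a [`WaitCheckpointTask`] (e.g. offsets to commit or messages to acknowledge)
/// during an epoch and ships it to the [`WaitCheckpointWorker`] when a checkpoint barrier
/// arrives.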
struct WaitCheckpointTaskBuilder {
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    source_reader: SourceReader,
    building_task: WaitCheckpointTask,
}

impl WaitCheckpointTaskBuilder {
    fn update_task_on_chunk(
        &mut self,
        source_id: SourceId,
        latest_state: &HashMap<SplitId, SplitImpl>,
        offset_col: ArrayRef,
    ) {
        match &mut self.building_task {
            WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
                arrays.push(offset_col);
            }
            WaitCheckpointTask::AckPulsarMessage(arrays) => {
                let split_id = latest_state.keys().next().unwrap();
                let pulsar_ack_channel_id = build_pulsar_ack_channel_id(source_id, split_id);
                arrays.push((pulsar_ack_channel_id, offset_col));
            }
            WaitCheckpointTask::CommitCdcOffset(_) => {}
        }
    }

    fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
        #[expect(clippy::single_match)]
        match &mut self.building_task {
            WaitCheckpointTask::CommitCdcOffset(offsets) => {
                if !updated_splits.is_empty() {
                    // A CDC source has exactly one split.
                    assert_eq!(1, updated_splits.len());
                    for (split_id, split_impl) in updated_splits {
                        if split_impl.is_cdc_split() {
                            *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
                        } else {
                            unreachable!()
                        }
                    }
                }
            }
            _ => {}
        }
    }

    async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
        let new_task = self
            .source_reader
            .create_wait_checkpoint_task()
            .await?
            .expect("wait checkpoint task should be created");
        self.wait_checkpoint_tx
            .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
            .expect("wait_checkpoint_tx send should succeed");
        Ok(())
    }
}

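/// Background worker that, for each received `(epoch, task)` pair, waits until the epoch is
/// committed in the state store and then runs the task, e.g. acknowledging messages or
/// committing offsets back to the upstream system.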
struct WaitCheckpointWorker<S: StateStore> {
    wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
    state_store: S,
    table_id: TableId,
    metrics: Arc<StreamingMetrics>,
}

impl<S: StateStore> WaitCheckpointWorker<S> {
    pub async fn run(mut self) {
        tracing::debug!("wait epoch worker start success");
        loop {
            match self.wait_checkpoint_rx.recv().await {
                Some((epoch, task)) => {
                    tracing::debug!("start to wait epoch {}", epoch.0);
                    let ret = self
                        .state_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(epoch.0),
                            TryWaitEpochOptions {
                                table_id: self.table_id,
                            },
                        )
                        .await;

                    match ret {
                        Ok(()) => {
                            tracing::debug!(epoch = epoch.0, "wait epoch success");

                            task.run_with_on_commit_success(|source_id: u64, offset| {
                                if let Some(lsn_value) = extract_pg_cdc_lsn_from_offset(offset) {
                                    self.metrics
                                        .pg_cdc_jni_commit_offset_lsn
                                        .with_guarded_label_values(&[&source_id.to_string()])
                                        .set(lsn_value as i64);
                                }
                            })
                            .await;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "wait epoch {} failed", epoch.0
                            );
                        }
                    }
                }
                None => {
                    tracing::error!("wait epoch rx closed");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field};
    use risingwave_common::id::SourceId;
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    const MOCK_SOURCE_NAME: &str = "mock_source";

    #[tokio::test]
    async fn test_source_executor() {
        let source_id = 0.into();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume the init barrier.
        executor.next().await.unwrap().unwrap();

        // Consume the first data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let source_id = SourceId::new(0);
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // Consume the init barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        // The state of the newly assigned split should have been persisted.
        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}