1use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashMap};
17use std::mem;
18
19use anyhow::anyhow;
20use futures::stream::select;
21use futures::{FutureExt, TryFutureExt, TryStreamExt};
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::array::stream_chunk::StreamChunkMut;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{ColumnCatalog, Field};
27use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
28use risingwave_common_estimate_size::EstimateSize;
29use risingwave_common_estimate_size::collections::EstimatedVec;
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_connector::dispatch_sink;
32use risingwave_connector::sink::catalog::{SinkId, SinkType};
33use risingwave_connector::sink::log_store::{
34 FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
35 LogWriter, LogWriterExt, LogWriterMetrics,
36};
37use risingwave_connector::sink::{
38 GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
39 SinkWriterParam,
40};
41use risingwave_pb::common::ThrottleType;
42use risingwave_pb::id::FragmentId;
43use risingwave_pb::stream_plan::stream_node::StreamKind;
44use thiserror_ext::AsReport;
45use tokio::select;
46use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
47use tokio::sync::oneshot;
48
49use crate::common::change_buffer::{OutputKind, output_kind};
50use crate::common::compact_chunk::{
51 InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
52};
53use crate::executor::prelude::*;
/// Stream executor that writes its input into a sink.
///
/// Unless the sink is a sink-into-table, the input is first written to a log store built by
/// `log_store_factory`, and a separate consumer reads the log store and drives the sink writer.
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    /// Catalog of the sink's input columns, including hidden ones. Hidden columns are projected
    /// away before chunks reach the sink writer.
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    /// Builds the log store that sits between the input and the sink writer.
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    /// Chunk size used when re-batching buffered and compacted records.
    chunk_size: usize,
    /// Data types of the chunks rebuilt while re-batching.
    input_data_types: Vec<DataType>,
    /// `None` for append-only sinks.
    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
    /// Initial rate limit handed to the rate-limited log reader.
    rate_limit: Option<u32>,
}
68
69fn force_append_only(c: StreamChunk) -> StreamChunk {
71 let mut c: StreamChunkMut = c.into();
72 for (_, mut r) in c.to_rows_mut() {
73 match r.op() {
74 Op::Insert => {}
75 Op::Delete | Op::UpdateDelete => r.set_vis(false),
76 Op::UpdateInsert => r.set_op(Op::Insert),
77 }
78 }
79 c.into()
80}
81
82fn force_delete_only(c: StreamChunk) -> StreamChunk {
84 let mut c: StreamChunkMut = c.into();
85 for (_, mut r) in c.to_rows_mut() {
86 match r.op() {
87 Op::Delete => {}
88 Op::Insert | Op::UpdateInsert => r.set_vis(false),
89 Op::UpdateDelete => r.set_op(Op::Delete),
90 }
91 }
92 c.into()
93}
94
/// How the sink executor handles a sink that is not append-only.
///
/// Append-only sinks use `None` instead. Exactly one of
/// [`NonAppendOnlyBehavior::should_compact_in_log_reader`] and
/// [`NonAppendOnlyBehavior::should_reorder_records`] holds for any value.
#[derive(Clone, Copy, Debug)]
struct NonAppendOnlyBehavior {
    /// The downstream pk is specified and contains every column of the stream key.
    pk_specified_and_matched: bool,
    /// Compaction was forced by the user through the `SINK_USER_FORCE_COMPACTION` property.
    force_compaction: bool,
}

impl NonAppendOnlyBehavior {
    /// Whether chunks are compacted by the downstream pk when they are read back from the log
    /// store. This is the case only when the pk matches and compaction is not forced.
    fn should_compact_in_log_reader(self) -> bool {
        matches!(
            (self.pk_specified_and_matched, self.force_compaction),
            (true, false)
        )
    }

    /// Whether the records of each epoch are buffered and reordered (all deletes before all
    /// inserts) before being written to the log store. This is the complement of
    /// [`NonAppendOnlyBehavior::should_compact_in_log_reader`].
    fn should_reorder_records(self) -> bool {
        !self.should_compact_in_log_reader()
    }
}
205
206fn compact_output_kind(sink_type: SinkType) -> OutputKind {
208 match sink_type {
209 SinkType::Upsert => output_kind::UPSERT,
210 SinkType::Retract => output_kind::RETRACT,
211 SinkType::AppendOnly => output_kind::RETRACT,
213 }
214}
215
/// Evaluates `$body` with a `const` named `$KIND` bound to the [`OutputKind`] that
/// [`compact_output_kind`] returns for `$sink_type`.
///
/// `$body` is expanded once per output kind, so it may use `$KIND` where a constant is
/// required, e.g. as a const generic argument (`into_compacted_chunks_inline::<KIND>`).
macro_rules! dispatch_output_kind {
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        // `$body` is a braced token tree, which may trigger `unused_braces` once expanded.
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                // Bind the caller-chosen identifier rather than a hard-coded `KIND`.
                const $KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const $KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}
232
233impl<F: LogStoreFactory> SinkExecutor<F> {
234 #[expect(clippy::too_many_arguments)]
235 pub fn new(
236 actor_context: ActorContextRef,
237 info: ExecutorInfo,
238 input: Executor,
239 sink_writer_param: SinkWriterParam,
240 sink: SinkImpl,
241 sink_param: SinkParam,
242 columns: Vec<ColumnCatalog>,
243 log_store_factory: F,
244 chunk_size: usize,
245 input_data_types: Vec<DataType>,
246 rate_limit: Option<u32>,
247 ) -> StreamExecutorResult<Self> {
248 let sink_input_schema: Schema = columns
249 .iter()
250 .map(|column| Field::from(&column.column_desc))
251 .collect();
252
253 if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
254 assert_eq!(sink_input_schema.data_types(), {
256 let mut data_type = info.schema.data_types();
257 data_type.remove(col_dix);
258 data_type
259 });
260 } else {
261 assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
262 }
263
264 let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
265 let stream_key = &info.stream_key;
266 let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
267 .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
268 let force_compaction = sink_param
269 .properties
270 .get(SINK_USER_FORCE_COMPACTION)
271 .map(|v| v.eq_ignore_ascii_case("true"))
272 .unwrap_or(false);
273 Some(NonAppendOnlyBehavior {
274 pk_specified_and_matched,
275 force_compaction,
276 })
277 } else {
278 None
279 };
280
281 tracing::info!(
282 sink_id = %sink_param.sink_id,
283 actor_id = %actor_context.id,
284 ?non_append_only_behavior,
285 "Sink executor info"
286 );
287
288 Ok(Self {
289 actor_context,
290 info,
291 input,
292 sink,
293 input_columns: columns,
294 sink_param,
295 log_store_factory,
296 sink_writer_param,
297 chunk_size,
298 input_data_types,
299 non_append_only_behavior,
300 rate_limit,
301 })
302 }
303
304 fn execute_inner(self) -> BoxedMessageStream {
305 let sink_id = self.sink_param.sink_id;
306 let actor_id = self.actor_context.id;
307 let fragment_id = self.actor_context.fragment_id;
308
309 let stream_key = self.info.stream_key.clone();
310 let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
311 sink_id,
312 actor_id,
313 fragment_id,
314 );
315
316 let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
319 InconsistencyBehavior::Tolerate
320 } else {
321 InconsistencyBehavior::Panic
322 };
323
324 let input = self.input.execute();
325
326 let input = input.inspect_ok(move |msg| {
327 if let Message::Chunk(c) = msg {
328 metrics.sink_input_row_count.inc_by(c.capacity() as u64);
329 metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
330 }
331 });
332
333 let processed_input = Self::process_msg(
334 input,
335 self.sink_param.sink_type,
336 stream_key,
337 self.chunk_size,
338 self.input_data_types,
339 input_compact_ib,
340 self.sink_param.downstream_pk.clone(),
341 self.non_append_only_behavior,
342 metrics.sink_chunk_buffer_size,
343 self.sink.is_blackhole(), );
345
346 let processed_input = if self.sink_param.ignore_delete {
347 processed_input
349 .map_ok(|msg| match msg {
350 Message::Chunk(chunk) => Message::Chunk(force_append_only(chunk)),
351 other => other,
352 })
353 .left_stream()
354 } else {
355 processed_input.right_stream()
356 };
357
358 if self.sink.is_sink_into_table() {
359 processed_input.boxed()
361 } else {
362 let labels = [
363 &actor_id.to_string(),
364 &sink_id.to_string(),
365 self.sink_param.sink_name.as_str(),
366 ];
367 let log_store_first_write_epoch = GLOBAL_SINK_METRICS
368 .log_store_first_write_epoch
369 .with_guarded_label_values(&labels);
370 let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
371 .log_store_latest_write_epoch
372 .with_guarded_label_values(&labels);
373 let log_store_write_rows = GLOBAL_SINK_METRICS
374 .log_store_write_rows
375 .with_guarded_label_values(&labels);
376 let log_writer_metrics = LogWriterMetrics {
377 log_store_first_write_epoch,
378 log_store_latest_write_epoch,
379 log_store_write_rows,
380 };
381
382 let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
383 rate_limit_tx.send(self.rate_limit.into()).unwrap();
385
386 let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();
387
388 self.log_store_factory
389 .build()
390 .map(move |(log_reader, log_writer)| {
391 let write_log_stream = Self::execute_write_log(
392 processed_input,
393 log_writer.monitored(log_writer_metrics),
394 actor_id,
395 fragment_id,
396 sink_id,
397 rate_limit_tx,
398 rebuild_sink_tx,
399 );
400
401 let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
402 let consume_log_stream = Self::execute_consume_log(
403 *sink,
404 log_reader,
405 self.input_columns,
406 self.sink_param,
407 self.sink_writer_param,
408 self.non_append_only_behavior,
409 self.actor_context,
410 rate_limit_rx,
411 rebuild_sink_rx,
412 )
413 .instrument_await(
414 await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
415 )
416 .map_ok(|never| never); consume_log_stream.boxed()
419 });
420 select(consume_log_stream_future.into_stream(), write_log_stream)
421 })
422 .into_stream()
423 .flatten()
424 .boxed()
425 }
426 }
427
428 #[try_stream(ok = Message, error = StreamExecutorError)]
429 async fn execute_write_log<W: LogWriter>(
430 input: impl MessageStream,
431 mut log_writer: W,
432 actor_id: ActorId,
433 fragment_id: FragmentId,
434 sink_id: SinkId,
435 rate_limit_tx: UnboundedSender<RateLimit>,
436 rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
437 ) {
438 pin_mut!(input);
439 let barrier = expect_first_barrier(&mut input).await?;
440 let epoch_pair = barrier.epoch;
441 let is_pause_on_startup = barrier.is_pause_on_startup();
442 yield Message::Barrier(barrier);
444
445 log_writer.init(epoch_pair, is_pause_on_startup).await?;
446
447 let mut is_paused = false;
448
449 #[for_await]
450 for msg in input {
451 match msg? {
452 Message::Watermark(w) => yield Message::Watermark(w),
453 Message::Chunk(chunk) => {
454 assert!(
455 !is_paused,
456 "Actor {actor_id} should not receive any data after pause"
457 );
458 log_writer.write_chunk(chunk.clone()).await?;
459 yield Message::Chunk(chunk);
460 }
461 Message::Barrier(barrier) => {
462 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
463 let schema_change = barrier.as_sink_schema_change(sink_id);
464 if let Some(schema_change) = &schema_change {
465 info!(?schema_change, %sink_id, "sink receive schema change");
466 }
467 let post_flush = log_writer
468 .flush_current_epoch(
469 barrier.epoch.curr,
470 FlushCurrentEpochOptions {
471 is_checkpoint: barrier.kind.is_checkpoint(),
472 new_vnode_bitmap: update_vnode_bitmap.clone(),
473 is_stop: barrier.is_stop(actor_id),
474 schema_change,
475 },
476 )
477 .await?;
478
479 let mutation = barrier.mutation.clone();
480 yield Message::Barrier(barrier);
481 if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
482 && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
483 {
484 let (tx, rx) = oneshot::channel();
485 rebuild_sink_tx
486 .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
487 .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
488 rx.await
489 .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
490 }
491 post_flush.post_yield_barrier().await?;
492
493 if let Some(mutation) = mutation.as_deref() {
494 match mutation {
495 Mutation::Pause => {
496 log_writer.pause()?;
497 is_paused = true;
498 }
499 Mutation::Resume => {
500 log_writer.resume()?;
501 is_paused = false;
502 }
503 Mutation::Throttle(fragment_to_apply) => {
504 if let Some(entry) = fragment_to_apply.get(&fragment_id)
505 && entry.throttle_type() == ThrottleType::Sink
506 {
507 tracing::info!(
508 rate_limit = entry.rate_limit,
509 "received sink rate limit on actor {actor_id}"
510 );
511 if let Err(e) = rate_limit_tx.send(entry.rate_limit.into()) {
512 error!(
513 error = %e.as_report(),
514 "fail to send sink rate limit update"
515 );
516 return Err(StreamExecutorError::from(
517 e.to_report_string(),
518 ));
519 }
520 }
521 }
522 Mutation::ConnectorPropsChange(config) => {
523 if let Some(map) = config.get(&sink_id.as_raw_id())
524 && let Err(e) = rebuild_sink_tx
525 .send(RebuildSinkMessage::UpdateConfig(map.clone()))
526 {
527 error!(
528 error = %e.as_report(),
529 "fail to send sink alter props"
530 );
531 return Err(StreamExecutorError::from(e.to_report_string()));
532 }
533 }
534 _ => (),
535 }
536 }
537 }
538 }
539 }
540 }
541
542 #[expect(clippy::too_many_arguments)]
543 #[try_stream(ok = Message, error = StreamExecutorError)]
544 async fn process_msg(
545 input: impl MessageStream,
546 sink_type: SinkType,
547 stream_key: StreamKey,
548 chunk_size: usize,
549 input_data_types: Vec<DataType>,
550 input_compact_ib: InconsistencyBehavior,
551 downstream_pk: Option<Vec<usize>>,
552 non_append_only_behavior: Option<NonAppendOnlyBehavior>,
553 sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
554 skip_compact: bool,
555 ) {
556 if let Some(b) = non_append_only_behavior
558 && b.should_reorder_records()
559 {
560 assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);
561
562 let mut chunk_buffer = EstimatedVec::new();
563 let mut watermark: Option<super::Watermark> = None;
564 #[for_await]
565 for msg in input {
566 match msg? {
567 Message::Watermark(w) => watermark = Some(w),
568 Message::Chunk(c) => {
569 chunk_buffer.push(c);
570 sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
571 }
572 Message::Barrier(barrier) => {
573 let chunks = mem::take(&mut chunk_buffer).into_inner();
574
575 let mut delete_chunks = vec![];
578 let mut insert_chunks = vec![];
579
580 for c in dispatch_output_kind!(sink_type, KIND, {
581 StreamChunkCompactor::new(stream_key.clone(), chunks)
582 .into_compacted_chunks_inline::<KIND>(input_compact_ib)
583 }) {
584 let chunk = force_delete_only(c.clone());
585 if chunk.cardinality() > 0 {
586 delete_chunks.push(chunk);
587 }
588 let chunk = force_append_only(c);
589 if chunk.cardinality() > 0 {
590 insert_chunks.push(chunk);
591 }
592 }
593 let chunks = delete_chunks
594 .into_iter()
595 .chain(insert_chunks.into_iter())
596 .collect();
597
598 if let Some(downstream_pk) = &downstream_pk {
603 let chunks = dispatch_output_kind!(sink_type, KIND, {
604 StreamChunkCompactor::new(downstream_pk.clone(), chunks)
605 .into_compacted_chunks_reconstructed::<KIND>(
606 chunk_size,
607 input_data_types.clone(),
608 InconsistencyBehavior::Warn,
611 )
612 });
613 for c in chunks {
614 yield Message::Chunk(c);
615 }
616 } else {
617 let mut chunk_builder =
618 StreamChunkBuilder::new(chunk_size, input_data_types.clone());
619 for chunk in chunks {
620 for (op, row) in chunk.rows() {
621 if let Some(c) = chunk_builder.append_row(op, row) {
622 yield Message::Chunk(c);
623 }
624 }
625 }
626
627 if let Some(c) = chunk_builder.take() {
628 yield Message::Chunk(c);
629 }
630 };
631
632 if let Some(w) = mem::take(&mut watermark) {
634 yield Message::Watermark(w)
635 }
636 yield Message::Barrier(barrier);
637 }
638 }
639 }
640 } else {
641 #[for_await]
644 for msg in input {
645 match msg? {
646 Message::Watermark(w) => yield Message::Watermark(w),
647 Message::Chunk(mut chunk) => {
648 if !sink_type.is_append_only()
652 && let Some(downstream_pk) = &downstream_pk
653 {
654 if skip_compact {
655 assert_eq!(&stream_key, downstream_pk);
658 } else {
659 chunk = dispatch_output_kind!(sink_type, KIND, {
660 compact_chunk_inline::<KIND>(
661 chunk,
662 downstream_pk,
663 InconsistencyBehavior::Warn,
666 )
667 });
668 }
669 }
670 yield Message::Chunk(chunk);
671 }
672 Message::Barrier(barrier) => {
673 yield Message::Barrier(barrier);
674 }
675 }
676 }
677 }
678 }
679
680 #[expect(clippy::too_many_arguments)]
681 async fn execute_consume_log<S: Sink, R: LogReader>(
682 mut sink: S,
683 log_reader: R,
684 columns: Vec<ColumnCatalog>,
685 mut sink_param: SinkParam,
686 mut sink_writer_param: SinkWriterParam,
687 non_append_only_behavior: Option<NonAppendOnlyBehavior>,
688 actor_context: ActorContextRef,
689 rate_limit_rx: UnboundedReceiver<RateLimit>,
690 mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
691 ) -> StreamExecutorResult<!> {
692 let visible_columns = columns
693 .iter()
694 .enumerate()
695 .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
696 .collect_vec();
697
698 let labels = [
699 &actor_context.id.to_string(),
700 sink_writer_param.connector.as_str(),
701 &sink_writer_param.sink_id.to_string(),
702 sink_writer_param.sink_name.as_str(),
703 ];
704 let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
705 .log_store_reader_wait_new_future_duration_ns
706 .with_guarded_label_values(&labels);
707 let log_store_read_rows = GLOBAL_SINK_METRICS
708 .log_store_read_rows
709 .with_guarded_label_values(&labels);
710 let log_store_read_bytes = GLOBAL_SINK_METRICS
711 .log_store_read_bytes
712 .with_guarded_label_values(&labels);
713 let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
714 .log_store_latest_read_epoch
715 .with_guarded_label_values(&labels);
716 let metrics = LogReaderMetrics {
717 log_store_latest_read_epoch,
718 log_store_read_rows,
719 log_store_read_bytes,
720 log_store_reader_wait_new_future_duration_ns,
721 };
722
723 let downstream_pk = sink_param.downstream_pk.clone();
724
725 let mut log_reader = log_reader
726 .transform_chunk(move |chunk| {
727 let chunk = if let Some(b) = non_append_only_behavior
728 && b.should_compact_in_log_reader()
729 {
730 let downstream_pk = downstream_pk.as_ref().unwrap();
732 dispatch_output_kind!(sink_param.sink_type, KIND, {
733 compact_chunk_inline::<KIND>(
734 chunk,
735 downstream_pk,
736 InconsistencyBehavior::Warn,
739 )
740 })
741 } else {
742 chunk
743 };
744 if visible_columns.len() != columns.len() {
745 chunk.project(&visible_columns)
748 } else {
749 chunk
750 }
751 })
752 .monitored(metrics)
753 .rate_limited(rate_limit_rx);
754
755 log_reader.init().await?;
756 loop {
757 let future = async {
758 loop {
759 let Err(e) = sink
760 .new_log_sinker(sink_writer_param.clone())
761 .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
762 .await;
763 GLOBAL_ERROR_METRICS.user_sink_error.report([
764 "sink_executor_error".to_owned(),
765 sink_param.sink_id.to_string(),
766 sink_param.sink_name.clone(),
767 actor_context.fragment_id.to_string(),
768 ]);
769
770 if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
771 meta_client
772 .add_sink_fail_evet_log(
773 sink_writer_param.sink_id,
774 sink_writer_param.sink_name.clone(),
775 sink_writer_param.connector.clone(),
776 e.to_report_string(),
777 )
778 .await;
779 }
780
781 if F::ALLOW_REWIND {
782 match log_reader.rewind().await {
783 Ok(()) => {
784 error!(
785 error = %e.as_report(),
786 executor_id = %sink_writer_param.executor_id,
787 sink_id = %sink_param.sink_id,
788 "reset log reader stream successfully after sink error"
789 );
790 Ok(())
791 }
792 Err(rewind_err) => {
793 error!(
794 error = %rewind_err.as_report(),
795 "fail to rewind log reader"
796 );
797 Err(e)
798 }
799 }
800 } else {
801 Err(e)
802 }
803 .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
804 }
805 };
806 select! {
807 result = future => {
808 let Err(e): StreamExecutorResult<!> = result;
809 return Err(e);
810 }
811 result = rebuild_sink_rx.recv() => {
812 match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
813 RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
814 sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
815 if notify.send(()).is_err() {
816 warn!("failed to notify rebuild sink");
817 }
818 log_reader.init().await?;
819 },
820 RebuildSinkMessage::UpdateConfig(config) => {
821 if !sink_config_has_changes(&sink_param.properties, &config) {
822 info!(
823 executor_id = %sink_writer_param.executor_id,
824 sink_id = %sink_param.sink_id,
825 "skip alter sink config because properties are unchanged"
826 );
827 Ok(())
828 } else if F::ALLOW_REWIND {
829 match log_reader.rewind().await {
830 Ok(()) => {
831 sink_param.properties.extend(config.into_iter());
832 sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
833 info!(
834 executor_id = %sink_writer_param.executor_id,
835 sink_id = %sink_param.sink_id,
836 "alter sink config successfully with rewind"
837 );
838 Ok(())
839 }
840 Err(rewind_err) => {
841 error!(
842 error = %rewind_err.as_report(),
843 "fail to rewind log reader for alter sink config "
844 );
845 Err(anyhow!("fail to rewind log after alter table").into())
846 }
847 }
848 } else {
849 sink_param.properties.extend(config.into_iter());
850 sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
851 Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
852 }
853 .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
854 },
855 }
856 }
857 }
858 }
859 }
860}
861
/// Control messages sent from `execute_write_log` to `execute_consume_log`.
enum RebuildSinkMessage {
    /// The actor's vnode bitmap was updated. The consumer records the new bitmap in the sink
    /// writer params, signals the sender, then re-initializes the log reader.
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// Sink properties to merge in, taken from a `ConnectorPropsChange` mutation for this sink.
    UpdateConfig(HashMap<String, String>),
}
866
/// Returns `true` if merging `incoming` into `current` would change anything.
///
/// A change means `incoming` has a key that is absent from `current`, or that maps to a
/// different value there. Keys present only in `current` are ignored, because the incoming
/// properties are merged into the existing ones rather than replacing them.
fn sink_config_has_changes(
    current: &BTreeMap<String, String>,
    incoming: &HashMap<String, String>,
) -> bool {
    let unchanged = incoming.iter().all(|(key, new_value)| {
        current
            .get(key)
            .is_some_and(|old_value| old_value == new_value)
    });
    !unchanged
}
875
876impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
877 fn execute(self: Box<Self>) -> BoxedMessageStream {
878 self.execute_inner()
879 }
880}
881
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    /// Only keys that are missing from, or differ in, the current config count as changes.
    #[test]
    fn test_sink_config_has_changes() {
        let current = BTreeMap::from([
            ("connector".to_owned(), "blackhole".to_owned()),
            ("commit_checkpoint_interval".to_owned(), "1".to_owned()),
        ]);

        // Same value as the current config.
        assert!(!sink_config_has_changes(
            &current,
            &HashMap::from([("commit_checkpoint_interval".to_owned(), "1".to_owned())])
        ));
        // Different value for an existing key.
        assert!(sink_config_has_changes(
            &current,
            &HashMap::from([("commit_checkpoint_interval".to_owned(), "2".to_owned())])
        ));
        // New key.
        assert!(sink_config_has_changes(
            &current,
            &HashMap::from([("force_append_only".to_owned(), "true".to_owned())])
        ));
    }

    /// With `ignore_delete`, deletes are hidden and `U+` rows are emitted as inserts.
    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // The last column is hidden.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 3 2 1",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    U- 3 2 1
                    U+ 3 4 1
                     + 5 6 7",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    - 5 6 7",
            ))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::AppendOnly,
            ignore_delete: true,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            // Must match the Int64 column types declared above.
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier 1.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier 2.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // The trailing chunk only contains a delete.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_upsert() {
        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_retract() {
        stream_key_sink_pk_mismatch(SinkType::Retract).await;
    }

    /// Stream key `[0, 1]` differs from downstream pk `[0]`, so records are buffered per
    /// epoch, reordered and compacted by the downstream pk.
    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // The last column is hidden.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0]),
            sink_type,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier 1.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier 2.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        let expected = match sink_type {
            SinkType::Retract => StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            ),
            SinkType::Upsert => StreamChunk::from_pretty(
                " I I I
                + 1 1 40",
            ),
            _ => unreachable!(),
        };
        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);

        // Barrier 3.
        executor.next().await.unwrap().unwrap();
    }

    /// Barriers without data are forwarded unchanged.
    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::AppendOnly,
            ignore_delete: true,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier 1.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        // Barrier 2.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        // Barrier 3.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }

    /// With `force_compaction`, records are buffered and reordered even though the stream key
    /// matches the downstream pk.
    #[tokio::test]
    async fn test_force_compaction() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "force_compaction".into() => "true".into()
        };

        // The last column is hidden.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20
                    + 1 4 10",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 4 30",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0, 1]),
            sink_type: SinkType::Upsert,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier 1.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier 2.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 3 30
                + 1 1 40",
            )
        );

        // Barrier 3.
        executor.next().await.unwrap().unwrap();
    }
}