1use std::collections::{BTreeMap, HashMap};
16use std::{assert_matches, mem};
17
18use anyhow::anyhow;
19use futures::stream::select;
20use futures::{FutureExt, TryFutureExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::array::Op;
23use risingwave_common::array::stream_chunk::StreamChunkMut;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{ColumnCatalog, Field};
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
27use risingwave_common_estimate_size::EstimateSize;
28use risingwave_common_estimate_size::collections::EstimatedVec;
29use risingwave_common_rate_limit::RateLimit;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::{SinkId, SinkType};
32use risingwave_connector::sink::log_store::{
33 FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
34 LogWriter, LogWriterExt, LogWriterMetrics,
35};
36use risingwave_connector::sink::{
37 GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
38 SinkWriterParam,
39};
40use risingwave_pb::common::ThrottleType;
41use risingwave_pb::id::FragmentId;
42use risingwave_pb::stream_plan::stream_node::StreamKind;
43use thiserror_ext::AsReport;
44use tokio::select;
45use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
46use tokio::sync::oneshot;
47
48use crate::common::change_buffer::{OutputKind, output_kind};
49use crate::common::compact_chunk::{
50 InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
51};
52use crate::executor::prelude::*;
/// Stream executor that feeds its input into a sink.
///
/// `F` is the [`LogStoreFactory`] used in `execute_inner` to build the log
/// reader/writer pair that sits between the input stream and the sink.
pub struct SinkExecutor<F: LogStoreFactory> {
    /// Context of the owning actor (actor id, fragment id, streaming metrics).
    actor_context: ActorContextRef,
    /// Executor info; its schema and stream key are read during construction
    /// and execution.
    info: ExecutorInfo,
    /// Upstream executor whose messages are consumed.
    input: Executor,
    /// The sink implementation the data is delivered to.
    sink: SinkImpl,
    /// Column catalogs of the input, including hidden columns. Hidden columns
    /// are projected away before chunks reach the sink.
    input_columns: Vec<ColumnCatalog>,
    /// Sink parameters (id, name, type, downstream pk, properties, ...).
    sink_param: SinkParam,
    /// Factory that builds the log store used to decouple writing and sinking.
    log_store_factory: F,
    /// Parameters handed to the sink when a log sinker is created.
    sink_writer_param: SinkWriterParam,
    /// Chunk size used when chunks are rebuilt in `process_msg`.
    chunk_size: usize,
    /// Data types of the input columns, used when chunks are rebuilt.
    input_data_types: Vec<DataType>,
    /// `Some` only for non-append-only sinks; see [`NonAppendOnlyBehavior`].
    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
    /// Initial rate limit forwarded to the log reader; `None` is converted via
    /// `Into<RateLimit>`.
    rate_limit: Option<u32>,
}
67
68fn force_append_only(c: StreamChunk) -> StreamChunk {
70 let mut c: StreamChunkMut = c.into();
71 for (_, mut r) in c.to_rows_mut() {
72 match r.op() {
73 Op::Insert => {}
74 Op::Delete | Op::UpdateDelete => r.set_vis(false),
75 Op::UpdateInsert => r.set_op(Op::Insert),
76 }
77 }
78 c.into()
79}
80
81fn force_delete_only(c: StreamChunk) -> StreamChunk {
83 let mut c: StreamChunkMut = c.into();
84 for (_, mut r) in c.to_rows_mut() {
85 match r.op() {
86 Op::Delete => {}
87 Op::Insert | Op::UpdateInsert => r.set_vis(false),
88 Op::UpdateDelete => r.set_op(Op::Delete),
89 }
90 }
91 c.into()
92}
93
/// Flags that decide how a non-append-only sink handles its change stream.
#[derive(Clone, Copy, Debug)]
struct NonAppendOnlyBehavior {
    /// Whether a downstream primary key is specified and contains every
    /// stream-key column.
    pk_specified_and_matched: bool,
    /// Whether the user explicitly requested compaction through the sink
    /// properties.
    force_compaction: bool,
}

impl NonAppendOnlyBehavior {
    /// Returns `true` when compaction can be deferred to the log reader:
    /// the primary key matches and compaction is not forced.
    fn should_compact_in_log_reader(self) -> bool {
        matches!(
            (self.pk_specified_and_matched, self.force_compaction),
            (true, false)
        )
    }

    /// Returns `true` when records have to be buffered and reordered before
    /// being written. This is exactly the complement of
    /// [`Self::should_compact_in_log_reader`].
    fn should_reorder_records(self) -> bool {
        !self.should_compact_in_log_reader()
    }
}
204
205fn compact_output_kind(sink_type: SinkType) -> OutputKind {
207 match sink_type {
208 SinkType::Upsert => output_kind::UPSERT,
209 SinkType::Retract => output_kind::RETRACT,
210 SinkType::AppendOnly => output_kind::RETRACT,
212 }
213}
214
/// Evaluates `$body` with a `const KIND: OutputKind` in scope whose value is
/// the compaction output kind of `$sink_type` (see `compact_output_kind`).
///
/// This lets a runtime `SinkType` select a const generic argument such as
/// `::<KIND>` inside `$body`.
macro_rules! dispatch_output_kind {
    // NOTE: the `$KIND` argument is accepted but not used in the expansion;
    // the constant is always declared with the literal name `KIND`, so callers
    // must pass (and refer to) `KIND`.
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                const KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}
231
232impl<F: LogStoreFactory> SinkExecutor<F> {
233 #[expect(clippy::too_many_arguments)]
234 pub fn new(
235 actor_context: ActorContextRef,
236 info: ExecutorInfo,
237 input: Executor,
238 sink_writer_param: SinkWriterParam,
239 sink: SinkImpl,
240 sink_param: SinkParam,
241 columns: Vec<ColumnCatalog>,
242 log_store_factory: F,
243 chunk_size: usize,
244 input_data_types: Vec<DataType>,
245 rate_limit: Option<u32>,
246 ) -> StreamExecutorResult<Self> {
247 let sink_input_schema: Schema = columns
248 .iter()
249 .map(|column| Field::from(&column.column_desc))
250 .collect();
251
252 if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
253 assert_eq!(sink_input_schema.data_types(), {
255 let mut data_type = info.schema.data_types();
256 data_type.remove(col_dix);
257 data_type
258 });
259 } else {
260 assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
261 }
262
263 let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
264 let stream_key = &info.stream_key;
265 let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
266 .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
267 let force_compaction = sink_param
268 .properties
269 .get(SINK_USER_FORCE_COMPACTION)
270 .map(|v| v.eq_ignore_ascii_case("true"))
271 .unwrap_or(false);
272 Some(NonAppendOnlyBehavior {
273 pk_specified_and_matched,
274 force_compaction,
275 })
276 } else {
277 None
278 };
279
280 tracing::info!(
281 sink_id = %sink_param.sink_id,
282 actor_id = %actor_context.id,
283 ?non_append_only_behavior,
284 "Sink executor info"
285 );
286
287 Ok(Self {
288 actor_context,
289 info,
290 input,
291 sink,
292 input_columns: columns,
293 sink_param,
294 log_store_factory,
295 sink_writer_param,
296 chunk_size,
297 input_data_types,
298 non_append_only_behavior,
299 rate_limit,
300 })
301 }
302
    /// Assembles the executor's output stream.
    ///
    /// The input is first instrumented with metrics and passed through
    /// `process_msg`. For sink-into-table the processed stream is returned
    /// directly; otherwise a log store is built and the log-writing stream is
    /// merged with the future that consumes the log and drives the sink.
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.stream_key.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        // Inconsistencies are tolerated while compacting by stream key only if
        // the upstream is an upsert stream; otherwise they cause a panic.
        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
            InconsistencyBehavior::Tolerate
        } else {
            InconsistencyBehavior::Panic
        };

        let input = self.input.execute();

        // Record row count (chunk capacity) and estimated bytes of every input chunk.
        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.chunk_size,
            self.input_data_types,
            input_compact_ib,
            self.sink_param.downstream_pk.clone(),
            self.non_append_only_behavior,
            metrics.sink_chunk_buffer_size,
            // Passed as `skip_compact`: a blackhole sink skips per-chunk compaction.
            self.sink.is_blackhole(),
        );

        // With `ignore_delete`, deletes are hidden and updates become inserts.
        let processed_input = if self.sink_param.ignore_delete {
            processed_input
                .map_ok(|msg| match msg {
                    Message::Chunk(chunk) => Message::Chunk(force_append_only(chunk)),
                    other => other,
                })
                .left_stream()
        } else {
            processed_input.right_stream()
        };

        if self.sink.is_sink_into_table() {
            // Sink-into-table: no log store, the processed stream is the output.
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            // Seed the rate-limit channel with the initial rate limit; the
            // receiver is still held locally, so the send cannot fail here.
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            // Channel used by the writer side to ask the consumer side to
            // rebuild the sink or update its config.
            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        fragment_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.non_append_only_behavior,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        // The `Ok` type is `!`; coerce it to the stream's item type.
                        .map_ok(|never| never);
                        consume_log_stream.boxed()
                    });
                    // Poll the consumer future and the writer stream together.
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }
426
    /// Writes every input chunk to `log_writer` and re-yields all messages.
    ///
    /// On each barrier the current epoch is flushed, the barrier is yielded,
    /// and then barrier mutations relevant to this sink (pause/resume,
    /// throttle, connector property changes) are applied. Rate-limit updates
    /// go to `rate_limit_tx`; sink rebuild and config updates go to
    /// `rebuild_sink_tx`.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        fragment_id: FragmentId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        // The first barrier is yielded before the log writer is initialized.
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let schema_change = barrier.as_sink_schema_change(sink_id);
                    if let Some(schema_change) = &schema_change {
                        info!(?schema_change, %sink_id, "sink receive schema change");
                    }
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                                schema_change,
                            },
                        )
                        .await?;

                    // Keep the mutation, because the barrier itself is moved by the yield.
                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    // When the log store requires it, ask the consumer side to
                    // rebuild the sink for the new vnode bitmap and wait for
                    // the acknowledgement.
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(fragment_to_apply) => {
                                // Only sink-type throttles addressed to this fragment apply.
                                if let Some(entry) = fragment_to_apply.get(&fragment_id)
                                    && entry.throttle_type() == ThrottleType::Sink
                                {
                                    tracing::info!(
                                        rate_limit = entry.rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send(entry.rate_limit.into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                // Forward the new properties for this sink, if any.
                                if let Some(map) = config.get(&sink_id.as_raw_id())
                                    && let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                {
                                    error!(
                                        error = %e.as_report(),
                                        "fail to send sink alter props"
                                    );
                                    return Err(StreamExecutorError::from(e.to_report_string()));
                                }
                            }
                            _ => (),
                        }
                    }
                }
            }
        }
    }
540
    /// Pre-processes the input stream before it is written to the log store.
    ///
    /// Two modes, selected by `non_append_only_behavior`:
    ///
    /// * If records must be reordered, all chunks of an epoch are buffered
    ///   until the barrier, compacted by `stream_key`, emitted with all
    ///   deletes before all inserts (and further compacted by `downstream_pk`
    ///   if present). A watermark, if any, follows, then the barrier.
    /// * Otherwise messages are passed through. Chunks of a non-append-only
    ///   sink with a downstream pk are compacted inline unless `skip_compact`.
    #[expect(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: StreamKey,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        input_compact_ib: InconsistencyBehavior,
        downstream_pk: Option<Vec<usize>>,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        skip_compact: bool,
    ) {
        if let Some(b) = non_append_only_behavior
            && b.should_reorder_records()
        {
            assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);

            let mut chunk_buffer = EstimatedVec::new();
            // Only the most recent watermark of the epoch is kept.
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);
                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();

                        // Split the compacted chunks into a delete-only part
                        // and an insert-only part.
                        let mut delete_chunks = vec![];
                        let mut insert_chunks = vec![];

                        for c in dispatch_output_kind!(sink_type, KIND, {
                            StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks_inline::<KIND>(input_compact_ib)
                        }) {
                            let chunk = force_delete_only(c.clone());
                            if chunk.cardinality() > 0 {
                                delete_chunks.push(chunk);
                            }
                            let chunk = force_append_only(c);
                            if chunk.cardinality() > 0 {
                                insert_chunks.push(chunk);
                            }
                        }
                        // All deletes are emitted before all inserts.
                        let chunks = delete_chunks
                            .into_iter()
                            .chain(insert_chunks.into_iter())
                            .collect();

                        if let Some(downstream_pk) = &downstream_pk {
                            // Compact again by the downstream pk, rebuilding chunks.
                            let chunks = dispatch_output_kind!(sink_type, KIND, {
                                StreamChunkCompactor::new(downstream_pk.clone(), chunks)
                                    .into_compacted_chunks_reconstructed::<KIND>(
                                        chunk_size,
                                        input_data_types.clone(),
                                        InconsistencyBehavior::Warn,
                                    )
                            });
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            // No downstream pk: just re-chunk the rows in order.
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        // The buffered watermark goes out after the epoch's chunks.
                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        if !sink_type.is_append_only()
                            && let Some(downstream_pk) = &downstream_pk
                        {
                            if skip_compact {
                                // Compaction is skipped; the stream key is
                                // asserted to equal the downstream pk.
                                assert_eq!(&stream_key, downstream_pk);
                            } else {
                                chunk = dispatch_output_kind!(sink_type, KIND, {
                                    compact_chunk_inline::<KIND>(
                                        chunk,
                                        downstream_pk,
                                        InconsistencyBehavior::Warn,
                                    )
                                });
                            }
                        }
                        yield Message::Chunk(chunk);
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }
678
    /// Reads from the log store and drives the sink. Never returns `Ok`.
    ///
    /// Chunks read from `log_reader` are optionally compacted by the
    /// downstream pk and projected to the visible columns, then handed to a
    /// log sinker created from `sink`. Sink failures are reported and, when
    /// `F::ALLOW_REWIND` is set, retried after rewinding the log reader.
    /// Messages on `rebuild_sink_rx` interrupt sinking to apply a new vnode
    /// bitmap or new sink properties.
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        // Indices of the columns that are not hidden.
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                let chunk = if let Some(b) = non_append_only_behavior
                    && b.should_compact_in_log_reader()
                {
                    // `should_compact_in_log_reader` requires
                    // `pk_specified_and_matched`, which `SinkExecutor::new`
                    // only sets when a downstream pk exists.
                    let downstream_pk = downstream_pk.as_ref().unwrap();
                    dispatch_output_kind!(sink_param.sink_type, KIND, {
                        compact_chunk_inline::<KIND>(
                            chunk,
                            downstream_pk,
                            InconsistencyBehavior::Warn,
                        )
                    })
                } else {
                    chunk
                };
                // Drop hidden columns, if there are any.
                if visible_columns.len() != columns.len() {
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            // Sinking loop: it only finishes with an error.
            let future = async {
                loop {
                    // The success type is uninhabited, so `Err` is the only
                    // possible outcome and the pattern is irrefutable.
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    // Report the failure through the meta client when one is configured.
                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    // With rewind support, a successful rewind lets the loop
                    // retry; otherwise the sink error is propagated.
                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                error!(
                                    error = %e.as_report(),
                                    executor_id = %sink_writer_param.executor_id,
                                    sink_id = %sink_param.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                }
            };
            // Race the sinking loop against control messages from the writer side.
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            // Adopt the new vnode bitmap, acknowledge, and
                            // re-initialize the log reader.
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if !sink_config_has_changes(&sink_param.properties, &config) {
                                info!(
                                    executor_id = %sink_writer_param.executor_id,
                                    sink_id = %sink_param.sink_id,
                                    "skip alter sink config because properties are unchanged"
                                );
                                Ok(())
                            } else if F::ALLOW_REWIND {
                                // Rewind first, then rebuild the sink from the
                                // merged properties.
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties.extend(config.into_iter());
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                                        info!(
                                            executor_id = %sink_writer_param.executor_id,
                                            sink_id = %sink_param.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config "
                                        );
                                        Err(anyhow!("fail to rewind log after alter table").into())
                                    }
                                }
                            } else {
                                // Without rewind support, the properties are
                                // applied and an error is returned on purpose
                                // (see the message below).
                                sink_param.properties.extend(config.into_iter());
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
859}
860
/// Control messages sent from the log-writing side to the log-consuming side
/// of the sink executor.
enum RebuildSinkMessage {
    /// Rebuild the sink with a new vnode bitmap. The sender is used by the
    /// consumer to acknowledge the request.
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// Apply the given property overrides to the sink configuration.
    UpdateConfig(HashMap<String, String>),
}
865
/// Reports whether applying `incoming` on top of `current` would change
/// anything.
///
/// Returns `true` as soon as some key of `incoming` is missing from `current`
/// or maps to a different value. An empty `incoming` yields `false`. Keys that
/// exist only in `current` are ignored.
fn sink_config_has_changes(
    current: &BTreeMap<String, String>,
    incoming: &HashMap<String, String>,
) -> bool {
    for (key, new_value) in incoming {
        match current.get(key) {
            Some(existing) if existing == new_value => {}
            _ => return true,
        }
    }
    false
}
874
875impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
876 fn execute(self: Box<Self>) -> BoxedMessageStream {
877 self.execute_inner()
878 }
879}
880
881#[cfg(test)]
882mod test {
883 use risingwave_common::catalog::{ColumnDesc, ColumnId};
884 use risingwave_common::util::epoch::test_epoch;
885 use risingwave_connector::sink::build_sink;
886
887 use super::*;
888 use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
889 use crate::executor::test_utils::*;
890
891 #[test]
892 fn test_sink_config_has_changes() {
893 let current = BTreeMap::from([
894 ("connector".to_owned(), "blackhole".to_owned()),
895 ("commit_checkpoint_interval".to_owned(), "1".to_owned()),
896 ]);
897
898 assert!(!sink_config_has_changes(
899 ¤t,
900 &HashMap::from([("commit_checkpoint_interval".to_owned(), "1".to_owned())])
901 ));
902 assert!(sink_config_has_changes(
903 ¤t,
904 &HashMap::from([("commit_checkpoint_interval".to_owned(), "2".to_owned())])
905 ));
906 assert!(sink_config_has_changes(
907 ¤t,
908 &HashMap::from([("force_append_only".to_owned(), "true".to_owned())])
909 ));
910 }
911
912 #[tokio::test]
913 async fn test_force_append_only_sink() {
914 use risingwave_common::array::StreamChunkTestExt;
915 use risingwave_common::array::stream_chunk::StreamChunk;
916 use risingwave_common::types::DataType;
917
918 use crate::executor::Barrier;
919
920 let properties = maplit::btreemap! {
921 "connector".into() => "blackhole".into(),
922 "type".into() => "append-only".into(),
923 "force_append_only".into() => "true".into()
924 };
925
926 let columns = vec![
929 ColumnCatalog {
930 column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
931 is_hidden: false,
932 },
933 ColumnCatalog {
934 column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
935 is_hidden: false,
936 },
937 ColumnCatalog {
938 column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
939 is_hidden: true,
940 },
941 ];
942 let schema: Schema = columns
943 .iter()
944 .map(|column| Field::from(column.column_desc.clone()))
945 .collect();
946 let stream_key = vec![0];
947
948 let source = MockSource::with_messages(vec![
949 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
950 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
951 " I I I
952 + 3 2 1",
953 ))),
954 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
955 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
956 " I I I
957 U- 3 2 1
958 U+ 3 4 1
959 + 5 6 7",
960 ))),
961 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
962 " I I I
963 - 5 6 7",
964 ))),
965 ])
966 .into_executor(schema.clone(), stream_key.clone());
967
968 let sink_param = SinkParam {
969 sink_id: 0.into(),
970 sink_name: "test".into(),
971 properties,
972
973 columns: columns
974 .iter()
975 .filter(|col| !col.is_hidden)
976 .map(|col| col.column_desc.clone())
977 .collect(),
978 downstream_pk: Some(stream_key.clone()),
979 sink_type: SinkType::AppendOnly,
980 ignore_delete: true,
981 format_desc: None,
982 db_name: "test".into(),
983 sink_from_name: "test".into(),
984 };
985
986 let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);
987
988 let sink = build_sink(sink_param.clone()).unwrap();
989
990 let sink_executor = SinkExecutor::new(
991 ActorContext::for_test(0),
992 info,
993 source,
994 SinkWriterParam::for_test(),
995 sink,
996 sink_param,
997 columns.clone(),
998 BoundedInMemLogStoreFactory::for_test(1),
999 1024,
1000 vec![DataType::Int32, DataType::Int32, DataType::Int32],
1001 None,
1002 )
1003 .unwrap();
1004
1005 let mut executor = sink_executor.boxed().execute();
1006
1007 executor.next().await.unwrap().unwrap();
1009
1010 let chunk_msg = executor.next().await.unwrap().unwrap();
1011 assert_eq!(
1012 chunk_msg.into_chunk().unwrap().compact_vis(),
1013 StreamChunk::from_pretty(
1014 " I I I
1015 + 3 2 1",
1016 )
1017 );
1018
1019 executor.next().await.unwrap().unwrap();
1021
1022 let chunk_msg = executor.next().await.unwrap().unwrap();
1023 assert_eq!(
1024 chunk_msg.into_chunk().unwrap().compact_vis(),
1025 StreamChunk::from_pretty(
1026 " I I I
1027 + 3 4 1
1028 + 5 6 7",
1029 )
1030 );
1031
1032 executor.next().await.unwrap().unwrap();
1037 }
1038
1039 #[tokio::test]
1040 async fn stream_key_sink_pk_mismatch_upsert() {
1041 stream_key_sink_pk_mismatch(SinkType::Upsert).await;
1042 }
1043
1044 #[tokio::test]
1045 async fn stream_key_sink_pk_mismatch_retract() {
1046 stream_key_sink_pk_mismatch(SinkType::Retract).await;
1047 }
1048
1049 async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
1050 use risingwave_common::array::StreamChunkTestExt;
1051 use risingwave_common::array::stream_chunk::StreamChunk;
1052 use risingwave_common::types::DataType;
1053
1054 use crate::executor::Barrier;
1055
1056 let properties = maplit::btreemap! {
1057 "connector".into() => "blackhole".into(),
1058 };
1059
1060 let columns = vec![
1063 ColumnCatalog {
1064 column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1065 is_hidden: false,
1066 },
1067 ColumnCatalog {
1068 column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1069 is_hidden: false,
1070 },
1071 ColumnCatalog {
1072 column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1073 is_hidden: true,
1074 },
1075 ];
1076 let schema: Schema = columns
1077 .iter()
1078 .map(|column| Field::from(column.column_desc.clone()))
1079 .collect();
1080
1081 let source = MockSource::with_messages(vec![
1082 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1083 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1084 " I I I
1085 + 1 1 10",
1086 ))),
1087 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1088 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1089 " I I I
1090 + 1 3 30",
1091 ))),
1092 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1093 " I I I
1094 + 1 2 20
1095 - 1 2 20",
1096 ))),
1097 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1098 " I I I
1099 - 1 1 10
1100 + 1 1 40",
1101 ))),
1102 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1103 ])
1104 .into_executor(schema.clone(), vec![0, 1]);
1105
1106 let sink_param = SinkParam {
1107 sink_id: 0.into(),
1108 sink_name: "test".into(),
1109 properties,
1110
1111 columns: columns
1112 .iter()
1113 .filter(|col| !col.is_hidden)
1114 .map(|col| col.column_desc.clone())
1115 .collect(),
1116 downstream_pk: Some(vec![0]),
1117 sink_type,
1118 ignore_delete: false,
1119 format_desc: None,
1120 db_name: "test".into(),
1121 sink_from_name: "test".into(),
1122 };
1123
1124 let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
1125
1126 let sink = build_sink(sink_param.clone()).unwrap();
1127
1128 let sink_executor = SinkExecutor::new(
1129 ActorContext::for_test(0),
1130 info,
1131 source,
1132 SinkWriterParam::for_test(),
1133 sink,
1134 sink_param,
1135 columns.clone(),
1136 BoundedInMemLogStoreFactory::for_test(1),
1137 1024,
1138 vec![DataType::Int64, DataType::Int64, DataType::Int64],
1139 None,
1140 )
1141 .unwrap();
1142
1143 let mut executor = sink_executor.boxed().execute();
1144
1145 executor.next().await.unwrap().unwrap();
1147
1148 let chunk_msg = executor.next().await.unwrap().unwrap();
1149 assert_eq!(
1150 chunk_msg.into_chunk().unwrap().compact_vis(),
1151 StreamChunk::from_pretty(
1152 " I I I
1153 + 1 1 10",
1154 )
1155 );
1156
1157 executor.next().await.unwrap().unwrap();
1159
1160 let chunk_msg = executor.next().await.unwrap().unwrap();
1161 let expected = match sink_type {
1162 SinkType::Retract => StreamChunk::from_pretty(
1163 " I I I
1164 U- 1 1 10
1165 U+ 1 1 40",
1166 ),
1167 SinkType::Upsert => StreamChunk::from_pretty(
1168 " I I I
1169 + 1 1 40", ),
1171 _ => unreachable!(),
1172 };
1173 assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);
1174
1175 executor.next().await.unwrap().unwrap();
1177 }
1178
1179 #[tokio::test]
1180 async fn test_empty_barrier_sink() {
1181 use risingwave_common::types::DataType;
1182
1183 use crate::executor::Barrier;
1184
1185 let properties = maplit::btreemap! {
1186 "connector".into() => "blackhole".into(),
1187 "type".into() => "append-only".into(),
1188 "force_append_only".into() => "true".into()
1189 };
1190 let columns = vec![
1191 ColumnCatalog {
1192 column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1193 is_hidden: false,
1194 },
1195 ColumnCatalog {
1196 column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1197 is_hidden: false,
1198 },
1199 ];
1200 let schema: Schema = columns
1201 .iter()
1202 .map(|column| Field::from(column.column_desc.clone()))
1203 .collect();
1204 let stream_key = vec![0];
1205
1206 let source = MockSource::with_messages(vec![
1207 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1208 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1209 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1210 ])
1211 .into_executor(schema.clone(), stream_key.clone());
1212
1213 let sink_param = SinkParam {
1214 sink_id: 0.into(),
1215 sink_name: "test".into(),
1216 properties,
1217
1218 columns: columns
1219 .iter()
1220 .filter(|col| !col.is_hidden)
1221 .map(|col| col.column_desc.clone())
1222 .collect(),
1223 downstream_pk: Some(stream_key.clone()),
1224 sink_type: SinkType::AppendOnly,
1225 ignore_delete: true,
1226 format_desc: None,
1227 db_name: "test".into(),
1228 sink_from_name: "test".into(),
1229 };
1230
1231 let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);
1232
1233 let sink = build_sink(sink_param.clone()).unwrap();
1234
1235 let sink_executor = SinkExecutor::new(
1236 ActorContext::for_test(0),
1237 info,
1238 source,
1239 SinkWriterParam::for_test(),
1240 sink,
1241 sink_param,
1242 columns,
1243 BoundedInMemLogStoreFactory::for_test(1),
1244 1024,
1245 vec![DataType::Int64, DataType::Int64],
1246 None,
1247 )
1248 .unwrap();
1249
1250 let mut executor = sink_executor.boxed().execute();
1251
1252 assert_eq!(
1254 executor.next().await.unwrap().unwrap(),
1255 Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
1256 );
1257
1258 assert_eq!(
1260 executor.next().await.unwrap().unwrap(),
1261 Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
1262 );
1263
1264 assert_eq!(
1266 executor.next().await.unwrap().unwrap(),
1267 Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
1268 );
1269 }
1270
1271 #[tokio::test]
1272 async fn test_force_compaction() {
1273 use risingwave_common::array::StreamChunkTestExt;
1274 use risingwave_common::array::stream_chunk::StreamChunk;
1275 use risingwave_common::types::DataType;
1276
1277 use crate::executor::Barrier;
1278
1279 let properties = maplit::btreemap! {
1280 "connector".into() => "blackhole".into(),
1281 "force_compaction".into() => "true".into()
1282 };
1283
1284 let columns = vec![
1287 ColumnCatalog {
1288 column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1289 is_hidden: false,
1290 },
1291 ColumnCatalog {
1292 column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1293 is_hidden: false,
1294 },
1295 ColumnCatalog {
1296 column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1297 is_hidden: true,
1298 },
1299 ];
1300 let schema: Schema = columns
1301 .iter()
1302 .map(|column| Field::from(column.column_desc.clone()))
1303 .collect();
1304
1305 let source = MockSource::with_messages(vec![
1306 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1307 Message::Chunk(StreamChunk::from_pretty(
1308 " I I I
1309 + 1 1 10",
1310 )),
1311 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1312 Message::Chunk(StreamChunk::from_pretty(
1313 " I I I
1314 + 1 3 30",
1315 )),
1316 Message::Chunk(StreamChunk::from_pretty(
1317 " I I I
1318 + 1 2 20
1319 - 1 2 20
1320 + 1 4 10",
1321 )),
1322 Message::Chunk(StreamChunk::from_pretty(
1323 " I I I
1324 - 1 1 10
1325 + 1 1 40",
1326 )),
1327 Message::Chunk(StreamChunk::from_pretty(
1328 " I I I
1329 - 1 4 30",
1330 )),
1331 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1332 ])
1333 .into_executor(schema.clone(), vec![0, 1]);
1334
1335 let sink_param = SinkParam {
1336 sink_id: 0.into(),
1337 sink_name: "test".into(),
1338 properties,
1339
1340 columns: columns
1341 .iter()
1342 .filter(|col| !col.is_hidden)
1343 .map(|col| col.column_desc.clone())
1344 .collect(),
1345 downstream_pk: Some(vec![0, 1]),
1346 sink_type: SinkType::Upsert,
1347 ignore_delete: false,
1348 format_desc: None,
1349 db_name: "test".into(),
1350 sink_from_name: "test".into(),
1351 };
1352
1353 let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
1354
1355 let sink = build_sink(sink_param.clone()).unwrap();
1356
1357 let sink_executor = SinkExecutor::new(
1358 ActorContext::for_test(0),
1359 info,
1360 source,
1361 SinkWriterParam::for_test(),
1362 sink,
1363 sink_param,
1364 columns.clone(),
1365 BoundedInMemLogStoreFactory::for_test(1),
1366 1024,
1367 vec![DataType::Int64, DataType::Int64, DataType::Int64],
1368 None,
1369 )
1370 .unwrap();
1371
1372 let mut executor = sink_executor.boxed().execute();
1373
1374 executor.next().await.unwrap().unwrap();
1376
1377 let chunk_msg = executor.next().await.unwrap().unwrap();
1378 assert_eq!(
1379 chunk_msg.into_chunk().unwrap().compact_vis(),
1380 StreamChunk::from_pretty(
1381 " I I I
1382 + 1 1 10",
1383 )
1384 );
1385
1386 executor.next().await.unwrap().unwrap();
1388
1389 let chunk_msg = executor.next().await.unwrap().unwrap();
1390 assert_eq!(
1391 chunk_msg.into_chunk().unwrap().compact_vis(),
1392 StreamChunk::from_pretty(
1393 " I I I
1394 + 1 3 30
1395 + 1 1 40",
1396 )
1397 );
1398
1399 executor.next().await.unwrap().unwrap();
1401 }
1402}