risingwave_stream/executor/
sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::HashMap;
17use std::mem;
18
19use anyhow::anyhow;
20use futures::stream::select;
21use futures::{FutureExt, TryFutureExt, TryStreamExt};
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::array::stream_chunk::StreamChunkMut;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{ColumnCatalog, Field};
27use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
28use risingwave_common_estimate_size::EstimateSize;
29use risingwave_common_estimate_size::collections::EstimatedVec;
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_connector::dispatch_sink;
32use risingwave_connector::sink::catalog::{SinkId, SinkType};
33use risingwave_connector::sink::log_store::{
34    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
35    LogWriter, LogWriterExt, LogWriterMetrics,
36};
37use risingwave_connector::sink::{
38    GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
39    SinkWriterParam,
40};
41use risingwave_pb::stream_plan::stream_node::StreamKind;
42use thiserror_ext::AsReport;
43use tokio::select;
44use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
45use tokio::sync::oneshot;
46
47use crate::common::change_buffer::{OutputKind, output_kind};
48use crate::common::compact_chunk::{
49    InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
50};
51use crate::executor::prelude::*;
52pub struct SinkExecutor<F: LogStoreFactory> {
53    actor_context: ActorContextRef,
54    info: ExecutorInfo,
55    input: Executor,
56    sink: SinkImpl,
57    input_columns: Vec<ColumnCatalog>,
58    sink_param: SinkParam,
59    log_store_factory: F,
60    sink_writer_param: SinkWriterParam,
61    chunk_size: usize,
62    input_data_types: Vec<DataType>,
63    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
64    rate_limit: Option<u32>,
65}
66
67// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
68fn force_append_only(c: StreamChunk) -> StreamChunk {
69    let mut c: StreamChunkMut = c.into();
70    for (_, mut r) in c.to_rows_mut() {
71        match r.op() {
72            Op::Insert => {}
73            Op::Delete | Op::UpdateDelete => r.set_vis(false),
74            Op::UpdateInsert => r.set_op(Op::Insert),
75        }
76    }
77    c.into()
78}
79
80// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
81fn force_delete_only(c: StreamChunk) -> StreamChunk {
82    let mut c: StreamChunkMut = c.into();
83    for (_, mut r) in c.to_rows_mut() {
84        match r.op() {
85            Op::Delete => {}
86            Op::Insert | Op::UpdateInsert => r.set_vis(false),
87            Op::UpdateDelete => r.set_op(Op::Delete),
88        }
89    }
90    c.into()
91}
92
93/// When the sink is non-append-only, i.e. upsert or retract, we need to do some extra work for
94/// correctness and performance.
95#[derive(Clone, Copy, Debug)]
96struct NonAppendOnlyBehavior {
97    /// Whether the user specifies a primary key for the sink, and it matches the derived stream
98    /// key of the stream.
99    ///
100    /// By matching, we mean that stream key is a subset of the downstream pk.
101    pk_specified_and_matched: bool,
102    /// Whether the user forces buffering all chunks between two barriers.
103    force_compaction: bool,
104}
105
106impl NonAppendOnlyBehavior {
107    /// NOTE(kwannoel):
108    ///
109    /// After the optimization in <https://github.com/risingwavelabs/risingwave/pull/12250>,
110    /// `DELETE`s will be sequenced before `INSERT`s in JDBC sinks and PG rust sink.
111    /// There's a risk that adjacent chunks with `DELETE`s on the same PK will get
112    /// merged into a single chunk, since the logstore format doesn't preserve chunk
113    /// boundaries. Then we will have double `DELETE`s followed by unspecified sequence
114    /// of `INSERT`s, and lead to inconsistent data downstream.
115    ///
116    /// We only need to do the compaction for non-append-only sinks, when the upstream and
117    /// downstream PKs are matched. When the upstream and downstream PKs are not matched,
118    /// we will buffer the chunks between two barriers, so the compaction is not needed,
119    /// since the barriers will preserve chunk boundaries.
120    ///
121    /// When `force_compaction` is true, we also skip compaction here, since the buffering
122    /// will also make compaction.
123    ///
124    /// When the sink is an append-only sink, it is either `force_append_only` or
125    /// `append_only`, we should only append to downstream, so there should not be any
126    /// overlapping keys.
127    fn should_compact_in_log_reader(self) -> bool {
128        self.pk_specified_and_matched && !self.force_compaction
129    }
130
131    /// When stream key is different from the user defined primary key columns for sinks.
132    /// The operations could be out of order.
133    ///
134    /// For example, we have a stream with derived stream key `a, b` and user-specified sink
135    /// primary key `a`. Assume that we have `(1, 1)` in the table. Then, we perform two `UPDATE`
136    /// operations:
137    ///
138    /// ```text
139    /// UPDATE SET b = 2 WHERE a = 1 ... which issues:
140    ///   - (1, 1)
141    ///   + (1, 2)
142    ///
143    /// UPDATE SET b = 3 WHERE a = 1 ... which issues:
144    ///   - (1, 2)
145    ///   + (1, 3)
146    /// ```
147    ///
148    /// When these changes go into streaming pipeline, they could be shuffled to different parallelism
149    /// (actor), given that they are under different stream keys.
150    ///
151    /// ```text
152    /// Actor 1:
153    /// - (1, 1)
154    ///
155    /// Actor 2:
156    /// + (1, 2)
157    /// - (1, 2)
158    ///
159    /// Actor 3:
160    /// + (1, 3)
161    /// ```
162    ///
163    /// When these records are merged back into sink actor, we may get the records from different
164    /// parallelism in arbitrary order, like:
165    ///
166    /// ```text
167    /// + (1, 2) -- Actor 2, first row
168    /// + (1, 3) -- Actor 3
169    /// - (1, 1) -- Actor 1
170    /// - (1, 2) -- Actor 2, second row
171    /// ```
172    ///
173    /// Note that in terms of stream key (`a, b`), the operations in the order above are completely
174    /// correct, because we are operating on 3 different rows. However, in terms of user defined sink
175    /// primary key `a`, we're violating the unique constraint all the time.
176    ///
177    /// Therefore, in this case, we have to do additional reordering in the sink executor per barrier.
178    /// Specifically, we need to:
179    ///
180    /// First, compact all the changes with the stream key, so we have:
181    /// ```text
182    /// + (1, 3)
183    /// - (1, 1)
184    /// ```
185    ///
186    /// Then, sink all the delete events before sinking all insert events, so we have:
187    /// ```text
188    /// - (1, 1)
189    /// + (1, 3)
190    /// ```
191    /// Since we've compacted the chunk with the stream key, the `DELETE` records survived must be to
192    /// delete an existing row, so we can safely move them to the front. After the deletion is done,
193    /// we can then safely sink the insert events with uniqueness guarantee.
194    ///
195    /// When `force_compaction` is true, we also perform additional reordering to gain the
196    /// benefits of compaction:
197    /// - reduce the number of output messages;
198    /// - emit at most one update per key within a barrier interval, simplifying downstream logic.
199    fn should_reorder_records(self) -> bool {
200        !self.pk_specified_and_matched || self.force_compaction
201    }
202}
203
204/// Get the output kind for chunk compaction based on the given sink type.
205fn compact_output_kind(sink_type: SinkType) -> OutputKind {
206    match sink_type {
207        SinkType::Upsert => output_kind::UPSERT,
208        SinkType::Retract => output_kind::RETRACT,
209        // There won't be any `Update` or `Delete` in the chunk, so it doesn't matter.
210        SinkType::AppendOnly | SinkType::ForceAppendOnly => output_kind::RETRACT,
211    }
212}
213
214/// Dispatch the code block to different output kinds for chunk compaction based on sink type.
215macro_rules! dispatch_output_kind {
216    ($sink_type:expr, $KIND:ident, $body:tt) => {
217        #[allow(unused_braces)]
218        match compact_output_kind($sink_type) {
219            output_kind::UPSERT => {
220                const KIND: OutputKind = output_kind::UPSERT;
221                $body
222            }
223            output_kind::RETRACT => {
224                const KIND: OutputKind = output_kind::RETRACT;
225                $body
226            }
227        }
228    };
229}
230
231impl<F: LogStoreFactory> SinkExecutor<F> {
232    #[allow(clippy::too_many_arguments)]
233    #[expect(clippy::unused_async)]
234    pub async fn new(
235        actor_context: ActorContextRef,
236        info: ExecutorInfo,
237        input: Executor,
238        sink_writer_param: SinkWriterParam,
239        sink: SinkImpl,
240        sink_param: SinkParam,
241        columns: Vec<ColumnCatalog>,
242        log_store_factory: F,
243        chunk_size: usize,
244        input_data_types: Vec<DataType>,
245        rate_limit: Option<u32>,
246    ) -> StreamExecutorResult<Self> {
247        let sink_input_schema: Schema = columns
248            .iter()
249            .map(|column| Field::from(&column.column_desc))
250            .collect();
251
252        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
253            // Remove the partition column from the schema.
254            assert_eq!(sink_input_schema.data_types(), {
255                let mut data_type = info.schema.data_types();
256                data_type.remove(col_dix);
257                data_type
258            });
259        } else {
260            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
261        }
262
263        let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
264            let stream_key = &info.stream_key;
265            let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
266                .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
267            let force_compaction = sink_param
268                .properties
269                .get(SINK_USER_FORCE_COMPACTION)
270                .map(|v| v.eq_ignore_ascii_case("true"))
271                .unwrap_or(false);
272            Some(NonAppendOnlyBehavior {
273                pk_specified_and_matched,
274                force_compaction,
275            })
276        } else {
277            None
278        };
279
280        tracing::info!(
281            sink_id = %sink_param.sink_id,
282            actor_id = %actor_context.id,
283            ?non_append_only_behavior,
284            "Sink executor info"
285        );
286
287        Ok(Self {
288            actor_context,
289            info,
290            input,
291            sink,
292            input_columns: columns,
293            sink_param,
294            log_store_factory,
295            sink_writer_param,
296            chunk_size,
297            input_data_types,
298            non_append_only_behavior,
299            rate_limit,
300        })
301    }
302
303    fn execute_inner(self) -> BoxedMessageStream {
304        let sink_id = self.sink_param.sink_id;
305        let actor_id = self.actor_context.id;
306        let fragment_id = self.actor_context.fragment_id;
307
308        let stream_key = self.info.stream_key.clone();
309        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
310            sink_id,
311            actor_id,
312            fragment_id,
313        );
314
315        // When processing upsert stream, we need to tolerate the inconsistency (mismatched `DELETE`
316        // and `INSERT` pairs) when compacting input chunks with derived stream key.
317        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
318            InconsistencyBehavior::Tolerate
319        } else {
320            InconsistencyBehavior::Panic
321        };
322
323        let input = self.input.execute();
324
325        let input = input.inspect_ok(move |msg| {
326            if let Message::Chunk(c) = msg {
327                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
328                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
329            }
330        });
331
332        let processed_input = Self::process_msg(
333            input,
334            self.sink_param.sink_type,
335            stream_key,
336            self.chunk_size,
337            self.input_data_types,
338            input_compact_ib,
339            self.sink_param.downstream_pk.clone(),
340            self.non_append_only_behavior,
341            metrics.sink_chunk_buffer_size,
342            self.sink.is_blackhole(), // skip compact for blackhole for better benchmark results
343        );
344
345        if self.sink.is_sink_into_table() {
346            // TODO(hzxa21): support rate limit?
347            processed_input.boxed()
348        } else {
349            let labels = [
350                &actor_id.to_string(),
351                &sink_id.to_string(),
352                self.sink_param.sink_name.as_str(),
353            ];
354            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
355                .log_store_first_write_epoch
356                .with_guarded_label_values(&labels);
357            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
358                .log_store_latest_write_epoch
359                .with_guarded_label_values(&labels);
360            let log_store_write_rows = GLOBAL_SINK_METRICS
361                .log_store_write_rows
362                .with_guarded_label_values(&labels);
363            let log_writer_metrics = LogWriterMetrics {
364                log_store_first_write_epoch,
365                log_store_latest_write_epoch,
366                log_store_write_rows,
367            };
368
369            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
370            // Init the rate limit
371            rate_limit_tx.send(self.rate_limit.into()).unwrap();
372
373            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();
374
375            self.log_store_factory
376                .build()
377                .map(move |(log_reader, log_writer)| {
378                    let write_log_stream = Self::execute_write_log(
379                        processed_input,
380                        log_writer.monitored(log_writer_metrics),
381                        actor_id,
382                        sink_id,
383                        rate_limit_tx,
384                        rebuild_sink_tx,
385                    );
386
387                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
388                        let consume_log_stream = Self::execute_consume_log(
389                            *sink,
390                            log_reader,
391                            self.input_columns,
392                            self.sink_param,
393                            self.sink_writer_param,
394                            self.non_append_only_behavior,
395                            input_compact_ib,
396                            self.actor_context,
397                            rate_limit_rx,
398                            rebuild_sink_rx,
399                        )
400                        .instrument_await(
401                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
402                        )
403                        .map_ok(|never| never); // unify return type to `Message`
404
405                        consume_log_stream.boxed()
406                    });
407                    select(consume_log_stream_future.into_stream(), write_log_stream)
408                })
409                .into_stream()
410                .flatten()
411                .boxed()
412        }
413    }
414
415    #[try_stream(ok = Message, error = StreamExecutorError)]
416    async fn execute_write_log<W: LogWriter>(
417        input: impl MessageStream,
418        mut log_writer: W,
419        actor_id: ActorId,
420        sink_id: SinkId,
421        rate_limit_tx: UnboundedSender<RateLimit>,
422        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
423    ) {
424        pin_mut!(input);
425        let barrier = expect_first_barrier(&mut input).await?;
426        let epoch_pair = barrier.epoch;
427        let is_pause_on_startup = barrier.is_pause_on_startup();
428        // Propagate the first barrier
429        yield Message::Barrier(barrier);
430
431        log_writer.init(epoch_pair, is_pause_on_startup).await?;
432
433        let mut is_paused = false;
434
435        #[for_await]
436        for msg in input {
437            match msg? {
438                Message::Watermark(w) => yield Message::Watermark(w),
439                Message::Chunk(chunk) => {
440                    assert!(
441                        !is_paused,
442                        "Actor {actor_id} should not receive any data after pause"
443                    );
444                    log_writer.write_chunk(chunk.clone()).await?;
445                    yield Message::Chunk(chunk);
446                }
447                Message::Barrier(barrier) => {
448                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
449                    let add_columns = barrier.as_sink_add_columns(sink_id);
450                    if let Some(add_columns) = &add_columns {
451                        info!(?add_columns, %sink_id, "sink receive add columns");
452                    }
453                    let post_flush = log_writer
454                        .flush_current_epoch(
455                            barrier.epoch.curr,
456                            FlushCurrentEpochOptions {
457                                is_checkpoint: barrier.kind.is_checkpoint(),
458                                new_vnode_bitmap: update_vnode_bitmap.clone(),
459                                is_stop: barrier.is_stop(actor_id),
460                                add_columns,
461                            },
462                        )
463                        .await?;
464
465                    let mutation = barrier.mutation.clone();
466                    yield Message::Barrier(barrier);
467                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
468                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
469                    {
470                        let (tx, rx) = oneshot::channel();
471                        rebuild_sink_tx
472                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
473                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
474                        rx.await
475                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
476                    }
477                    post_flush.post_yield_barrier().await?;
478
479                    if let Some(mutation) = mutation.as_deref() {
480                        match mutation {
481                            Mutation::Pause => {
482                                log_writer.pause()?;
483                                is_paused = true;
484                            }
485                            Mutation::Resume => {
486                                log_writer.resume()?;
487                                is_paused = false;
488                            }
489                            Mutation::Throttle(actor_to_apply) => {
490                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
491                                    tracing::info!(
492                                        rate_limit = new_rate_limit,
493                                        "received sink rate limit on actor {actor_id}"
494                                    );
495                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
496                                        error!(
497                                            error = %e.as_report(),
498                                            "fail to send sink ate limit update"
499                                        );
500                                        return Err(StreamExecutorError::from(
501                                            e.to_report_string(),
502                                        ));
503                                    }
504                                }
505                            }
506                            Mutation::ConnectorPropsChange(config) => {
507                                if let Some(map) = config.get(&sink_id.as_raw_id())
508                                    && let Err(e) = rebuild_sink_tx
509                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
510                                {
511                                    error!(
512                                        error = %e.as_report(),
513                                        "fail to send sink alter props"
514                                    );
515                                    return Err(StreamExecutorError::from(e.to_report_string()));
516                                }
517                            }
518                            _ => (),
519                        }
520                    }
521                }
522            }
523        }
524    }
525
526    #[allow(clippy::too_many_arguments)]
527    #[try_stream(ok = Message, error = StreamExecutorError)]
528    async fn process_msg(
529        input: impl MessageStream,
530        sink_type: SinkType,
531        stream_key: StreamKey,
532        chunk_size: usize,
533        input_data_types: Vec<DataType>,
534        input_compact_ib: InconsistencyBehavior,
535        downstream_pk: Option<Vec<usize>>,
536        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
537        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
538        skip_compact: bool,
539    ) {
540        // To reorder records, we need to buffer chunks of the entire epoch.
541        if let Some(b) = non_append_only_behavior
542            && b.should_reorder_records()
543        {
544            assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);
545
546            let mut chunk_buffer = EstimatedVec::new();
547            let mut watermark: Option<super::Watermark> = None;
548            #[for_await]
549            for msg in input {
550                match msg? {
551                    Message::Watermark(w) => watermark = Some(w),
552                    Message::Chunk(c) => {
553                        chunk_buffer.push(c);
554                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
555                    }
556                    Message::Barrier(barrier) => {
557                        let chunks = mem::take(&mut chunk_buffer).into_inner();
558
559                        // 1. Compact the chunk based on the **stream key**, so that we have at most 2 rows for each
560                        //    stream key. Then, move all delete records to the front.
561                        let mut delete_chunks = vec![];
562                        let mut insert_chunks = vec![];
563
564                        for c in dispatch_output_kind!(sink_type, KIND, {
565                            StreamChunkCompactor::new(stream_key.clone(), chunks)
566                                .into_compacted_chunks_inline::<KIND>(input_compact_ib)
567                        }) {
568                            let chunk = force_delete_only(c.clone());
569                            if chunk.cardinality() > 0 {
570                                delete_chunks.push(chunk);
571                            }
572                            let chunk = force_append_only(c);
573                            if chunk.cardinality() > 0 {
574                                insert_chunks.push(chunk);
575                            }
576                        }
577                        let chunks = delete_chunks
578                            .into_iter()
579                            .chain(insert_chunks.into_iter())
580                            .collect();
581
582                        // 2. If user specifies a primary key, compact the chunk based on the **downstream pk**
583                        //    to eliminate any unnecessary updates to external systems. This also rewrites the
584                        //    `DELETE` and `INSERT` operations on the same key into `UPDATE` operations, which
585                        //    usually have more efficient implementation.
586                        if let Some(downstream_pk) = &downstream_pk {
587                            let chunks = dispatch_output_kind!(sink_type, KIND, {
588                                StreamChunkCompactor::new(downstream_pk.clone(), chunks)
589                                    .into_compacted_chunks_reconstructed::<KIND>(
590                                        chunk_size,
591                                        input_data_types.clone(),
592                                        // When compacting based on user provided primary key, we should never panic
593                                        // on inconsistency in case the user provided primary key is not unique.
594                                        InconsistencyBehavior::Warn,
595                                    )
596                            });
597                            for c in chunks {
598                                yield Message::Chunk(c);
599                            }
600                        } else {
601                            let mut chunk_builder =
602                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
603                            for chunk in chunks {
604                                for (op, row) in chunk.rows() {
605                                    if let Some(c) = chunk_builder.append_row(op, row) {
606                                        yield Message::Chunk(c);
607                                    }
608                                }
609                            }
610
611                            if let Some(c) = chunk_builder.take() {
612                                yield Message::Chunk(c);
613                            }
614                        };
615
616                        // 3. Forward watermark and barrier.
617                        if let Some(w) = mem::take(&mut watermark) {
618                            yield Message::Watermark(w)
619                        }
620                        yield Message::Barrier(barrier);
621                    }
622                }
623            }
624        } else {
625            #[for_await]
626            for msg in input {
627                match msg? {
628                    Message::Watermark(w) => yield Message::Watermark(w),
629                    Message::Chunk(mut chunk) => {
630                        // Compact the chunk to eliminate any unnecessary updates to external systems.
631                        // This should be performed against the downstream pk, not the stream key, to
632                        // ensure correct retract/upsert semantics from the downstream's perspective.
633                        if sink_type != SinkType::AppendOnly
634                            && let Some(downstream_pk) = &downstream_pk
635                        {
636                            if skip_compact {
637                                // We can only skip compaction if the keys are exactly the same, not just
638                                // matching by being a subset.
639                                assert_eq!(&stream_key, downstream_pk);
640                            } else {
641                                chunk = dispatch_output_kind!(sink_type, KIND, {
642                                    compact_chunk_inline::<KIND>(
643                                        chunk,
644                                        downstream_pk,
645                                        input_compact_ib,
646                                    )
647                                });
648                            }
649                        }
650                        match sink_type {
651                            SinkType::AppendOnly => yield Message::Chunk(chunk),
652                            SinkType::ForceAppendOnly => {
653                                // Force append-only by dropping UPDATE/DELETE messages. We do this when the
654                                // user forces the sink to be append-only while it is actually not based on
655                                // the frontend derivation result.
656                                yield Message::Chunk(force_append_only(chunk))
657                            }
658                            SinkType::Upsert | SinkType::Retract => yield Message::Chunk(chunk),
659                        }
660                    }
661                    Message::Barrier(barrier) => {
662                        yield Message::Barrier(barrier);
663                    }
664                }
665            }
666        }
667    }
668
669    #[expect(clippy::too_many_arguments)]
670    async fn execute_consume_log<S: Sink, R: LogReader>(
671        mut sink: S,
672        log_reader: R,
673        columns: Vec<ColumnCatalog>,
674        mut sink_param: SinkParam,
675        mut sink_writer_param: SinkWriterParam,
676        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
677        input_compact_ib: InconsistencyBehavior,
678        actor_context: ActorContextRef,
679        rate_limit_rx: UnboundedReceiver<RateLimit>,
680        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
681    ) -> StreamExecutorResult<!> {
682        let visible_columns = columns
683            .iter()
684            .enumerate()
685            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
686            .collect_vec();
687
688        let labels = [
689            &actor_context.id.to_string(),
690            sink_writer_param.connector.as_str(),
691            &sink_writer_param.sink_id.to_string(),
692            sink_writer_param.sink_name.as_str(),
693        ];
694        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
695            .log_store_reader_wait_new_future_duration_ns
696            .with_guarded_label_values(&labels);
697        let log_store_read_rows = GLOBAL_SINK_METRICS
698            .log_store_read_rows
699            .with_guarded_label_values(&labels);
700        let log_store_read_bytes = GLOBAL_SINK_METRICS
701            .log_store_read_bytes
702            .with_guarded_label_values(&labels);
703        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
704            .log_store_latest_read_epoch
705            .with_guarded_label_values(&labels);
706        let metrics = LogReaderMetrics {
707            log_store_latest_read_epoch,
708            log_store_read_rows,
709            log_store_read_bytes,
710            log_store_reader_wait_new_future_duration_ns,
711        };
712
713        let downstream_pk = sink_param.downstream_pk.clone();
714
715        let mut log_reader = log_reader
716            .transform_chunk(move |chunk| {
717                let chunk = if let Some(b) = non_append_only_behavior
718                    && b.should_compact_in_log_reader()
719                {
720                    // This guarantees that user has specified a `downstream_pk`.
721                    let downstream_pk = downstream_pk.as_ref().unwrap();
722                    dispatch_output_kind!(sink_param.sink_type, KIND, {
723                        compact_chunk_inline::<KIND>(chunk, downstream_pk, input_compact_ib)
724                    })
725                } else {
726                    chunk
727                };
728                if visible_columns.len() != columns.len() {
729                    // Do projection here because we may have columns that aren't visible to
730                    // the downstream.
731                    chunk.project(&visible_columns)
732                } else {
733                    chunk
734                }
735            })
736            .monitored(metrics)
737            .rate_limited(rate_limit_rx);
738
739        log_reader.init().await?;
740        loop {
741            let future = async {
742                loop {
743                    let Err(e) = sink
744                        .new_log_sinker(sink_writer_param.clone())
745                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
746                        .await;
747                    GLOBAL_ERROR_METRICS.user_sink_error.report([
748                        "sink_executor_error".to_owned(),
749                        sink_param.sink_id.to_string(),
750                        sink_param.sink_name.clone(),
751                        actor_context.fragment_id.to_string(),
752                    ]);
753
754                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
755                        meta_client
756                            .add_sink_fail_evet_log(
757                                sink_writer_param.sink_id,
758                                sink_writer_param.sink_name.clone(),
759                                sink_writer_param.connector.clone(),
760                                e.to_report_string(),
761                            )
762                            .await;
763                    }
764
765                    if F::ALLOW_REWIND {
766                        match log_reader.rewind().await {
767                            Ok(()) => {
768                                error!(
769                                    error = %e.as_report(),
770                                    executor_id = sink_writer_param.executor_id,
771                                    sink_id = %sink_param.sink_id,
772                                    "reset log reader stream successfully after sink error"
773                                );
774                                Ok(())
775                            }
776                            Err(rewind_err) => {
777                                error!(
778                                    error = %rewind_err.as_report(),
779                                    "fail to rewind log reader"
780                                );
781                                Err(e)
782                            }
783                        }
784                    } else {
785                        Err(e)
786                    }
787                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
788                }
789            };
790            select! {
791                result = future => {
792                    let Err(e): StreamExecutorResult<!> = result;
793                    return Err(e);
794                }
795                result = rebuild_sink_rx.recv() => {
796                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
797                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
798                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
799                            if notify.send(()).is_err() {
800                                warn!("failed to notify rebuild sink");
801                            }
802                            log_reader.init().await?;
803                        },
804                        RebuildSinkMessage::UpdateConfig(config) => {
805                            if F::ALLOW_REWIND {
806                                match log_reader.rewind().await {
807                                    Ok(()) => {
808                                        sink_param.properties.extend(config.into_iter());
809                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
810                                        info!(
811                                            executor_id = sink_writer_param.executor_id,
812                                            sink_id = %sink_param.sink_id,
813                                            "alter sink config successfully with rewind"
814                                        );
815                                        Ok(())
816                                    }
817                                    Err(rewind_err) => {
818                                        error!(
819                                            error = %rewind_err.as_report(),
820                                            "fail to rewind log reader for alter sink config "
821                                        );
822                                        Err(anyhow!("fail to rewind log after alter table").into())
823                                    }
824                                }
825                            } else {
826                                sink_param.properties.extend(config.into_iter());
827                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
828                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
829                            }
830                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
831                        },
832                    }
833                }
834            }
835        }
836    }
837}
838
/// Control messages received by `execute_consume_log` on its `rebuild_sink_rx` channel.
enum RebuildSinkMessage {
    /// Replace the vnode bitmap used when creating new log sinkers. The oneshot sender is
    /// signalled after the new bitmap has been recorded; the log reader is then re-initialized.
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// Merge these key/value pairs into the sink properties and rebuild the sink from them.
    UpdateConfig(HashMap<String, String>),
}
843
impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    /// Turns the boxed executor into its output message stream by delegating to
    /// `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}
849
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    /// A force-append-only sink turns `U+` rows into inserts and drops deletes / `U-` rows,
    /// so a chunk containing only deletes produces no output chunk.
    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 3 2 1",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                "  I I I
                    U- 3 2 1
                    U+ 3 4 1
                     + 5 6 7",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 5 6 7",
            )),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            // Must match the `Int64` column types declared above.
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // Should not receive the third stream chunk message because the force-append-only sink
        // executor will drop all DELETE messages.

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_upsert() {
        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_retract() {
        stream_key_sink_pk_mismatch(SinkType::Retract).await;
    }

    /// The stream key is `[0, 1]` but the downstream pk is only `[0]`. The changes of the second
    /// epoch are expected to collapse into a single change per downstream pk. Its shape depends
    /// on `sink_type`.
    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0]),
            sink_type,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        let expected = match sink_type {
            SinkType::Retract => StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            ),
            SinkType::Upsert => StreamChunk::from_pretty(
                " I I I
                + 1 1 40", // For upsert format, there won't be `U- 1 1 10`.
            ),
            _ => unreachable!(),
        };
        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    /// With no data chunks in the input, the executor forwards each barrier unchanged.
    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        // The last barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }

    /// With `force_compaction = true`, the changes of an epoch are compacted on the downstream
    /// pk before being emitted, so rows inserted and deleted within the epoch disappear.
    #[tokio::test]
    async fn test_force_compaction() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "force_compaction".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20
                    + 1 4 10",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 4 30",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0, 1]),
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 3 30
                + 1 1 40",
            )
        );

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }
}