// risingwave_stream/executor/sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::mem;

use anyhow::anyhow;
use futures::stream::select;
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::{SinkId, SinkType};
use risingwave_connector::sink::log_store::{
    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
    LogWriter, LogWriterExt, LogWriterMetrics,
};
use risingwave_connector::sink::{
    GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
};
use risingwave_pb::stream_plan::stream_node::StreamKind;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use crate::common::change_buffer::{OutputKind, output_kind};
use crate::common::compact_chunk::{
    InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
};
use crate::executor::prelude::*;

pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
    rate_limit: Option<u32>,
}

// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
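// For example (a sketch with made-up rows), ops `[+ a, U- b, U+ b', - c]` become
// `[+ a, + b']`: the `U- b` and `- c` rows are marked invisible rather than removed.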
fn force_append_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Insert => {}
            Op::Delete | Op::UpdateDelete => r.set_vis(false),
            Op::UpdateInsert => r.set_op(Op::Insert),
        }
    }
    c.into()
}

// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
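// Mirroring the sketch above, ops `[+ a, U- b, U+ b', - c]` become `[- b, - c]`:
// the `+ a` and `U+ b'` rows are marked invisible.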
fn force_delete_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Delete => {}
            Op::Insert | Op::UpdateInsert => r.set_vis(false),
            Op::UpdateDelete => r.set_op(Op::Delete),
        }
    }
    c.into()
}

/// When the sink is non-append-only, i.e. upsert or retract, we need to do some extra work for
/// correctness and performance.
#[derive(Clone, Copy, Debug)]
struct NonAppendOnlyBehavior {
    /// Whether the user specifies a primary key for the sink, and it matches the derived stream
    /// key of the stream.
    ///
    /// By matching, we mean that the stream key is a subset of the downstream pk.
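    ///
    /// For example, stream key `(a, b)` matches downstream pk `(a, b)` or `(a, b, c)`,
    /// but not `(a)` alone.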
    pk_specified_and_matched: bool,
}

impl NonAppendOnlyBehavior {
    /// NOTE(kwannoel):
    ///
    /// After the optimization in <https://github.com/risingwavelabs/risingwave/pull/12250>,
    /// `DELETE`s will be sequenced before `INSERT`s in JDBC sinks and the PG Rust sink.
    /// There's a risk that adjacent chunks with `DELETE`s on the same PK will get
    /// merged into a single chunk, since the logstore format doesn't preserve chunk
    /// boundaries. We would then have double `DELETE`s followed by an unspecified sequence
    /// of `INSERT`s, leading to inconsistent data downstream.
    ///
    /// We only need to do the compaction for non-append-only sinks, and only when the upstream
    /// and downstream PKs are matched. When the upstream and downstream PKs are not matched,
    /// we buffer the chunks between two barriers, so the compaction is not needed,
    /// since the barriers preserve chunk boundaries.
    ///
    /// When the sink is an append-only sink, it is either `force_append_only` or
    /// `append_only`; we only ever append to the downstream, so there should not be any
    /// overlapping keys.
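    ///
    /// A sketch of the hazard with made-up rows: chunks `[- (1, a), + (1, b)]` and
    /// `[- (1, b), + (1, c)]` may be read back merged; with `DELETE`s sequenced first,
    /// the downstream sees `- (1, a), - (1, b), + (1, b), + (1, c)`, i.e. the `DELETE`
    /// of `(1, b)` runs before its `INSERT`, and the final state depends on the
    /// unspecified order of the inserts.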
    fn should_compact_in_log_reader(self) -> bool {
        self.pk_specified_and_matched
    }

    /// When the stream key is different from the user-defined primary key columns of the sink,
    /// the operations could be out of order.
    ///
    /// For example, we have a stream with derived stream key `a, b` and user-specified sink
    /// primary key `a`. Assume that we have `(1, 1)` in the table. Then, we perform two `UPDATE`
    /// operations:
    ///
    /// ```text
    /// UPDATE SET b = 2 WHERE a = 1 ... which issues:
    ///   - (1, 1)
    ///   + (1, 2)
    ///
    /// UPDATE SET b = 3 WHERE a = 1 ... which issues:
    ///   - (1, 2)
    ///   + (1, 3)
    /// ```
    ///
    /// When these changes go into the streaming pipeline, they could be shuffled to different
    /// parallelisms (actors), given that they are under different stream keys.
    ///
    /// ```text
    /// Actor 1:
    /// - (1, 1)
    ///
    /// Actor 2:
    /// + (1, 2)
    /// - (1, 2)
    ///
    /// Actor 3:
    /// + (1, 3)
    /// ```
    ///
    /// When these records are merged back into the sink actor, we may receive records from
    /// different parallelisms in arbitrary order, like:
    ///
    /// ```text
    /// + (1, 2) -- Actor 2, first row
    /// + (1, 3) -- Actor 3
    /// - (1, 1) -- Actor 1
    /// - (1, 2) -- Actor 2, second row
    /// ```
    ///
    /// Note that in terms of the stream key (`a, b`), the operations in the order above are
    /// completely correct, because we are operating on 3 different rows. However, in terms of
    /// the user-defined sink primary key `a`, we're constantly violating the unique constraint.
    ///
    /// Therefore, in this case, we have to do additional reordering in the sink executor per
    /// barrier. Specifically, we need to:
    ///
    /// First, compact all the changes with the stream key, so we have:
    /// ```text
    /// + (1, 3)
    /// - (1, 1)
    /// ```
    ///
    /// Then, sink all the delete events before sinking all insert events, so we have:
    /// ```text
    /// - (1, 1)
    /// + (1, 3)
    /// ```
    /// Since we've compacted the chunk with the stream key, any surviving `DELETE` record must
    /// delete an existing row, so we can safely move them to the front. After the deletions are
    /// done, we can then safely sink the insert events with the uniqueness guarantee.
    fn should_reorder_records(self) -> bool {
        !self.pk_specified_and_matched
    }
}

/// Get the output kind for chunk compaction based on the given sink type.
fn compact_output_kind(sink_type: SinkType) -> OutputKind {
    match sink_type {
        SinkType::Upsert => output_kind::UPSERT,
        SinkType::Retract => output_kind::RETRACT,
        // There won't be any `Update` or `Delete` in the chunk, so it doesn't matter.
        SinkType::AppendOnly | SinkType::ForceAppendOnly => output_kind::RETRACT,
    }
}

/// Dispatch the code block to different output kinds for chunk compaction based on sink type.
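///
/// For example (a sketch mirroring the call sites below),
/// `dispatch_output_kind!(sink_type, KIND, { compact_chunk_inline::<KIND>(chunk, &key, ib) })`
/// expands the body twice, once with `KIND = output_kind::UPSERT` and once with
/// `KIND = output_kind::RETRACT`, selecting the matching arm at runtime via
/// `compact_output_kind(sink_type)`.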
macro_rules! dispatch_output_kind {
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                const KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}

impl<F: LogStoreFactory> SinkExecutor<F> {
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        if let Some(col_idx) = sink_writer_param.extra_partition_col_idx {
            // Remove the partition column from the schema.
            assert_eq!(sink_input_schema.data_types(), {
                let mut data_types = info.schema.data_types();
                data_types.remove(col_idx);
                data_types
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
            let stream_key = &info.stream_key;
            let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
                .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
            Some(NonAppendOnlyBehavior {
                pk_specified_and_matched,
            })
        } else {
            None
        };
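        // For example (hypothetical values), stream key `[0, 1]` with downstream pk `[0]`
        // yields `pk_specified_and_matched == false`, i.e. the reordering path.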

        tracing::info!(
            sink_id = sink_param.sink_id.sink_id,
            actor_id = actor_context.id,
            ?non_append_only_behavior,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            non_append_only_behavior,
            rate_limit,
        })
    }

    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.stream_key.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        // When processing an upsert stream, we need to tolerate the inconsistency (mismatched
        // `DELETE` and `INSERT` pairs) when compacting input chunks with the derived stream key.
        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
            InconsistencyBehavior::Tolerate
        } else {
            InconsistencyBehavior::Panic
        };

        let input = self.input.execute();

        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.chunk_size,
            self.input_data_types,
            input_compact_ib,
            self.sink_param.downstream_pk.clone(),
            self.non_append_only_behavior,
            metrics.sink_chunk_buffer_size,
            self.sink.is_blackhole(), // skip compaction for blackhole for better benchmark results
        );

        if self.sink.is_sink_into_table() {
            // TODO(hzxa21): support rate limit?
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Init the rate limit
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.non_append_only_behavior,
                            input_compact_ib,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never); // unify return type to `Message`

                        consume_log_stream.boxed()
                    });
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        // Propagate the first barrier
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let add_columns = barrier.as_sink_add_columns(sink_id);
                    if let Some(add_columns) = &add_columns {
                        info!(?add_columns, %sink_id, "sink received add columns");
                    }
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                                add_columns,
                            },
                        )
                        .await?;

                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
                                    tracing::info!(
                                        rate_limit = new_rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                if let Some(map) = config.get(&sink_id.sink_id)
                                    && let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                {
                                    error!(
                                        error = %e.as_report(),
                                        "fail to send sink alter props"
                                    );
                                    return Err(StreamExecutorError::from(e.to_report_string()));
                                }
                            }
                            _ => (),
                        }
                    }
                }
            }
        }
    }

    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: StreamKey,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        input_compact_ib: InconsistencyBehavior,
        downstream_pk: Option<Vec<usize>>,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        skip_compact: bool,
    ) {
        // To reorder records, we need to buffer chunks of the entire epoch.
        if let Some(b) = non_append_only_behavior
            && b.should_reorder_records()
        {
            assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);

            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);
                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();

                        // 1. Compact the chunk based on the **stream key**, so that we have at most 2 rows for each
                        //    stream key. Then, move all delete records to the front.
                        let mut delete_chunks = vec![];
                        let mut insert_chunks = vec![];

                        for c in dispatch_output_kind!(sink_type, KIND, {
                            StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks_inline::<KIND>(input_compact_ib)
                        }) {
                            let chunk = force_delete_only(c.clone());
                            if chunk.cardinality() > 0 {
                                delete_chunks.push(chunk);
                            }
                            let chunk = force_append_only(c);
                            if chunk.cardinality() > 0 {
                                insert_chunks.push(chunk);
                            }
                        }
                        let chunks = delete_chunks
                            .into_iter()
                            .chain(insert_chunks.into_iter())
                            .collect();
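                        // A sketch with the doc example above: compaction leaves `+ (1, 3)` and
                        // `- (1, 1)`; the delete chunk is then emitted before the insert chunk.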

                        // 2. If the user specifies a primary key, compact the chunk based on the **downstream pk**
                        //    to eliminate any unnecessary updates to external systems. This also rewrites the
                        //    `DELETE` and `INSERT` operations on the same key into `UPDATE` operations, which
                        //    usually have a more efficient implementation.
                        if let Some(downstream_pk) = &downstream_pk {
                            let chunks = dispatch_output_kind!(sink_type, KIND, {
                                StreamChunkCompactor::new(downstream_pk.clone(), chunks)
                                    .into_compacted_chunks_reconstructed::<KIND>(
                                        chunk_size,
                                        input_data_types.clone(),
                                        // When compacting based on the user-provided primary key, we should never panic
                                        // on inconsistency, in case the user-provided primary key is not unique.
                                        InconsistencyBehavior::Warn,
                                    )
                            });
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        }

                        // 3. Forward watermark and barrier.
                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        // Compact the chunk to eliminate any unnecessary updates to external systems.
                        if !skip_compact {
                            chunk = dispatch_output_kind!(sink_type, KIND, {
                                compact_chunk_inline::<KIND>(chunk, &stream_key, input_compact_ib)
                            });
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Force append-only by dropping UPDATE/DELETE messages. We do this when the
                                // user forces the sink to be append-only while it is actually not, based on
                                // the frontend derivation result.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert | SinkType::Retract => yield Message::Chunk(chunk),
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }

    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        input_compact_ib: InconsistencyBehavior,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                let chunk = if let Some(b) = non_append_only_behavior
                    && b.should_compact_in_log_reader()
                {
                    // This guarantees that the user has specified a `downstream_pk`.
                    let downstream_pk = downstream_pk.as_ref().unwrap();
                    dispatch_output_kind!(sink_param.sink_type, KIND, {
                        compact_chunk_inline::<KIND>(chunk, downstream_pk, input_compact_ib)
                    })
                } else {
                    chunk
                };
                if visible_columns.len() != columns.len() {
                    // Do projection here because we may have columns that aren't visible to
                    // the downstream.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            let future = async {
                loop {
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                error!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
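            // Race the never-returning sink loop against control messages
            // (vnode-bitmap rebuilds and config updates) sent from the log writer side.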
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties.extend(config.into_iter());
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = sink_param.sink_id.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config"
                                        );
                                        Err(anyhow!("fail to rewind log after alter sink config").into())
                                    }
                                }
                            } else {
                                sink_param.properties.extend(config.into_iter());
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG is fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
}
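
/// Control messages sent from the log-writing half (`execute_write_log`) to the
/// log-consuming half (`execute_consume_log`) of the sink executor.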
enum RebuildSinkMessage {
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    UpdateConfig(HashMap<String, String>),
}

impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 3 2 1",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                "  I I I
                    U- 3 2 1
                    U+ 3 4 1
                     + 5 6 7",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    - 5 6 7",
            ))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // Should not receive the third stream chunk message because the force-append-only sink
        // executor will drop all DELETE messages.

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_upsert() {
        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_retract() {
        stream_key_sink_pk_mismatch(SinkType::Retract).await;
    }

    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0]),
            sink_type,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        let expected = match sink_type {
            SinkType::Retract => StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            ),
            SinkType::Upsert => StreamChunk::from_pretty(
                " I I I
                + 1 1 40", // For upsert format, there won't be `U- 1 1 10`.
            ),
            _ => unreachable!(),
        };
        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        // The last barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }
}