risingwave_stream/executor/sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::mem;

use anyhow::anyhow;
use futures::stream::select;
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::{SinkId, SinkType};
use risingwave_connector::sink::log_store::{
    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
    LogWriter, LogWriterExt, LogWriterMetrics,
};
use risingwave_connector::sink::{
    GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
};
use risingwave_pb::stream_plan::stream_node::StreamKind;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use crate::common::compact_chunk::{InconsistencyBehavior, StreamChunkCompactor, merge_chunk_row};
use crate::executor::prelude::*;
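
/// Executor that delivers its input stream to an external sink. The processed input is first
/// written to a log store, and a companion future consumes the log store and writes to the
/// sink, decoupling sink delivery from the upstream stream.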
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    chunk_size: usize,
    input_data_types: Vec<DataType>,
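    /// Whether to buffer each barrier's data and emit all DELETEs before all INSERTs, to
    /// tolerate reordering when the stream key differs from the sink pk. See `new` for details.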
    need_advance_delete: bool,
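    /// Whether to re-compact the buffered data by the downstream (sink) pk before emitting.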
    re_construct_with_sink_pk: bool,
    /// Whether the upstream stream key matches the downstream (sink) pk.
    pk_matched: bool,
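    /// Whether to compact input chunks to eliminate useless intermediate changes. Disabled for
    /// the blackhole sink for better benchmark performance.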
    compact_chunk: bool,
    rate_limit: Option<u32>,
}

// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
fn force_append_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Insert => {}
            Op::Delete | Op::UpdateDelete => r.set_vis(false),
            Op::UpdateInsert => r.set_op(Op::Insert),
        }
    }
    c.into()
}

// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
fn force_delete_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Delete => {}
            Op::Insert | Op::UpdateInsert => r.set_vis(false),
            Op::UpdateDelete => r.set_op(Op::Delete),
        }
    }
    c.into()
}
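
// A minimal sketch of the two helpers above, assuming the same
// `StreamChunkTestExt::from_pretty` / `compact` test utilities used in the test module at the
// bottom of this file. Note that rows are hidden via the visibility bitmap rather than removed,
// so `compact()` is applied here to materialize the result.
#[cfg(test)]
mod force_ops_example {
    use risingwave_common::array::StreamChunkTestExt;
    use risingwave_common::array::stream_chunk::StreamChunk;

    use super::*;

    #[test]
    fn force_append_only_and_delete_only() {
        let chunk = || {
            StreamChunk::from_pretty(
                "  I I
                 + 1 1
                U- 1 1
                U+ 1 2
                 - 2 2",
            )
        };
        // DELETE / UPDATE DELETE are dropped; UPDATE INSERT becomes INSERT.
        assert_eq!(
            force_append_only(chunk()).compact(),
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 1 2",
            )
        );
        // INSERT / UPDATE INSERT are dropped; UPDATE DELETE becomes DELETE.
        assert_eq!(
            force_delete_only(chunk()).compact(),
            StreamChunk::from_pretty(
                " I I
                - 1 1
                - 2 2",
            )
        );
    }
}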

impl<F: LogStoreFactory> SinkExecutor<F> {
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        if let Some(col_idx) = sink_writer_param.extra_partition_col_idx {
            // The sink input schema should match the executor schema with the extra partition
            // column removed.
            assert_eq!(sink_input_schema.data_types(), {
                let mut data_type = info.schema.data_types();
                data_type.remove(col_idx);
                data_type
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let stream_key = info.pk_indices.clone();
        let stream_key_sink_pk_mismatch = {
            stream_key
                .iter()
                .any(|i| !sink_param.downstream_pk.contains(i))
        };
        // When the stream key differs from the user-defined primary key columns of the sink,
        // operations may arrive out of order. For example, with stream key (a, b) and sink
        // pk (a), given the original changes
        //
        //   (1,1) -> (1,2)
        //   (1,2) -> (1,3)
        //
        // the upstream fragments may emit:
        //
        //   mv fragment 1: delete (1,1)
        //   mv fragment 2: insert (1,2); delete (1,2)
        //   mv fragment 3: insert (1,3)
        //
        // which merge at the sink fragment as:
        //
        //   insert (1,3)
        //   insert (1,2)
        //   delete (1,2)
        //   delete (1,1)
        //
        // So we do additional compaction in the sink executor per barrier:
        //
        //   1. compact all the changes with the stream key;
        //   2. sink all the DELETE events, then sink all the INSERT events.
        //
        // After compacting with the stream key, two events with the same user-defined sink pk
        // must have different stream keys, so a DELETE event never deletes a record inserted
        // within the same barrier under our internal streaming SQL semantics.
        let need_advance_delete = stream_key_sink_pk_mismatch
            && !matches!(
                sink_param.sink_type,
                SinkType::AppendOnly | SinkType::ForceAppendOnly
            );
        // NOTE(st1page): reconstructing with the sink pk costs extra buffering of one barrier's
        // data, so currently we enable it only in the mismatch case.
        let re_construct_with_sink_pk = need_advance_delete
            && sink_param.sink_type == SinkType::Upsert
            && !sink_param.downstream_pk.is_empty();
        let pk_matched = !stream_key_sink_pk_mismatch;
        // Don't compact chunks for the blackhole sink for better benchmark performance.
        let compact_chunk = !sink.is_blackhole();

        tracing::info!(
            sink_id = sink_param.sink_id.sink_id,
            actor_id = actor_context.id,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            rate_limit,
        })
    }

    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.pk_indices.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        // When processing an upsert stream, we need to tolerate inconsistencies (mismatched
        // `DELETE` and `INSERT` pairs) when compacting input chunks with the derived stream key.
        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
            InconsistencyBehavior::Tolerate
        } else {
            InconsistencyBehavior::Panic
        };

        let input = self.input.execute();

        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.need_advance_delete,
            self.re_construct_with_sink_pk,
            self.chunk_size,
            self.input_data_types,
            input_compact_ib,
            self.sink_param.downstream_pk.clone(),
            metrics.sink_chunk_buffer_size,
            self.compact_chunk,
        );

        if self.sink.is_sink_into_table() {
            // TODO(hzxa21): support rate limit?
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Init the rate limit.
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.pk_matched,
                            input_compact_ib,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never); // unify return type to `Message`

                        consume_log_stream.boxed()
                    });
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }

    /// Write the processed input to the log store and forward each message downstream, while
    /// propagating rate-limit updates and sink-rebuild requests to the log consumer.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        // Propagate the first barrier.
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let add_columns = barrier.as_sink_add_columns(sink_id);
                    if let Some(add_columns) = &add_columns {
                        info!(?add_columns, %sink_id, "sink receives add columns");
                    }
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                                add_columns,
                            },
                        )
                        .await?;

                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("fail to wait for rebuild sink to finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
                                    tracing::info!(
                                        rate_limit = new_rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                if let Some(map) = config.get(&sink_id.sink_id)
                                    && let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                {
                                    error!(
                                        error = %e.as_report(),
                                        "fail to send sink alter props"
                                    );
                                    return Err(StreamExecutorError::from(e.to_report_string()));
                                }
                            }
                            _ => (),
                        }
                    }
                }
            }
        }
    }

    /// Pre-process input messages before they are written to the log store: optionally compact
    /// chunks, enforce append-only semantics, and, when the stream key and sink pk mismatch,
    /// buffer each barrier's data so that all DELETEs are emitted before all INSERTs.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: PkIndices,
        need_advance_delete: bool,
        re_construct_with_sink_pk: bool,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        input_compact_ib: InconsistencyBehavior,
        down_stream_pk: Vec<usize>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        compact_chunk: bool,
    ) {
        // Need to buffer chunks within one barrier.
        if need_advance_delete || re_construct_with_sink_pk {
            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);

                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();
                        let chunks = if need_advance_delete {
                            let mut delete_chunks = vec![];
                            let mut insert_chunks = vec![];

                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks(input_compact_ib)
                            {
                                // We only enter this branch if `need_advance_delete`, in which
                                // case `sink_type` is neither `AppendOnly` nor `ForceAppendOnly`.
                                // Split each compacted chunk into its DELETE-only and INSERT-only
                                // parts, so that all deletes are sunk before all inserts.
                                let chunk = force_delete_only(c.clone());
                                if chunk.cardinality() > 0 {
                                    delete_chunks.push(chunk);
                                }
                                let chunk = force_append_only(c);
                                if chunk.cardinality() > 0 {
                                    insert_chunks.push(chunk);
                                }
                            }
                            delete_chunks
                                .into_iter()
                                .chain(insert_chunks)
                                .collect()
                        } else {
                            chunks
                        };
                        if re_construct_with_sink_pk {
                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
                                .reconstructed_compacted_chunks(
                                    chunk_size,
                                    input_data_types.clone(),
                                    // When compacting based on the user-provided primary key, we
                                    // should never panic on inconsistency, in case the
                                    // user-provided primary key is not unique.
                                    if sink_type == SinkType::ForceAppendOnly {
                                        InconsistencyBehavior::Tolerate
                                    } else {
                                        InconsistencyBehavior::Warn
                                    },
                                );
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        // Compact the chunk to eliminate any useless intermediate results
                        // (e.g. UPDATE V->V).
                        if compact_chunk {
                            chunk = merge_chunk_row(chunk, &stream_key, input_compact_ib);
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Force append-only by dropping UPDATE/DELETE messages. We do
                                // this when the user forces the sink to be append-only while it
                                // is actually not, based on the frontend derivation result.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert => yield Message::Chunk(chunk),
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }

    /// Consume the log store and deliver its contents to the sink. On sink errors, rebuild the
    /// log sinker and, if the log store supports it, rewind and replay the unconsumed log.
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        pk_matched: bool,
        input_compact_ib: InconsistencyBehavior,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                // NOTE(kwannoel):
                // After the optimization in https://github.com/risingwavelabs/risingwave/pull/12250,
                // `DELETE`s will be sequenced before `INSERT`s in JDBC sinks and the PG rust sink.
                // There's a risk that adjacent chunks with `DELETE`s on the same PK will get
                // merged into a single chunk, since the log store format doesn't preserve chunk
                // boundaries. Then we would have double `DELETE`s followed by an unspecified
                // sequence of `INSERT`s, leading to inconsistent data downstream.
                //
                // We only need to do the compaction for upsert sinks when the upstream and
                // downstream PKs are matched. When the upstream and downstream PKs are not
                // matched, we buffer the chunks between two barriers, so the compaction is not
                // needed, since the barriers preserve chunk boundaries.
                //
                // When the sink is not an upsert sink, it is either `force_append_only` or
                // `append_only`; we only ever append to the downstream, so there should not be
                // any overlapping keys.
                let chunk = if pk_matched && matches!(sink_param.sink_type, SinkType::Upsert) {
                    merge_chunk_row(chunk, &downstream_pk, input_compact_ib)
                } else {
                    chunk
                };
                if visible_columns.len() != columns.len() {
                    // Do the projection here because we may have columns that aren't visible to
                    // the downstream.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            let future = async {
                loop {
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                error!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties = config.into_iter().collect();
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = sink_param.sink_id.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config"
                                        );
                                        Err(anyhow!("fail to rewind log after alter sink config").into())
                                    }
                                }
                            } else {
                                sink_param.properties = config.into_iter().collect();
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG is fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
}

/// Control messages sent from the log writer task to the log consumer task.
enum RebuildSinkMessage {
    /// Rebuild the sink with a new vnode bitmap; the oneshot sender notifies completion.
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// Apply updated connector properties from `ALTER SINK CONFIG`.
    UpdateConfig(HashMap<String, String>),
}

impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned
        // out within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 3 2 1",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                "  I I I
                    U- 3 2 1
                    U+ 3 4 1
                     + 5 6 7",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 5 6 7",
            )),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // We should not receive the third stream chunk message because the force-append-only
        // sink executor drops all DELETE messages.

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned
        // out within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: vec![0],
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            )
        );

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        // The last barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }
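
    // A minimal sketch, assuming the same blackhole/mock test utilities used above: when the
    // stream key matches the sink pk, an upsert sink forwards chunks unchanged instead of
    // buffering them per barrier or splitting DELETEs from INSERTs.
    #[tokio::test]
    async fn test_upsert_passthrough_when_pk_matched() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                "  I I
                U- 1 1
                U+ 1 2",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // First barrier.
        executor.next().await.unwrap().unwrap();

        // The upsert chunk is forwarded as-is.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                "  I I
                U- 1 1
                U+ 1 2",
            )
        );

        // Closing barrier.
        executor.next().await.unwrap().unwrap();
    }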
}