risingwave_stream/executor/sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::mem;

use anyhow::anyhow;
use futures::stream::select;
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::{SinkId, SinkType};
use risingwave_connector::sink::log_store::{
    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
    LogWriter, LogWriterExt, LogWriterMetrics,
};
use risingwave_connector::sink::{
    GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
};
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use crate::common::compact_chunk::{StreamChunkCompactor, merge_chunk_row};
use crate::executor::prelude::*;
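
/// Streaming executor for a sink. It pre-processes the input according to the sink type and,
/// unless the sink writes into a table, persists the stream into a log store whose contents are
/// consumed and delivered to the external sink connector.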
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    need_advance_delete: bool,
    re_construct_with_sink_pk: bool,
    compact_chunk: bool,
    rate_limit: Option<u32>,
}

// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
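// For example, ops [Insert, Delete, UpdateDelete, UpdateInsert] keep only the Insert row and the
// UpdateInsert row (re-tagged as Insert); the other rows are made invisible.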
fn force_append_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Insert => {}
            Op::Delete | Op::UpdateDelete => r.set_vis(false),
            Op::UpdateInsert => r.set_op(Op::Insert),
        }
    }
    c.into()
}

// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
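// For example, ops [Insert, Delete, UpdateDelete, UpdateInsert] keep only the Delete row and the
// UpdateDelete row (re-tagged as Delete); the other rows are made invisible.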
fn force_delete_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Delete => {}
            Op::Insert | Op::UpdateInsert => r.set_vis(false),
            Op::UpdateDelete => r.set_op(Op::Delete),
        }
    }
    c.into()
}

impl<F: LogStoreFactory> SinkExecutor<F> {
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        if let Some(col_idx) = sink_writer_param.extra_partition_col_idx {
            // Remove the partition column from the schema.
            assert_eq!(sink_input_schema.data_types(), {
                let mut data_type = info.schema.data_types();
                data_type.remove(col_idx);
                data_type
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let stream_key = info.pk_indices.clone();
        let stream_key_sink_pk_mismatch = {
            stream_key
                .iter()
                .any(|i| !sink_param.downstream_pk.contains(i))
        };
        // When the stream key differs from the user-defined primary key of the sink, operations
        // on the same sink pk may arrive out of order. For example:
        // stream key: a,b
        // sink pk: a
        //
        // original changes:
        // (1,1) -> (1,2)
        // (1,2) -> (1,3)
        //
        // mv fragment 1:
        // delete (1,1)
        //
        // mv fragment 2:
        // insert (1,2)
        // delete (1,2)
        //
        // mv fragment 3:
        // insert (1,3)
        //
        // merged into the sink fragment:
        // insert (1,3)
        // insert (1,2)
        // delete (1,2)
        // delete (1,1)
        //
        // So we do additional compaction in the sink executor per barrier:
        //     1. compact all the changes with the stream key.
        //     2. sink all the delete events and then sink all the insert events.
        //
        // After compacting with the stream key, two events with the same user-defined sink pk must
        // have different stream keys, so a delete event never deletes a record inserted in the
        // same batch under our internal streaming SQL semantics.
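        //
        // For the example above, compaction by stream key cancels the insert/delete pair on (1,2),
        // leaving delete (1,1) and insert (1,3); emitting the delete first yields the correct
        // final row (1,3) for sink pk a = 1.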
        let need_advance_delete =
            stream_key_sink_pk_mismatch && sink_param.sink_type != SinkType::AppendOnly;
        // NOTE(st1page): reconstructing with the sink pk needs extra cost to buffer a barrier's
        // data, so currently we bind it to the mismatch case.
        let re_construct_with_sink_pk = need_advance_delete
            && sink_param.sink_type == SinkType::Upsert
            && !sink_param.downstream_pk.is_empty();
        // Don't compact chunks for the blackhole sink for better benchmark performance.
        let compact_chunk = !sink.is_blackhole();

        tracing::info!(
            sink_id = sink_param.sink_id.sink_id,
            actor_id = actor_context.id,
            need_advance_delete,
            re_construct_with_sink_pk,
            compact_chunk,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            need_advance_delete,
            re_construct_with_sink_pk,
            compact_chunk,
            rate_limit,
        })
    }

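    /// Builds the executor's message stream. For a sink-into-table the processed input is
    /// returned directly; otherwise it is split into a log-writing stream and a log-consuming
    /// future that are driven together until completion.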
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.pk_indices.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        let input = self.input.execute();

        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.need_advance_delete,
            self.re_construct_with_sink_pk,
            self.chunk_size,
            self.input_data_types,
            self.sink_param.downstream_pk.clone(),
            metrics.sink_chunk_buffer_size,
            self.compact_chunk,
        );

        if self.sink.is_sink_into_table() {
            // TODO(hzxa21): support rate limit?
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Init the rate limit
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never); // unify return type to `Message`

                        consume_log_stream.boxed()
                    });
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }

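    /// Writes the input stream into the log writer while forwarding it downstream. On barriers it
    /// flushes the current epoch and reacts to pause/resume, throttle, connector-property-change,
    /// and vnode-bitmap-update mutations (the latter may require the consumer side to rebuild the
    /// sink).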
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        // Propagate the first barrier
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                            },
                        )
                        .await?;

                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
                                    tracing::info!(
                                        rate_limit = new_rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                if let Some(map) = config.get(&sink_id.sink_id) {
                                    if let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                    {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink alter props"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            _ => (),
                        }
                    }
                }
            }
        }
    }

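    /// Pre-processes the input according to the sink type. When `need_advance_delete` or
    /// `re_construct_with_sink_pk` is set, chunks are buffered per barrier; with
    /// `need_advance_delete` they are compacted by stream key and re-emitted with deletes ahead of
    /// inserts, and with `re_construct_with_sink_pk` they are rebuilt keyed by the downstream pk.
    /// Otherwise chunks are forwarded one by one, optionally compacted by stream key.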
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: PkIndices,
        need_advance_delete: bool,
        re_construct_with_sink_pk: bool,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        down_stream_pk: Vec<usize>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        compact_chunk: bool,
    ) {
        // Need to buffer chunks within one barrier.
        if need_advance_delete || re_construct_with_sink_pk {
            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);

                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();
                        let chunks = if need_advance_delete {
                            let mut delete_chunks = vec![];
                            let mut insert_chunks = vec![];

                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks()
                            {
                                if sink_type != SinkType::ForceAppendOnly {
                                    // Collect the delete-only part of the compacted chunk so that
                                    // all deletes can be emitted before the inserts. For a
                                    // force-append-only sink, deletes are dropped entirely, so
                                    // there is nothing to collect.
                                    let chunk = force_delete_only(c.clone());
                                    if chunk.cardinality() > 0 {
                                        delete_chunks.push(chunk);
                                    }
                                }
                                let chunk = force_append_only(c);
                                if chunk.cardinality() > 0 {
                                    insert_chunks.push(chunk);
                                }
                            }
                            delete_chunks
                                .into_iter()
                                .chain(insert_chunks.into_iter())
                                .collect()
                        } else {
                            chunks
                        };
                        if re_construct_with_sink_pk {
                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
                                .reconstructed_compacted_chunks(
                                    chunk_size,
                                    input_data_types.clone(),
                                    sink_type != SinkType::ForceAppendOnly,
                                );
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
                        // V->V).
                        if compact_chunk {
                            chunk = merge_chunk_row(chunk, &stream_key);
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Force append-only by dropping UPDATE/DELETE messages. We do this when the
                                // user forces the sink to be append-only while it is actually not based on
                                // the frontend derivation result.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert => {
                                // To keep the optimization in https://github.com/risingwavelabs/risingwave/pull/12250
                                // correct, we also need to compact the chunk by stream key here.
                                for chunk in
                                    StreamChunkCompactor::new(stream_key.clone(), vec![chunk])
                                        .into_compacted_chunks()
                                {
                                    yield Message::Chunk(chunk)
                                }
                            }
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }

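    /// Consumes the log store and delivers its contents to the external sink. On sink errors it
    /// reports metrics, records an event log when a meta client is available, and retries
    /// (rewinding the log reader when the log store allows it). It also serves rebuild-sink and
    /// config-update requests sent from the write side.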
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                if visible_columns.len() != columns.len() {
                    // Do projection here because we may have columns that aren't visible to
                    // the downstream.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            let future = async {
                loop {
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                warn!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties = config.into_iter().collect();
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = sink_param.sink_id.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config"
                                        );
                                        Err(anyhow!("fail to rewind log after alter sink config").into())
                                    }
                                }
                            } else {
                                sink_param.properties = config.into_iter().collect();
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG is fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
}

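/// Requests sent from the log-writing half to the log-consuming half: rebuild the sink with a new
/// vnode bitmap (acknowledged through the oneshot channel) or apply updated connector properties.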
enum RebuildSinkMessage {
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    UpdateConfig(HashMap<String, String>),
}

impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 3 2 1",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                "  I I I
                    U- 3 2 1
                    U+ 3 4 1
                     + 5 6 7",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    - 5 6 7",
            ))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // Should not receive the third stream chunk message because the force-append-only sink
        // executor will drop all DELETE messages.

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: vec![0],
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            )
        );

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        // The last barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }
}