risingwave_stream/executor/
sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::mem;
17
18use anyhow::anyhow;
19use futures::stream::select;
20use futures::{FutureExt, TryFutureExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::array::Op;
23use risingwave_common::array::stream_chunk::StreamChunkMut;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{ColumnCatalog, Field};
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
27use risingwave_common_estimate_size::EstimateSize;
28use risingwave_common_estimate_size::collections::EstimatedVec;
29use risingwave_common_rate_limit::RateLimit;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::{SinkId, SinkType};
32use risingwave_connector::sink::log_store::{
33    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
34    LogWriter, LogWriterExt, LogWriterMetrics,
35};
36use risingwave_connector::sink::{
37    GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
38};
39use thiserror_ext::AsReport;
40use tokio::select;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
42use tokio::sync::oneshot;
43
44use crate::common::compact_chunk::{StreamChunkCompactor, merge_chunk_row};
45use crate::executor::prelude::*;
/// Streaming executor that feeds upstream changes into an external sink,
/// decoupled through a log store: one task writes messages into the log,
/// another reads them back and delivers to the sink connector.
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    // Upstream executor providing the message stream.
    input: Executor,
    sink: SinkImpl,
    // Catalog of input columns; hidden columns are projected out before
    // handing chunks to the sink connector.
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    // Builds the (log reader, log writer) pair used to decouple writing
    // from sink consumption.
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    // Max number of rows per output chunk when (re)building chunks.
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    // When true, per barrier: compact by stream key, then emit all deletes
    // before all inserts (see the derivation in `new`).
    need_advance_delete: bool,
    // When true, re-compact buffered chunks keyed by the downstream sink pk.
    re_construct_with_sink_pk: bool,
    /// Upstream and Downstream PK matched.
    pk_matched: bool,
    // Whether to merge redundant rows per chunk; disabled for blackhole
    // sinks to keep benchmarks unaffected.
    compact_chunk: bool,
    // Initial sink rate limit (rows); `None` means unlimited. Can be
    // updated at runtime via `Mutation::Throttle`.
    rate_limit: Option<u32>,
}
64
65// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
66fn force_append_only(c: StreamChunk) -> StreamChunk {
67    let mut c: StreamChunkMut = c.into();
68    for (_, mut r) in c.to_rows_mut() {
69        match r.op() {
70            Op::Insert => {}
71            Op::Delete | Op::UpdateDelete => r.set_vis(false),
72            Op::UpdateInsert => r.set_op(Op::Insert),
73        }
74    }
75    c.into()
76}
77
78// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
79fn force_delete_only(c: StreamChunk) -> StreamChunk {
80    let mut c: StreamChunkMut = c.into();
81    for (_, mut r) in c.to_rows_mut() {
82        match r.op() {
83            Op::Delete => {}
84            Op::Insert | Op::UpdateInsert => r.set_vis(false),
85            Op::UpdateDelete => r.set_op(Op::Delete),
86        }
87    }
88    c.into()
89}
90
91impl<F: LogStoreFactory> SinkExecutor<F> {
92    #[allow(clippy::too_many_arguments)]
93    #[expect(clippy::unused_async)]
94    pub async fn new(
95        actor_context: ActorContextRef,
96        info: ExecutorInfo,
97        input: Executor,
98        sink_writer_param: SinkWriterParam,
99        sink: SinkImpl,
100        sink_param: SinkParam,
101        columns: Vec<ColumnCatalog>,
102        log_store_factory: F,
103        chunk_size: usize,
104        input_data_types: Vec<DataType>,
105        rate_limit: Option<u32>,
106    ) -> StreamExecutorResult<Self> {
107        let sink_input_schema: Schema = columns
108            .iter()
109            .map(|column| Field::from(&column.column_desc))
110            .collect();
111
112        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
113            // Remove the partition column from the schema.
114            assert_eq!(sink_input_schema.data_types(), {
115                let mut data_type = info.schema.data_types();
116                data_type.remove(col_dix);
117                data_type
118            });
119        } else {
120            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
121        }
122
123        let stream_key = info.pk_indices.clone();
124        let stream_key_sink_pk_mismatch = {
125            stream_key
126                .iter()
127                .any(|i| !sink_param.downstream_pk.contains(i))
128        };
129        // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order
130        // stream key: a,b
131        // sink pk: a
132
133        // original:
134        // (1,1) -> (1,2)
135        // (1,2) -> (1,3)
136
137        // mv fragment 1:
138        // delete (1,1)
139
140        // mv fragment 2:
141        // insert (1,2)
142        // delete (1,2)
143
144        // mv fragment 3:
145        // insert (1,3)
146
147        // merge to sink fragment:
148        // insert (1,3)
149        // insert (1,2)
150        // delete (1,2)
151        // delete (1,1)
152        // So we do additional compaction in the sink executor per barrier.
153
154        //     1. compact all the changes with the stream key.
155        //     2. sink all the delete events and then sink all insert events.
156
157        // after compacting with the stream key, the two event with the same user defined sink pk must have different stream key.
158        // So the delete event is not to delete the inserted record in our internal streaming SQL semantic.
159        let need_advance_delete =
160            stream_key_sink_pk_mismatch && sink_param.sink_type != SinkType::AppendOnly;
161        // NOTE(st1page): reconstruct with sink pk need extra cost to buffer a barrier's data, so currently we bind it with mismatch case.
162        let re_construct_with_sink_pk = need_advance_delete
163            && sink_param.sink_type == SinkType::Upsert
164            && !sink_param.downstream_pk.is_empty();
165        let pk_matched = !stream_key_sink_pk_mismatch;
166        // Don't compact chunk for blackhole sink for better benchmark performance.
167        let compact_chunk = !sink.is_blackhole();
168
169        tracing::info!(
170            sink_id = sink_param.sink_id.sink_id,
171            actor_id = actor_context.id,
172            need_advance_delete,
173            re_construct_with_sink_pk,
174            pk_matched,
175            compact_chunk,
176            "Sink executor info"
177        );
178
179        Ok(Self {
180            actor_context,
181            info,
182            input,
183            sink,
184            input_columns: columns,
185            sink_param,
186            log_store_factory,
187            sink_writer_param,
188            chunk_size,
189            input_data_types,
190            need_advance_delete,
191            re_construct_with_sink_pk,
192            pk_matched,
193            compact_chunk,
194            rate_limit,
195        })
196    }
197
    /// Drives the executor: meters the input, pre-processes messages via
    /// `process_msg`, and — unless sinking into a table — splits execution into
    /// a log-writing stream and a log-consuming future whose outputs are merged
    /// into a single message stream.
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.pk_indices.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        let input = self.input.execute();

        // Meter incoming rows/bytes before any transformation.
        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.need_advance_delete,
            self.re_construct_with_sink_pk,
            self.chunk_size,
            self.input_data_types,
            self.sink_param.downstream_pk.clone(),
            metrics.sink_chunk_buffer_size,
            self.compact_chunk,
        );

        if self.sink.is_sink_into_table() {
            // Sink-into-table bypasses the log store entirely.
            // TODO(hzxa21): support rate limit?
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            // Channel for pushing runtime rate-limit updates to the log reader.
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Init the rate limit
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            // Channel for asking the consume side to rebuild the sink
            // (vnode bitmap update or config change).
            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.pk_matched,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never); // unify return type to `Message`

                        consume_log_stream.boxed()
                    });
                    // Run writer and consumer concurrently as one merged stream.
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }
300
301    #[try_stream(ok = Message, error = StreamExecutorError)]
302    async fn execute_write_log<W: LogWriter>(
303        input: impl MessageStream,
304        mut log_writer: W,
305        actor_id: ActorId,
306        sink_id: SinkId,
307        rate_limit_tx: UnboundedSender<RateLimit>,
308        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
309    ) {
310        pin_mut!(input);
311        let barrier = expect_first_barrier(&mut input).await?;
312        let epoch_pair = barrier.epoch;
313        let is_pause_on_startup = barrier.is_pause_on_startup();
314        // Propagate the first barrier
315        yield Message::Barrier(barrier);
316
317        log_writer.init(epoch_pair, is_pause_on_startup).await?;
318
319        let mut is_paused = false;
320
321        #[for_await]
322        for msg in input {
323            match msg? {
324                Message::Watermark(w) => yield Message::Watermark(w),
325                Message::Chunk(chunk) => {
326                    assert!(
327                        !is_paused,
328                        "Actor {actor_id} should not receive any data after pause"
329                    );
330                    log_writer.write_chunk(chunk.clone()).await?;
331                    yield Message::Chunk(chunk);
332                }
333                Message::Barrier(barrier) => {
334                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
335                    let post_flush = log_writer
336                        .flush_current_epoch(
337                            barrier.epoch.curr,
338                            FlushCurrentEpochOptions {
339                                is_checkpoint: barrier.kind.is_checkpoint(),
340                                new_vnode_bitmap: update_vnode_bitmap.clone(),
341                                is_stop: barrier.is_stop(actor_id),
342                            },
343                        )
344                        .await?;
345
346                    let mutation = barrier.mutation.clone();
347                    yield Message::Barrier(barrier);
348                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
349                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
350                    {
351                        let (tx, rx) = oneshot::channel();
352                        rebuild_sink_tx
353                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
354                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
355                        rx.await
356                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
357                    }
358                    post_flush.post_yield_barrier().await?;
359
360                    if let Some(mutation) = mutation.as_deref() {
361                        match mutation {
362                            Mutation::Pause => {
363                                log_writer.pause()?;
364                                is_paused = true;
365                            }
366                            Mutation::Resume => {
367                                log_writer.resume()?;
368                                is_paused = false;
369                            }
370                            Mutation::Throttle(actor_to_apply) => {
371                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
372                                    tracing::info!(
373                                        rate_limit = new_rate_limit,
374                                        "received sink rate limit on actor {actor_id}"
375                                    );
376                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
377                                        error!(
378                                            error = %e.as_report(),
379                                            "fail to send sink ate limit update"
380                                        );
381                                        return Err(StreamExecutorError::from(
382                                            e.to_report_string(),
383                                        ));
384                                    }
385                                }
386                            }
387                            Mutation::ConnectorPropsChange(config) => {
388                                if let Some(map) = config.get(&sink_id.sink_id)
389                                    && let Err(e) = rebuild_sink_tx
390                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
391                                {
392                                    error!(
393                                        error = %e.as_report(),
394                                        "fail to send sink alter props"
395                                    );
396                                    return Err(StreamExecutorError::from(e.to_report_string()));
397                                }
398                            }
399                            _ => (),
400                        }
401                    }
402                }
403            }
404        }
405    }
406
    /// Pre-processes the input stream before it reaches the log writer.
    ///
    /// Two modes:
    /// - Buffering mode (`need_advance_delete || re_construct_with_sink_pk`):
    ///   chunks are buffered per barrier, compacted by stream key, and emitted
    ///   with all deletes before all inserts (optionally re-compacted by the
    ///   downstream sink pk).
    /// - Pass-through mode: chunks are optionally compacted per chunk and
    ///   converted according to the sink type.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: PkIndices,
        need_advance_delete: bool,
        re_construct_with_sink_pk: bool,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        down_stream_pk: Vec<usize>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        compact_chunk: bool,
    ) {
        // need to buffer chunks during one barrier
        if need_advance_delete || re_construct_with_sink_pk {
            let mut chunk_buffer = EstimatedVec::new();
            // Watermarks are held back and re-emitted just before the barrier.
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);

                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();
                        let chunks = if need_advance_delete {
                            let mut delete_chunks = vec![];
                            let mut insert_chunks = vec![];

                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks()
                            {
                                if sink_type != SinkType::ForceAppendOnly {
                                    // For force-append-only sinks, DELETEs are dropped entirely,
                                    // so the delete part is only collected for other sink types;
                                    // it will be emitted ahead of all inserts below.
                                    let chunk = force_delete_only(c.clone());
                                    if chunk.cardinality() > 0 {
                                        delete_chunks.push(chunk);
                                    }
                                }
                                let chunk = force_append_only(c);
                                if chunk.cardinality() > 0 {
                                    insert_chunks.push(chunk);
                                }
                            }
                            // All deletes first, then all inserts.
                            delete_chunks
                                .into_iter()
                                .chain(insert_chunks.into_iter())
                                .collect()
                        } else {
                            chunks
                        };
                        if re_construct_with_sink_pk {
                            // Re-compact keyed by the downstream sink pk.
                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
                                .reconstructed_compacted_chunks(
                                    chunk_size,
                                    input_data_types.clone(),
                                    sink_type != SinkType::ForceAppendOnly,
                                );
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            // Rebuild chunks to the configured chunk size.
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
                        // V->V).
                        if compact_chunk {
                            chunk = merge_chunk_row(chunk, &stream_key);
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Force append-only by dropping UPDATE/DELETE messages. We do this when the
                                // user forces the sink to be append-only while it is actually not based on
                                // the frontend derivation result.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert => yield Message::Chunk(chunk),
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }
526
    /// Consumes the log store and delivers its contents to the sink.
    ///
    /// Runs forever (return type `!` on success is unreachable): an inner loop
    /// re-creates the log sinker after every sink error, rewinding the log
    /// reader when the factory allows it; a surrounding `select!` concurrently
    /// handles rebuild/config-update requests from the write side.
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        pk_matched: bool,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        // Indices of columns visible to the downstream sink.
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                // NOTE(kwannoel):
                // After the optimization in https://github.com/risingwavelabs/risingwave/pull/12250,
                // `DELETE`s will be sequenced before `INSERT`s in JDBC sinks and PG rust sink.
                // There's a risk that adjacent chunks with `DELETE`s on the same PK will get
                // merged into a single chunk, since the logstore format doesn't preserve chunk
                // boundaries. Then we will have double `DELETE`s followed by unspecified sequence
                // of `INSERT`s, and lead to inconsistent data downstream.
                //
                // We only need to do the compaction for upsert sinks, when the upstream and
                // downstream PKs are matched. When the upstream and downstream PKs are not matched,
                // we will buffer the chunks between two barriers, so the compaction is not needed,
                // since the barriers will preserve chunk boundaries.
                //
                // When the sink is not an upsert sink, it is either `force_append_only` or
                // `append_only`, we should only append to downstream, so there should not be any
                // overlapping keys.
                let chunk = if pk_matched && matches!(sink_param.sink_type, SinkType::Upsert) {
                    merge_chunk_row(chunk, &downstream_pk)
                } else {
                    chunk
                };
                if visible_columns.len() != columns.len() {
                    // Do projection here because we may have columns that aren't visible to
                    // the downstream.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            // Sink-consumption loop: only ever exits with an error.
            let future = async {
                loop {
                    // `consume_log_and_sink` returns `Result<!>`, so `Err` is
                    // the only reachable binding here.
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    // Best-effort: record the failure in the meta event log.
                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    // Retry from the last truncated position when rewind is
                    // supported; otherwise propagate the sink error.
                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                warn!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties = config.into_iter().collect();
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = sink_param.sink_id.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config "
                                        );
                                        Err(anyhow!("fail to rewind log after alter table").into())
                                    }
                                }
                            } else {
                                // Without rewind support, apply the config and force a
                                // recovery so the new sink takes effect cleanly.
                                sink_param.properties = config.into_iter().collect();
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
704}
705
/// Control messages sent to the background log-consume task of the sink
/// executor, asking it to rebuild or reconfigure the running sink.
enum RebuildSinkMessage {
    /// Rebuild the sink with a new vnode bitmap. The `oneshot::Sender` is used
    /// to notify the requester once the new bitmap has been installed into the
    /// `SinkWriterParam` (the receiver side logs a warning if the ack fails).
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// Apply a new set of connector properties (ALTER SINK CONFIG); the map
    /// replaces `sink_param.properties` and the sink is rebuilt from it.
    UpdateConfig(HashMap<String, String>),
}
710
impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    /// Entry point used by the executor framework: simply delegates to
    /// `execute_inner`, which builds the actual message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}
716
717#[cfg(test)]
718mod test {
719    use risingwave_common::catalog::{ColumnDesc, ColumnId};
720    use risingwave_common::util::epoch::test_epoch;
721    use risingwave_connector::sink::build_sink;
722
723    use super::*;
724    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
725    use crate::executor::test_utils::*;
726
727    #[tokio::test]
728    async fn test_force_append_only_sink() {
729        use risingwave_common::array::StreamChunkTestExt;
730        use risingwave_common::array::stream_chunk::StreamChunk;
731        use risingwave_common::types::DataType;
732
733        use crate::executor::Barrier;
734
735        let properties = maplit::btreemap! {
736            "connector".into() => "blackhole".into(),
737            "type".into() => "append-only".into(),
738            "force_append_only".into() => "true".into()
739        };
740
741        // We have two visible columns and one hidden column. The hidden column will be pruned out
742        // within the sink executor.
743        let columns = vec![
744            ColumnCatalog {
745                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
746                is_hidden: false,
747            },
748            ColumnCatalog {
749                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
750                is_hidden: false,
751            },
752            ColumnCatalog {
753                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
754                is_hidden: true,
755            },
756        ];
757        let schema: Schema = columns
758            .iter()
759            .map(|column| Field::from(column.column_desc.clone()))
760            .collect();
761        let pk_indices = vec![0];
762
763        let source = MockSource::with_messages(vec![
764            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
765            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
766                " I I I
767                    + 3 2 1",
768            ))),
769            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
770            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
771                "  I I I
772                    U- 3 2 1
773                    U+ 3 4 1
774                     + 5 6 7",
775            ))),
776            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
777                " I I I
778                    - 5 6 7",
779            ))),
780        ])
781        .into_executor(schema.clone(), pk_indices.clone());
782
783        let sink_param = SinkParam {
784            sink_id: 0.into(),
785            sink_name: "test".into(),
786            properties,
787
788            columns: columns
789                .iter()
790                .filter(|col| !col.is_hidden)
791                .map(|col| col.column_desc.clone())
792                .collect(),
793            downstream_pk: pk_indices.clone(),
794            sink_type: SinkType::ForceAppendOnly,
795            format_desc: None,
796            db_name: "test".into(),
797            sink_from_name: "test".into(),
798        };
799
800        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);
801
802        let sink = build_sink(sink_param.clone()).unwrap();
803
804        let sink_executor = SinkExecutor::new(
805            ActorContext::for_test(0),
806            info,
807            source,
808            SinkWriterParam::for_test(),
809            sink,
810            sink_param,
811            columns.clone(),
812            BoundedInMemLogStoreFactory::new(1),
813            1024,
814            vec![DataType::Int32, DataType::Int32, DataType::Int32],
815            None,
816        )
817        .await
818        .unwrap();
819
820        let mut executor = sink_executor.boxed().execute();
821
822        // Barrier message.
823        executor.next().await.unwrap().unwrap();
824
825        let chunk_msg = executor.next().await.unwrap().unwrap();
826        assert_eq!(
827            chunk_msg.into_chunk().unwrap().compact(),
828            StreamChunk::from_pretty(
829                " I I I
830                + 3 2 1",
831            )
832        );
833
834        // Barrier message.
835        executor.next().await.unwrap().unwrap();
836
837        let chunk_msg = executor.next().await.unwrap().unwrap();
838        assert_eq!(
839            chunk_msg.into_chunk().unwrap().compact(),
840            StreamChunk::from_pretty(
841                " I I I
842                + 3 4 1
843                + 5 6 7",
844            )
845        );
846
847        // Should not receive the third stream chunk message because the force-append-only sink
848        // executor will drop all DELETE messages.
849
850        // The last barrier message.
851        executor.next().await.unwrap().unwrap();
852    }
853
854    #[tokio::test]
855    async fn stream_key_sink_pk_mismatch() {
856        use risingwave_common::array::StreamChunkTestExt;
857        use risingwave_common::array::stream_chunk::StreamChunk;
858        use risingwave_common::types::DataType;
859
860        use crate::executor::Barrier;
861
862        let properties = maplit::btreemap! {
863            "connector".into() => "blackhole".into(),
864        };
865
866        // We have two visible columns and one hidden column. The hidden column will be pruned out
867        // within the sink executor.
868        let columns = vec![
869            ColumnCatalog {
870                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
871                is_hidden: false,
872            },
873            ColumnCatalog {
874                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
875                is_hidden: false,
876            },
877            ColumnCatalog {
878                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
879                is_hidden: true,
880            },
881        ];
882        let schema: Schema = columns
883            .iter()
884            .map(|column| Field::from(column.column_desc.clone()))
885            .collect();
886
887        let source = MockSource::with_messages(vec![
888            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
889            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
890                " I I I
891                    + 1 1 10",
892            ))),
893            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
894            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
895                " I I I
896                    + 1 3 30",
897            ))),
898            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
899                " I I I
900                    + 1 2 20
901                    - 1 2 20",
902            ))),
903            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
904                " I I I
905                    - 1 1 10
906                    + 1 1 40",
907            ))),
908            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
909        ])
910        .into_executor(schema.clone(), vec![0, 1]);
911
912        let sink_param = SinkParam {
913            sink_id: 0.into(),
914            sink_name: "test".into(),
915            properties,
916
917            columns: columns
918                .iter()
919                .filter(|col| !col.is_hidden)
920                .map(|col| col.column_desc.clone())
921                .collect(),
922            downstream_pk: vec![0],
923            sink_type: SinkType::Upsert,
924            format_desc: None,
925            db_name: "test".into(),
926            sink_from_name: "test".into(),
927        };
928
929        let info = ExecutorInfo::new(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
930
931        let sink = build_sink(sink_param.clone()).unwrap();
932
933        let sink_executor = SinkExecutor::new(
934            ActorContext::for_test(0),
935            info,
936            source,
937            SinkWriterParam::for_test(),
938            sink,
939            sink_param,
940            columns.clone(),
941            BoundedInMemLogStoreFactory::new(1),
942            1024,
943            vec![DataType::Int64, DataType::Int64, DataType::Int64],
944            None,
945        )
946        .await
947        .unwrap();
948
949        let mut executor = sink_executor.boxed().execute();
950
951        // Barrier message.
952        executor.next().await.unwrap().unwrap();
953
954        let chunk_msg = executor.next().await.unwrap().unwrap();
955        assert_eq!(
956            chunk_msg.into_chunk().unwrap().compact(),
957            StreamChunk::from_pretty(
958                " I I I
959                + 1 1 10",
960            )
961        );
962
963        // Barrier message.
964        executor.next().await.unwrap().unwrap();
965
966        let chunk_msg = executor.next().await.unwrap().unwrap();
967        assert_eq!(
968            chunk_msg.into_chunk().unwrap().compact(),
969            StreamChunk::from_pretty(
970                " I I I
971                U- 1 1 10
972                U+ 1 1 40",
973            )
974        );
975
976        // The last barrier message.
977        executor.next().await.unwrap().unwrap();
978    }
979
980    #[tokio::test]
981    async fn test_empty_barrier_sink() {
982        use risingwave_common::types::DataType;
983
984        use crate::executor::Barrier;
985
986        let properties = maplit::btreemap! {
987            "connector".into() => "blackhole".into(),
988            "type".into() => "append-only".into(),
989            "force_append_only".into() => "true".into()
990        };
991        let columns = vec![
992            ColumnCatalog {
993                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
994                is_hidden: false,
995            },
996            ColumnCatalog {
997                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
998                is_hidden: false,
999            },
1000        ];
1001        let schema: Schema = columns
1002            .iter()
1003            .map(|column| Field::from(column.column_desc.clone()))
1004            .collect();
1005        let pk_indices = vec![0];
1006
1007        let source = MockSource::with_messages(vec![
1008            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1009            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1010            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1011        ])
1012        .into_executor(schema.clone(), pk_indices.clone());
1013
1014        let sink_param = SinkParam {
1015            sink_id: 0.into(),
1016            sink_name: "test".into(),
1017            properties,
1018
1019            columns: columns
1020                .iter()
1021                .filter(|col| !col.is_hidden)
1022                .map(|col| col.column_desc.clone())
1023                .collect(),
1024            downstream_pk: pk_indices.clone(),
1025            sink_type: SinkType::ForceAppendOnly,
1026            format_desc: None,
1027            db_name: "test".into(),
1028            sink_from_name: "test".into(),
1029        };
1030
1031        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);
1032
1033        let sink = build_sink(sink_param.clone()).unwrap();
1034
1035        let sink_executor = SinkExecutor::new(
1036            ActorContext::for_test(0),
1037            info,
1038            source,
1039            SinkWriterParam::for_test(),
1040            sink,
1041            sink_param,
1042            columns,
1043            BoundedInMemLogStoreFactory::new(1),
1044            1024,
1045            vec![DataType::Int64, DataType::Int64],
1046            None,
1047        )
1048        .await
1049        .unwrap();
1050
1051        let mut executor = sink_executor.boxed().execute();
1052
1053        // Barrier message.
1054        assert_eq!(
1055            executor.next().await.unwrap().unwrap(),
1056            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
1057        );
1058
1059        // Barrier message.
1060        assert_eq!(
1061            executor.next().await.unwrap().unwrap(),
1062            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
1063        );
1064
1065        // The last barrier message.
1066        assert_eq!(
1067            executor.next().await.unwrap().unwrap(),
1068            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
1069        );
1070    }
1071}