risingwave_connector/sink/
trivial.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use phf::{Set, phf_set};
20use risingwave_common::session_config::sink_decouple::SinkDecouple;
21use tracing::info;
22
23use crate::enforce_secret::EnforceSecret;
24use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
25use crate::sink::{LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam, SinkWriterParam};
26
/// Sink name used by the [`BlackHole`] flavor of [`TrivialSink`].
pub const BLACKHOLE_SINK: &str = "blackhole";
/// Sink name used by the [`Table`] flavor of [`TrivialSink`].
pub const TABLE_SINK: &str = "table";
29
/// Compile-time settings that distinguish one flavor of [`TrivialSink`] from another.
///
/// Implementors are plain marker types; they carry no data and only supply the
/// associated constants below.
pub trait TrivialSinkType: Send + 'static {
    /// Name the sink reports through `Sink::SINK_NAME`.
    const SINK_NAME: &'static str;
    /// Whether a `trace` event is emitted for every item passed to the sink.
    ///
    /// Tracing events at `trace` level are optimized out in release builds, so this
    /// switch has no effect there. It exists for debugging only.
    const TRACE_LOG: bool;
}
38
/// Marker type selecting the black-hole flavor of [`TrivialSink`].
///
/// It holds no data; its only role is to carry a [`TrivialSinkType`] implementation.
#[derive(Debug)]
pub struct BlackHole;
41
42impl TrivialSinkType for BlackHole {
43    const SINK_NAME: &'static str = BLACKHOLE_SINK;
44    const TRACE_LOG: bool = true;
45}
46
47pub type BlackHoleSink = TrivialSink<BlackHole>;
48
/// Marker type selecting the table flavor of [`TrivialSink`].
///
/// It holds no data; its only role is to carry a [`TrivialSinkType`] implementation.
#[derive(Debug)]
pub struct Table;
51
52impl TrivialSinkType for Table {
53    const SINK_NAME: &'static str = TABLE_SINK;
54    const TRACE_LOG: bool = false;
55}
56
57pub type TableSink = TrivialSink<Table>;
58
59#[derive(Debug)]
60pub struct TrivialSink<T: TrivialSinkType> {
61    param: Arc<SinkParam>,
62    _marker: PhantomData<T>,
63}
64
65impl<T: TrivialSinkType> EnforceSecret for TrivialSink<T> {
66    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
67}
68
69impl<T: TrivialSinkType> TryFrom<SinkParam> for TrivialSink<T> {
70    type Error = SinkError;
71
72    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
73        Ok(Self {
74            param: Arc::new(param),
75            _marker: PhantomData,
76        })
77    }
78}
79
80impl<T: TrivialSinkType> Sink for TrivialSink<T> {
81    type LogSinker = Self;
82
83    const SINK_NAME: &'static str = T::SINK_NAME;
84
85    /// Enable sink decoupling for sink-into-table.
86    /// Disable sink decoupling for blackhole sink. It introduces overhead without any benefit
87    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
88        match user_specified {
89            SinkDecouple::Enable => Ok(true),
90            SinkDecouple::Default | SinkDecouple::Disable => Ok(false),
91        }
92    }
93
94    fn support_schema_change() -> bool {
95        true
96    }
97
98    async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
99        Ok(Self {
100            param: self.param.clone(),
101            _marker: PhantomData,
102        })
103    }
104
105    async fn validate(&self) -> Result<()> {
106        Ok(())
107    }
108}
109
110#[async_trait]
111impl<T: TrivialSinkType> LogSinker for TrivialSink<T> {
112    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
113        let schema = self.param.schema();
114
115        log_reader.start_from(None).await?;
116        loop {
117            let (epoch, item) = log_reader.next_item().await?;
118            match item {
119                LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
120                    if T::TRACE_LOG {
121                        tracing::trace!(
122                            target: "events::sink::message::chunk",
123                            sink_id = %self.param.sink_id,
124                            sink_name = self.param.sink_name,
125                            cardinality = chunk.cardinality(),
126                            capacity = chunk.capacity(),
127                            "\n{}\n", chunk.to_pretty_with_schema(&schema),
128                        );
129                    }
130
131                    log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
132                }
133                LogStoreReadItem::Barrier { add_columns, .. } => {
134                    if T::TRACE_LOG {
135                        tracing::trace!(
136                            target: "events::sink::message::barrier",
137                            sink_id = %self.param.sink_id,
138                            sink_name = self.param.sink_name,
139                            epoch,
140                        );
141                    }
142
143                    if let Some(add_columns) = add_columns {
144                        info!(?add_columns, "trivial sink receive add columns");
145                    }
146
147                    log_reader.truncate(TruncateOffset::Barrier { epoch })?;
148                }
149            }
150        }
151    }
152}