risingwave_connector/sink/
trivial.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use phf::{Set, phf_set};
20use risingwave_common::session_config::sink_decouple::SinkDecouple;
21
22use crate::enforce_secret::EnforceSecret;
23use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
24use crate::sink::{LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam, SinkWriterParam};
25
/// Connector name used by the sink-into-table flavour of [`TrivialSink`].
pub const TABLE_SINK: &str = "table";

/// Connector name used by the blackhole flavour of [`TrivialSink`].
pub const BLACKHOLE_SINK: &str = "blackhole";
28
/// Compile-time description of one flavour of [`TrivialSink`].
///
/// Implementors are marker types: they carry no data and only supply the
/// associated constants below.
pub trait TrivialSinkType: Send + 'static {
    /// Name under which this sink flavour is exposed (forwarded to `Sink::SINK_NAME`).
    const SINK_NAME: &'static str;

    /// Whether a `trace`-level tracing event is emitted for every item handed to the sink.
    ///
    /// Events at `trace` level are optimized out in release builds, so this cannot be
    /// turned on there at all. It exists for debugging purposes only.
    const TRACE_LOG: bool;
}
37
38#[derive(Debug)]
39pub struct BlackHole;
40
41impl TrivialSinkType for BlackHole {
42    const SINK_NAME: &'static str = BLACKHOLE_SINK;
43    const TRACE_LOG: bool = true;
44}
45
46pub type BlackHoleSink = TrivialSink<BlackHole>;
47
48#[derive(Debug)]
49pub struct Table;
50
51impl TrivialSinkType for Table {
52    const SINK_NAME: &'static str = TABLE_SINK;
53    const TRACE_LOG: bool = false;
54}
55
56pub type TableSink = TrivialSink<Table>;
57
58#[derive(Debug)]
59pub struct TrivialSink<T: TrivialSinkType> {
60    param: Arc<SinkParam>,
61    _marker: PhantomData<T>,
62}
63
64impl<T: TrivialSinkType> EnforceSecret for TrivialSink<T> {
65    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
66}
67
68impl<T: TrivialSinkType> TryFrom<SinkParam> for TrivialSink<T> {
69    type Error = SinkError;
70
71    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
72        Ok(Self {
73            param: Arc::new(param),
74            _marker: PhantomData,
75        })
76    }
77}
78
79impl<T: TrivialSinkType> Sink for TrivialSink<T> {
80    type LogSinker = Self;
81
82    const SINK_NAME: &'static str = T::SINK_NAME;
83
84    /// Enable sink decoupling for sink-into-table.
85    /// Disable sink decoupling for blackhole sink. It introduces overhead without any benefit
86    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
87        match user_specified {
88            SinkDecouple::Enable => Ok(true),
89            SinkDecouple::Default | SinkDecouple::Disable => Ok(false),
90        }
91    }
92
93    fn support_schema_change() -> bool {
94        true
95    }
96
97    async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
98        Ok(Self {
99            param: self.param.clone(),
100            _marker: PhantomData,
101        })
102    }
103
104    async fn validate(&self) -> Result<()> {
105        Ok(())
106    }
107}
108
109#[async_trait]
110impl<T: TrivialSinkType> LogSinker for TrivialSink<T> {
111    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
112        let schema = self.param.schema();
113
114        log_reader.start_from(None).await?;
115        loop {
116            let (epoch, item) = log_reader.next_item().await?;
117            match item {
118                LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
119                    if T::TRACE_LOG {
120                        tracing::trace!(
121                            target: "events::sink::message::chunk",
122                            sink_id = %self.param.sink_id,
123                            sink_name = self.param.sink_name,
124                            cardinality = chunk.cardinality(),
125                            capacity = chunk.capacity(),
126                            "\n{}\n", chunk.to_pretty_with_schema(&schema),
127                        );
128                    }
129
130                    log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
131                }
132                LogStoreReadItem::Barrier { .. } => {
133                    if T::TRACE_LOG {
134                        tracing::trace!(
135                            target: "events::sink::message::barrier",
136                            sink_id = %self.param.sink_id,
137                            sink_name = self.param.sink_name,
138                            epoch,
139                        );
140                    }
141
142                    log_reader.truncate(TruncateOffset::Barrier { epoch })?;
143                }
144            }
145        }
146    }
147}