risingwave_connector/sink/
trivial.rs1use std::marker::PhantomData;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use phf::{Set, phf_set};
20use risingwave_common::session_config::sink_decouple::SinkDecouple;
21use tracing::info;
22
23use crate::enforce_secret::EnforceSecret;
24use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
25use crate::sink::{LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam, SinkWriterParam};
26
/// Connector name of the black-hole sink; used as `SINK_NAME` by [`BlackHole`].
pub const BLACKHOLE_SINK: &str = "blackhole";
/// Connector name of the table sink; used as `SINK_NAME` by [`Table`].
pub const TABLE_SINK: &str = "table";
29
/// Compile-time configuration for a [`TrivialSink`].
///
/// The implementors in this file ([`BlackHole`] and [`Table`]) are unit
/// structs that only select the connector name and the tracing behaviour.
pub trait TrivialSinkType: Send + 'static {
    /// Connector name; [`TrivialSink`] forwards it as its `Sink::SINK_NAME`.
    const SINK_NAME: &'static str;
    /// When `true`, each consumed chunk and barrier is reported through
    /// `tracing::trace!` before the log is truncated.
    const TRACE_LOG: bool;
}
38
39#[derive(Debug)]
40pub struct BlackHole;
41
42impl TrivialSinkType for BlackHole {
43    const SINK_NAME: &'static str = BLACKHOLE_SINK;
44    const TRACE_LOG: bool = true;
45}
46
47pub type BlackHoleSink = TrivialSink<BlackHole>;
48
49#[derive(Debug)]
50pub struct Table;
51
52impl TrivialSinkType for Table {
53    const SINK_NAME: &'static str = TABLE_SINK;
54    const TRACE_LOG: bool = false;
55}
56
57pub type TableSink = TrivialSink<Table>;
58
59#[derive(Debug)]
60pub struct TrivialSink<T: TrivialSinkType> {
61    param: Arc<SinkParam>,
62    _marker: PhantomData<T>,
63}
64
65impl<T: TrivialSinkType> EnforceSecret for TrivialSink<T> {
66    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
67}
68
69impl<T: TrivialSinkType> TryFrom<SinkParam> for TrivialSink<T> {
70    type Error = SinkError;
71
72    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
73        Ok(Self {
74            param: Arc::new(param),
75            _marker: PhantomData,
76        })
77    }
78}
79
80impl<T: TrivialSinkType> Sink for TrivialSink<T> {
81    type LogSinker = Self;
82
83    const SINK_NAME: &'static str = T::SINK_NAME;
84
85    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
88        match user_specified {
89            SinkDecouple::Enable => Ok(true),
90            SinkDecouple::Default | SinkDecouple::Disable => Ok(false),
91        }
92    }
93
94    fn support_schema_change() -> bool {
95        true
96    }
97
98    async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
99        Ok(Self {
100            param: self.param.clone(),
101            _marker: PhantomData,
102        })
103    }
104
105    async fn validate(&self) -> Result<()> {
106        Ok(())
107    }
108}
109
110#[async_trait]
111impl<T: TrivialSinkType> LogSinker for TrivialSink<T> {
112    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
113        let schema = self.param.schema();
114
115        log_reader.start_from(None).await?;
116        loop {
117            let (epoch, item) = log_reader.next_item().await?;
118            match item {
119                LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
120                    if T::TRACE_LOG {
121                        tracing::trace!(
122                            target: "events::sink::message::chunk",
123                            sink_id = %self.param.sink_id,
124                            sink_name = self.param.sink_name,
125                            cardinality = chunk.cardinality(),
126                            capacity = chunk.capacity(),
127                            "\n{}\n", chunk.to_pretty_with_schema(&schema),
128                        );
129                    }
130
131                    log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
132                }
133                LogStoreReadItem::Barrier { add_columns, .. } => {
134                    if T::TRACE_LOG {
135                        tracing::trace!(
136                            target: "events::sink::message::barrier",
137                            sink_id = %self.param.sink_id,
138                            sink_name = self.param.sink_name,
139                            epoch,
140                        );
141                    }
142
143                    if let Some(add_columns) = add_columns {
144                        info!(?add_columns, "trivial sink receive add columns");
145                    }
146
147                    log_reader.truncate(TruncateOffset::Barrier { epoch })?;
148                }
149            }
150        }
151    }
152}