risingwave_connector/sink/
trivial.rs1use std::marker::PhantomData;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use phf::{Set, phf_set};
20use risingwave_common::session_config::sink_decouple::SinkDecouple;
21use tracing::info;
22
23use crate::enforce_secret::EnforceSecret;
24use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
25use crate::sink::{LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam, SinkWriterParam};
26
27pub const BLACKHOLE_SINK: &str = "blackhole";
28pub const TABLE_SINK: &str = "table";
29
30pub trait TrivialSinkType: Send + 'static {
31 const TRACE_LOG: bool;
36 const SINK_NAME: &'static str;
37}
38
39#[derive(Debug)]
40pub struct BlackHole;
41
42impl TrivialSinkType for BlackHole {
43 const SINK_NAME: &'static str = BLACKHOLE_SINK;
44 const TRACE_LOG: bool = true;
45}
46
47pub type BlackHoleSink = TrivialSink<BlackHole>;
48
49#[derive(Debug)]
50pub struct Table;
51
52impl TrivialSinkType for Table {
53 const SINK_NAME: &'static str = TABLE_SINK;
54 const TRACE_LOG: bool = false;
55}
56
57pub type TableSink = TrivialSink<Table>;
58
59#[derive(Debug)]
60pub struct TrivialSink<T: TrivialSinkType> {
61 param: Arc<SinkParam>,
62 _marker: PhantomData<T>,
63}
64
65impl<T: TrivialSinkType> EnforceSecret for TrivialSink<T> {
66 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
67}
68
69impl<T: TrivialSinkType> TryFrom<SinkParam> for TrivialSink<T> {
70 type Error = SinkError;
71
72 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
73 Ok(Self {
74 param: Arc::new(param),
75 _marker: PhantomData,
76 })
77 }
78}
79
80impl<T: TrivialSinkType> Sink for TrivialSink<T> {
81 type LogSinker = Self;
82
83 const SINK_NAME: &'static str = T::SINK_NAME;
84
85 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
88 match user_specified {
89 SinkDecouple::Enable => Ok(true),
90 SinkDecouple::Default | SinkDecouple::Disable => Ok(false),
91 }
92 }
93
94 fn support_schema_change() -> bool {
95 true
96 }
97
98 async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
99 Ok(Self {
100 param: self.param.clone(),
101 _marker: PhantomData,
102 })
103 }
104
105 async fn validate(&self) -> Result<()> {
106 Ok(())
107 }
108}
109
110#[async_trait]
111impl<T: TrivialSinkType> LogSinker for TrivialSink<T> {
112 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
113 let schema = self.param.schema();
114
115 log_reader.start_from(None).await?;
116 loop {
117 let (epoch, item) = log_reader.next_item().await?;
118 match item {
119 LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
120 if T::TRACE_LOG {
121 tracing::trace!(
122 target: "events::sink::message::chunk",
123 sink_id = %self.param.sink_id,
124 sink_name = self.param.sink_name,
125 cardinality = chunk.cardinality(),
126 capacity = chunk.capacity(),
127 "\n{}\n", chunk.to_pretty_with_schema(&schema),
128 );
129 }
130
131 log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
132 }
133 LogStoreReadItem::Barrier { add_columns, .. } => {
134 if T::TRACE_LOG {
135 tracing::trace!(
136 target: "events::sink::message::barrier",
137 sink_id = %self.param.sink_id,
138 sink_name = self.param.sink_name,
139 epoch,
140 );
141 }
142
143 if let Some(add_columns) = add_columns {
144 info!(?add_columns, "trivial sink receive add columns");
145 }
146
147 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
148 }
149 }
150 }
151 }
152}