risingwave_connector/sink/
trivial.rs1use std::marker::PhantomData;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use phf::{Set, phf_set};
20use risingwave_common::session_config::sink_decouple::SinkDecouple;
21
22use crate::enforce_secret::EnforceSecret;
23use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
24use crate::sink::{LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam, SinkWriterParam};
25
26pub const BLACKHOLE_SINK: &str = "blackhole";
27pub const TABLE_SINK: &str = "table";
28
/// Compile-time description of one flavour of [`TrivialSink`].
///
/// Implementors are marker types: they carry no data and only supply the
/// associated constants declared here.
pub trait TrivialSinkType: Send + 'static {
    /// Connector name that [`TrivialSink`] reports as its `Sink::SINK_NAME`.
    const SINK_NAME: &'static str;
    /// When `true`, the log sinker emits a `tracing::trace!` event for every
    /// stream chunk and every barrier it consumes.
    const TRACE_LOG: bool;
}
37
38#[derive(Debug)]
39pub struct BlackHole;
40
41impl TrivialSinkType for BlackHole {
42 const SINK_NAME: &'static str = BLACKHOLE_SINK;
43 const TRACE_LOG: bool = true;
44}
45
46pub type BlackHoleSink = TrivialSink<BlackHole>;
47
48#[derive(Debug)]
49pub struct Table;
50
51impl TrivialSinkType for Table {
52 const SINK_NAME: &'static str = TABLE_SINK;
53 const TRACE_LOG: bool = false;
54}
55
56pub type TableSink = TrivialSink<Table>;
57
58#[derive(Debug)]
59pub struct TrivialSink<T: TrivialSinkType> {
60 param: Arc<SinkParam>,
61 _marker: PhantomData<T>,
62}
63
64impl<T: TrivialSinkType> EnforceSecret for TrivialSink<T> {
65 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
66}
67
68impl<T: TrivialSinkType> TryFrom<SinkParam> for TrivialSink<T> {
69 type Error = SinkError;
70
71 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
72 Ok(Self {
73 param: Arc::new(param),
74 _marker: PhantomData,
75 })
76 }
77}
78
79impl<T: TrivialSinkType> Sink for TrivialSink<T> {
80 type LogSinker = Self;
81
82 const SINK_NAME: &'static str = T::SINK_NAME;
83
84 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
87 match user_specified {
88 SinkDecouple::Enable => Ok(true),
89 SinkDecouple::Default | SinkDecouple::Disable => Ok(false),
90 }
91 }
92
93 fn support_schema_change() -> bool {
94 true
95 }
96
97 async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
98 Ok(Self {
99 param: self.param.clone(),
100 _marker: PhantomData,
101 })
102 }
103
104 async fn validate(&self) -> Result<()> {
105 Ok(())
106 }
107}
108
109#[async_trait]
110impl<T: TrivialSinkType> LogSinker for TrivialSink<T> {
111 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
112 let schema = self.param.schema();
113
114 log_reader.start_from(None).await?;
115 loop {
116 let (epoch, item) = log_reader.next_item().await?;
117 match item {
118 LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
119 if T::TRACE_LOG {
120 tracing::trace!(
121 target: "events::sink::message::chunk",
122 sink_id = %self.param.sink_id,
123 sink_name = self.param.sink_name,
124 cardinality = chunk.cardinality(),
125 capacity = chunk.capacity(),
126 "\n{}\n", chunk.to_pretty_with_schema(&schema),
127 );
128 }
129
130 log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
131 }
132 LogStoreReadItem::Barrier { .. } => {
133 if T::TRACE_LOG {
134 tracing::trace!(
135 target: "events::sink::message::barrier",
136 sink_id = %self.param.sink_id,
137 sink_name = self.param.sink_name,
138 epoch,
139 );
140 }
141
142 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
143 }
144 }
145 }
146 }
147}