// risingwave_connector/sink/formatter/append_only.rs
use risingwave_common::array::Op;
16
17use super::{Result, SinkFormatter, StreamChunk};
18use crate::sink::encoder::RowEncoder;
19use crate::tri;
20
/// Sink formatter that pairs an optional key encoder with a value encoder.
///
/// `KE` and `VE` are left unbounded here; the trait bounds they need are
/// placed on the `impl` blocks that actually use them.
pub struct AppendOnlyFormatter<KE, VE> {
    /// Encoder for the message key; `None` means no key is produced.
    key_encoder: Option<KE>,
    /// Encoder for the message value.
    val_encoder: VE,
}
25
26impl<KE, VE> AppendOnlyFormatter<KE, VE> {
27 pub fn new(key_encoder: Option<KE>, val_encoder: VE) -> Self {
28 Self {
29 key_encoder,
30 val_encoder,
31 }
32 }
33}
34
35impl<KE: RowEncoder, VE: RowEncoder> SinkFormatter for AppendOnlyFormatter<KE, VE> {
36 type K = KE::Output;
37 type V = VE::Output;
38
39 fn format_chunk(
40 &self,
41 chunk: &StreamChunk,
42 ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>> {
43 std::iter::from_coroutine(
44 #[coroutine]
45 || {
46 for (op, row) in chunk.rows() {
47 if op != Op::Insert {
48 continue;
49 }
50 let event_key_object = match &self.key_encoder {
51 Some(key_encoder) => Some(tri!(key_encoder.encode(row))),
52 None => None,
53 };
54 let event_object = Some(tri!(self.val_encoder.encode(row)));
55
56 yield Ok((event_key_object, event_object))
57 }
58 },
59 )
60 }
61}