risingwave_connector/sink/formatter/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use risingwave_common::array::StreamChunk;
17use risingwave_common::catalog::Field;
18
19use crate::sink::redis::REDIS_VALUE_TYPE_STREAM;
20use crate::sink::{Result, SinkError};
21
22mod append_only;
23mod debezium_json;
24mod upsert;
25
26pub use append_only::AppendOnlyFormatter;
27pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter};
28use risingwave_common::catalog::Schema;
29use risingwave_common::types::DataType;
30pub use upsert::UpsertFormatter;
31
32use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
33use super::encoder::bytes::BytesEncoder;
34use super::encoder::template::TemplateEncoder;
35use super::encoder::text::TextEncoder;
36use super::encoder::{
37    DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, TimeHandlingMode,
38    TimestamptzHandlingMode,
39};
40use super::redis::{
41    CHANNEL, CHANNEL_COLUMN, KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE,
42    REDIS_VALUE_TYPE_GEO, REDIS_VALUE_TYPE_PUBSUB, REDIS_VALUE_TYPE_STRING, STREAM, STREAM_COLUMN,
43    VALUE_FORMAT,
44};
45use crate::sink::encoder::{
46    AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode,
47};
48
/// Transforms a `StreamChunk` into a sequence of key-value pairs according to a specific format,
/// for example append-only, upsert or debezium.
///
/// The concrete `(format, key encoder, value encoder)` combinations that are instantiated are
/// enumerated by `SinkFormatterImpl`.
pub trait SinkFormatter {
    /// Type of the encoded message key.
    type K;
    /// Type of the encoded message value (payload).
    type V;

    /// Formats `chunk` into `(key, value)` pairs, yielding an `Err` item when encoding fails.
    ///
    /// * Key may be None so that messages are partitioned using round-robin.
    ///   For example append-only without `primary_key` (aka `downstream_pk`) set.
    /// * Value may be None so that messages with same key are removed during log compaction.
    ///   For example debezium tombstone event.
    #[expect(clippy::type_complexity)]
    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
}
65
/// Unwraps a `Result` inside a generator body.
///
/// * `Ok(v)` — the macro evaluates to `v`.
/// * `Err(e)` — the macro yields `Err(e)` from the enclosing generator and then returns `()`,
///   ending it.
///
/// Contrast with `?`, which would *return* the `Err` instead of yielding it.
#[macro_export]
macro_rules! tri {
    ($result:expr) => {
        match $result {
            Err(e) => {
                yield Err(e);
                return;
            }
            Ok(v) => v,
        }
    };
}
80
/// Every supported sink formatter, each bound to a concrete formatter/encoder instantiation.
/// Constructed from a `SinkFormatDesc` by `SinkFormatterImpl::new`.
///
/// Variant names read `<Format><KeyEncode><ValueEncode>`. When the key encode is omitted from
/// the name, the key encoder is of the same kind as the value encoder. The exception is
/// `AppendOnlyProto`, which uses `JsonEncoder` for the key.
pub enum SinkFormatterImpl {
    // append-only
    AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
    AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
    AppendOnlyBytesJson(AppendOnlyFormatter<BytesEncoder, JsonEncoder>),
    AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
    AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
    AppendOnlyBytesAvro(AppendOnlyFormatter<BytesEncoder, AvroEncoder>),
    // `ProtoEncoder` is never used as a key encoder, so JSON is the default key here.
    AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
    AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
    AppendOnlyBytesProto(AppendOnlyFormatter<BytesEncoder, ProtoEncoder>),
    AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
    AppendOnlyTextTemplate(AppendOnlyFormatter<TextEncoder, TemplateEncoder>),
    AppendOnlyBytesTemplate(AppendOnlyFormatter<BytesEncoder, TemplateEncoder>),
    // upsert
    UpsertJson(UpsertFormatter<JsonEncoder, JsonEncoder>),
    UpsertTextJson(UpsertFormatter<TextEncoder, JsonEncoder>),
    UpsertBytesJson(UpsertFormatter<BytesEncoder, JsonEncoder>),
    UpsertAvro(UpsertFormatter<AvroEncoder, AvroEncoder>),
    UpsertTextAvro(UpsertFormatter<TextEncoder, AvroEncoder>),
    UpsertBytesAvro(UpsertFormatter<BytesEncoder, AvroEncoder>),
    // `UpsertFormatter<ProtoEncoder, ProtoEncoder>` is intentionally left out
    // to avoid using `ProtoEncoder` as key:
    // <https://docs.confluent.io/platform/7.7/control-center/topics/schema.html#c3-schemas-best-practices-key-value-pairs>
    UpsertTextProto(UpsertFormatter<TextEncoder, ProtoEncoder>),
    UpsertBytesProto(UpsertFormatter<BytesEncoder, ProtoEncoder>),
    UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
    UpsertTextTemplate(UpsertFormatter<TextEncoder, TemplateEncoder>),
    UpsertBytesTemplate(UpsertFormatter<BytesEncoder, TemplateEncoder>),
    // debezium
    DebeziumJson(DebeziumJsonFormatter),
}
113
/// Common inputs from which every `EncoderBuild` implementation constructs its encoder.
///
/// Cloned when both a key and a value encoder are built from the same parameters.
#[derive(Debug, Clone)]
pub struct EncoderParams<'a> {
    /// Sink format/encode description; encoders read their settings from its `options` map.
    format_desc: &'a SinkFormatDesc,
    /// Schema of the rows being sunk; primary-key indices refer to positions in it.
    schema: Schema,
    /// Database name; the JSON encoder joins it with `sink_from_name` (as
    /// `<db_name>.<sink_from_name>`) for the Kafka Connect schema name.
    db_name: String,
    /// Name of the relation the sink reads from.
    sink_from_name: String,
    /// Target topic; passed to the Avro and Protobuf schema lookups.
    topic: &'a str,
}
122
/// Each encoder shall be able to be built from parameters.
///
/// This is not part of `RowEncoder` trait, because that one is about how an encoder completes its
/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
/// one is about how different encoders can be selected from a common SQL interface.
pub trait EncoderBuild: Sized {
    /// Builds the encoder from `params`.
    ///
    /// Pass `pk_indices: None` for value/payload and `Some` for key. Certain encoders are built
    /// differently when used as key vs value.
    ///
    /// # Errors
    ///
    /// Implementations return an error when the sink options are not valid for this encoder
    /// (the implementations in this module mostly use `SinkError::Config`).
    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self>;
}
133
134impl EncoderBuild for JsonEncoder {
135    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
136        let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?;
137        let jsonb_handling_mode = JsonbHandlingMode::from_options(&b.format_desc.options)?;
138        let encoder = JsonEncoder::new(
139            b.schema,
140            pk_indices,
141            DateHandlingMode::FromCe,
142            TimestampHandlingMode::Milli,
143            timestamptz_mode,
144            TimeHandlingMode::Milli,
145            jsonb_handling_mode,
146        );
147        let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") {
148            match s.to_lowercase().parse::<bool>() {
149                Ok(true) => {
150                    let kafka_connect = KafkaConnectParams {
151                        schema_name: format!("{}.{}", b.db_name, b.sink_from_name),
152                    };
153                    encoder.with_kafka_connect(kafka_connect)
154                }
155                Ok(false) => encoder,
156                _ => {
157                    return Err(SinkError::Config(anyhow!(
158                        "schemas.enable is expected to be `true` or `false`, got {s}",
159                    )));
160                }
161            }
162        } else {
163            encoder
164        };
165        Ok(encoder)
166    }
167}
168
169impl EncoderBuild for ProtoEncoder {
170    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
171        // TODO: better to be a compile-time assert
172        assert!(pk_indices.is_none());
173        // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet.
174        let (descriptor, sid) =
175            crate::schema::protobuf::fetch_descriptor(&b.format_desc.options, b.topic, None)
176                .await
177                .map_err(|e| SinkError::Config(anyhow!(e)))?;
178        let header = match sid {
179            None => ProtoHeader::None,
180            Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
181        };
182        ProtoEncoder::new(b.schema, None, descriptor, header)
183    }
184}
185
186fn ensure_only_one_pk<'a>(
187    data_type_name: &'a str,
188    params: &'a EncoderParams<'_>,
189    pk_indices: &'a Option<Vec<usize>>,
190) -> Result<(usize, &'a Field)> {
191    let Some(pk_indices) = pk_indices else {
192        return Err(SinkError::Config(anyhow!(
193            "{}Encoder requires primary key columns to be specified",
194            data_type_name
195        )));
196    };
197    if pk_indices.len() != 1 {
198        return Err(SinkError::Config(anyhow!(
199            "KEY ENCODE {} expects only one primary key, but got {}",
200            data_type_name,
201            pk_indices.len(),
202        )));
203    }
204
205    let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| {
206        SinkError::Config(anyhow!(
207            "The primary key column index {} is out of bounds in schema {:?}",
208            pk_indices[0],
209            params.schema
210        ))
211    })?;
212
213    Ok((pk_indices[0], schema_ref))
214}
215
216impl EncoderBuild for BytesEncoder {
217    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
218        let (pk_index, schema_ref) = ensure_only_one_pk("BYTES", &params, &pk_indices)?;
219        if let DataType::Bytea = schema_ref.data_type() {
220            Ok(BytesEncoder::new(params.schema, pk_index))
221        } else {
222            Err(SinkError::Config(anyhow!(
223                "The key encode is BYTES, but the primary key column {} has type {}",
224                schema_ref.name,
225                schema_ref.data_type
226            )))
227        }
228    }
229}
230
231impl EncoderBuild for TextEncoder {
232    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
233        let (pk_index, schema_ref) = ensure_only_one_pk("TEXT", &params, &pk_indices)?;
234        match &schema_ref.data_type() {
235            DataType::Varchar
236            | DataType::Boolean
237            | DataType::Int16
238            | DataType::Int32
239            | DataType::Int64
240            | DataType::Int256
241            | DataType::Serial => {}
242            _ => {
243                // why we don't allow float as text for key encode: https://github.com/risingwavelabs/risingwave/pull/16377#discussion_r1591864960
244                return Err(SinkError::Config(anyhow!(
245                    "The key encode is TEXT, but the primary key column {} has type {}. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, big int, serial or rw_int256.",
246                    schema_ref.name,
247                    schema_ref.data_type
248                )));
249            }
250        }
251
252        Ok(Self::new(params.schema, pk_index))
253    }
254}
255
256impl EncoderBuild for AvroEncoder {
257    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
258        use crate::schema::{SchemaLoader, SchemaVersion};
259
260        let loader = SchemaLoader::from_format_options(b.topic, &b.format_desc.options)
261            .await
262            .map_err(|e| SinkError::Config(anyhow!(e)))?;
263
264        let (schema_version, avro) = match pk_indices {
265            Some(_) => loader
266                .load_key_schema()
267                .await
268                .map_err(|e| SinkError::Config(anyhow!(e)))?,
269            None => loader
270                .load_val_schema()
271                .await
272                .map_err(|e| SinkError::Config(anyhow!(e)))?,
273        };
274        AvroEncoder::new(
275            b.schema,
276            pk_indices,
277            std::sync::Arc::new(avro),
278            match schema_version {
279                SchemaVersion::Confluent(x) => AvroHeader::ConfluentSchemaRegistry(x),
280                SchemaVersion::Glue(x) => AvroHeader::GlueSchemaRegistry(x),
281            },
282        )
283    }
284}
285
286impl EncoderBuild for TemplateEncoder {
287    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
288        let redis_value_type = b
289            .format_desc
290            .options
291            .get(REDIS_VALUE_TYPE)
292            .map_or(REDIS_VALUE_TYPE_STRING, |s| s.as_str());
293        match redis_value_type {
294            REDIS_VALUE_TYPE_STRING => {
295                let option_name = match pk_indices {
296                    Some(_) => KEY_FORMAT,
297                    None => VALUE_FORMAT,
298                };
299                let template = b.format_desc.options.get(option_name).ok_or_else(|| {
300                    SinkError::Config(anyhow!("Cannot find '{option_name}',please set it."))
301                })?;
302                Ok(TemplateEncoder::new_string(
303                    b.schema,
304                    pk_indices,
305                    template.clone(),
306                ))
307            }
308            REDIS_VALUE_TYPE_GEO => match pk_indices {
309                Some(_) => {
310                    let member_name = b.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
311                        SinkError::Config(anyhow!("Cannot find `{MEMBER_NAME}`,please set it."))
312                    })?;
313                    let template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
314                        SinkError::Config(anyhow!("Cannot find `{KEY_FORMAT}`,please set it."))
315                    })?;
316                    TemplateEncoder::new_geo_key(
317                        b.schema,
318                        pk_indices,
319                        member_name,
320                        template.clone(),
321                    )
322                }
323                None => {
324                    let lat_name = b.format_desc.options.get(LAT_NAME).ok_or_else(|| {
325                        SinkError::Config(anyhow!("Cannot find `{LAT_NAME}`, please set it."))
326                    })?;
327                    let lon_name = b.format_desc.options.get(LON_NAME).ok_or_else(|| {
328                        SinkError::Config(anyhow!("Cannot find `{LON_NAME}`,please set it."))
329                    })?;
330                    TemplateEncoder::new_geo_value(b.schema, pk_indices, lat_name, lon_name)
331                }
332            },
333            REDIS_VALUE_TYPE_PUBSUB => match pk_indices {
334                Some(_) => {
335                    let channel = b.format_desc.options.get(CHANNEL).cloned();
336                    let channel_column = b.format_desc.options.get(CHANNEL_COLUMN).cloned();
337                    if (channel.is_none() && channel_column.is_none())
338                        || (channel.is_some() && channel_column.is_some())
339                    {
340                        return Err(SinkError::Config(anyhow!(
341                            "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
342                        )));
343                    }
344                    TemplateEncoder::new_pubsub_stream_key(
345                        b.schema,
346                        pk_indices,
347                        channel,
348                        channel_column,
349                    )
350                }
351                None => {
352                    let template = b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
353                        SinkError::Config(anyhow!("Cannot find '{VALUE_FORMAT}',please set it."))
354                    })?;
355                    Ok(TemplateEncoder::new_string(
356                        b.schema,
357                        pk_indices,
358                        template.clone(),
359                    ))
360                }
361            },
362            REDIS_VALUE_TYPE_STREAM => match pk_indices {
363                Some(_) => {
364                    let stream = b.format_desc.options.get(STREAM).cloned();
365                    let stream_column = b.format_desc.options.get(STREAM_COLUMN).cloned();
366                    if (stream.is_none() && stream_column.is_none())
367                        || (stream.is_some() && stream_column.is_some())
368                    {
369                        return Err(SinkError::Config(anyhow!(
370                            "`{STREAM}` and `{STREAM_COLUMN}` only one can be set"
371                        )));
372                    }
373                    TemplateEncoder::new_pubsub_stream_key(
374                        b.schema,
375                        pk_indices,
376                        stream,
377                        stream_column,
378                    )
379                }
380                None => {
381                    let value_template =
382                        b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
383                            SinkError::Config(anyhow!(
384                                "Cannot find '{VALUE_FORMAT}',please set it."
385                            ))
386                        })?;
387                    let key_template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
388                        SinkError::Config(anyhow!("Cannot find '{KEY_FORMAT}',please set it."))
389                    })?;
390
391                    Ok(TemplateEncoder::new_stream_value(
392                        b.schema,
393                        pk_indices,
394                        key_template.clone(),
395                        value_template.clone(),
396                    ))
397                }
398            },
399            _ => Err(SinkError::Config(anyhow!(
400                "The value type {} is not supported",
401                redis_value_type
402            ))),
403        }
404    }
405}
406
/// Inputs shared by every `FormatterBuild` implementation.
struct FormatterParams<'a> {
    /// Parameters forwarded to the key and value encoder builders.
    builder: EncoderParams<'a>,
    /// Indices of the primary-key columns in `builder.schema`. May be empty: the append-only
    /// formatter then builds no key encoder.
    pk_indices: Vec<usize>,
}
411
/// Each formatter shall be able to be built from parameters.
///
/// This is not part of `SinkFormatter` trait, because that is about how a formatter completes its
/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
/// one is about how different formatters can be selected from a common SQL interface.
trait FormatterBuild: Sized {
    /// Builds the formatter, including its key/value encoders, from `b`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while building the encoders.
    async fn build(b: FormatterParams<'_>) -> Result<Self>;
}
420
421impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<KE, VE> {
422    async fn build(b: FormatterParams<'_>) -> Result<Self> {
423        let key_encoder = match b.pk_indices.is_empty() {
424            true => None,
425            false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?),
426        };
427        let val_encoder = VE::build(b.builder, None).await?;
428        Ok(AppendOnlyFormatter::new(key_encoder, val_encoder))
429    }
430}
431
432impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
433    async fn build(b: FormatterParams<'_>) -> Result<Self> {
434        let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices))
435            .await
436            .with_context(|| "Failed to build key encoder")?;
437        let val_encoder = VE::build(b.builder, None)
438            .await
439            .with_context(|| "Failed to build value encoder")?;
440        Ok(UpsertFormatter::new(key_encoder, val_encoder))
441    }
442}
443
444impl FormatterBuild for DebeziumJsonFormatter {
445    async fn build(b: FormatterParams<'_>) -> Result<Self> {
446        assert_eq!(b.builder.format_desc.encode, SinkEncode::Json);
447
448        Ok(DebeziumJsonFormatter::new(
449            b.builder.schema,
450            b.pk_indices,
451            b.builder.db_name,
452            b.builder.sink_from_name,
453            DebeziumAdapterOpts::default(),
454        ))
455    }
456}
457
impl SinkFormatterImpl {
    /// Builds the formatter variant that matches `format_desc`'s `(format, encode, key_encode)`
    /// triple.
    ///
    /// * `schema` — schema of the rows being sunk.
    /// * `pk_indices` — indices of the primary-key columns in `schema`. They are used to build
    ///   the key encoder; append-only sinks build no key encoder when this is empty.
    /// * `db_name` / `sink_from_name` — used by the JSON encoder's Kafka Connect schema name and
    ///   by the Debezium formatter.
    /// * `topic` — passed to the Avro/Protobuf schema lookups.
    ///
    /// # Errors
    ///
    /// Returns `SinkError::Config` for unsupported format/encode/key-encode combinations,
    /// including `ENCODE TEXT` used as the value encode. Propagates any error from building the
    /// key or value encoder.
    pub async fn new(
        format_desc: &SinkFormatDesc,
        schema: Schema,
        pk_indices: Vec<usize>,
        db_name: String,
        sink_from_name: String,
        topic: &str,
    ) -> Result<Self> {
        use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl};
        let p = FormatterParams {
            builder: EncoderParams {
                format_desc,
                schema,
                db_name,
                sink_from_name,
                topic,
            },
            pk_indices,
        };

        // When defining `SinkFormatterImpl` we already linked each variant (eg `AppendOnlyJson`) to
        // an instantiation (eg `AppendOnlyFormatter<JsonEncoder, JsonEncoder>`) that implements the
        // trait `FormatterBuild`.
        //
        // Here we just need to match a `(format, encode)` to a variant, and rustc shall be able to
        // find the corresponding instantiation.

        // However,
        //   `Impl::AppendOnlyJson(FormatterBuild::build(p).await?)`
        // fails to be inferred without the following dummy wrapper.
        async fn build<T: FormatterBuild>(p: FormatterParams<'_>) -> Result<T> {
            T::build(p).await
        }

        // NOTE: arm order matters. Some "unsupported" patterns below (e.g. `(F::AppendOnly,
        // E::Avro, _)`) overlap the supported arms above and only catch what those left over.
        Ok(
            match (
                &format_desc.format,
                &format_desc.encode,
                &format_desc.key_encode,
            ) {
                (F::AppendOnly, E::Json, Some(E::Text)) => {
                    Impl::AppendOnlyTextJson(build(p).await?)
                }
                (F::AppendOnly, E::Json, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesJson(build(p).await?)
                }
                (F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?),
                (F::AppendOnly, E::Avro, Some(E::Text)) => {
                    Impl::AppendOnlyTextAvro(build(p).await?)
                }
                (F::AppendOnly, E::Avro, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesAvro(build(p).await?)
                }
                (F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?),
                (F::AppendOnly, E::Protobuf, Some(E::Text)) => {
                    Impl::AppendOnlyTextProto(build(p).await?)
                }
                (F::AppendOnly, E::Protobuf, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesProto(build(p).await?)
                }
                (F::AppendOnly, E::Protobuf, None) => Impl::AppendOnlyProto(build(p).await?),
                (F::AppendOnly, E::Template, Some(E::Text)) => {
                    Impl::AppendOnlyTextTemplate(build(p).await?)
                }
                (F::AppendOnly, E::Template, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesTemplate(build(p).await?)
                }
                (F::AppendOnly, E::Template, None) => Impl::AppendOnlyTemplate(build(p).await?),
                (F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?),
                (F::Upsert, E::Json, Some(E::Bytes)) => {
                    Impl::UpsertBytesJson(build(p).await?)
                }
                (F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?),
                (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?),
                (F::Upsert, E::Avro, Some(E::Bytes)) => {
                    Impl::UpsertBytesAvro(build(p).await?)
                }
                (F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?),
                (F::Upsert, E::Protobuf, Some(E::Text)) => Impl::UpsertTextProto(build(p).await?),
                (F::Upsert, E::Protobuf, Some(E::Bytes)) => {
                    Impl::UpsertBytesProto(build(p).await?)
                }
                (F::Upsert, E::Template, Some(E::Text)) => {
                    Impl::UpsertTextTemplate(build(p).await?)
                }
                (F::Upsert, E::Template, Some(E::Bytes)) => {
                    Impl::UpsertBytesTemplate(build(p).await?)
                }
                (F::Upsert, E::Template, None) => Impl::UpsertTemplate(build(p).await?),
                (F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?),
                (F::AppendOnly | F::Upsert, E::Text, _) => {
                    return Err(SinkError::Config(anyhow!(
                        "ENCODE TEXT is only valid as key encode."
                    )));
                }
                // `(F::Upsert, E::Protobuf, None)` is rejected here on purpose: it would need
                // `ProtoEncoder` as the key encoder, which `SinkFormatterImpl` leaves out.
                (F::AppendOnly, E::Avro, _)
                | (F::Upsert, E::Protobuf, _)
                | (F::Debezium, E::Json, Some(_))
                | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text, _)
                | (F::AppendOnly, E::Bytes, _)
                | (F::Upsert, E::Bytes, _)
                | (F::Debezium, E::Bytes, _)
                | (_, E::Parquet, _)
                | (_, _, Some(E::Parquet))
                | (F::AppendOnly | F::Upsert, _, Some(E::Template) | Some(E::Json) | Some(E::Avro) | Some(E::Protobuf)) // reject other encode as key encode
                => {
                    return Err(SinkError::Config(anyhow!(
                        "sink format/encode/key_encode unsupported: {:?} {:?} {:?}",
                        format_desc.format,
                        format_desc.encode,
                        format_desc.key_encode
                    )));
                }
            },
        )
    }
}
576
/// Dispatches over every `SinkFormatterImpl` variant.
///
/// Matches `$impl`, binds the inner formatter of whichever variant is present to `$name`, and
/// evaluates `$body` with that binding. All variants are handled, so use this where the message
/// key may be either bytes or a string.
///
/// `$body` is expanded once per variant, so it is type-checked separately against each concrete
/// formatter type.
#[macro_export]
macro_rules! dispatch_sink_formatter_impl {
    ($impl:expr, $name:ident, $body:expr) => {
        match $impl {
            // append-only
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesJson($name) => $body,
            SinkFormatterImpl::AppendOnlyAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesTemplate($name) => $body,
            // upsert
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertBytesJson($name) => $body,
            SinkFormatterImpl::UpsertAvro($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertBytesAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertBytesProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,
            SinkFormatterImpl::UpsertBytesTemplate($name) => $body,
            // debezium
            SinkFormatterImpl::DebeziumJson($name) => $body,
        }
    };
}
614
/// Dispatches over the `SinkFormatterImpl` variants whose message key is a string.
///
/// Like `dispatch_sink_formatter_impl`, it binds the inner formatter to `$name` and evaluates
/// `$body`. It differs in two ways:
/// * Variants whose key is not a string are not dispatched: the Avro-keyed ones
///   (`AppendOnlyAvro`, `UpsertAvro`) and every `*Bytes*` one. Reaching any of them panics
///   via `unreachable!()`, because they must never occur where a string key is required.
/// * An optional trailing attribute (`$attr`) is placed on the generated `match`.
#[macro_export]
macro_rules! dispatch_sink_formatter_str_key_impl {
    ($impl:expr, $name:ident, $body:expr $(,$attr:meta)?) => {
        $(#[$attr])?
        match $impl {
            // append-only, string key
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
            // upsert, string key
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,
            // debezium
            SinkFormatterImpl::DebeziumJson($name) => $body,
            // keys that are not strings
            SinkFormatterImpl::AppendOnlyAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesJson(_)
            | SinkFormatterImpl::AppendOnlyBytesAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesProto(_)
            | SinkFormatterImpl::AppendOnlyBytesTemplate(_)
            | SinkFormatterImpl::UpsertAvro(_)
            | SinkFormatterImpl::UpsertBytesJson(_)
            | SinkFormatterImpl::UpsertBytesAvro(_)
            | SinkFormatterImpl::UpsertBytesProto(_)
            | SinkFormatterImpl::UpsertBytesTemplate(_) => unreachable!(),
        }
    };
}