risingwave_connector/sink/formatter/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use risingwave_common::array::StreamChunk;
17use risingwave_common::catalog::Field;
18
19use crate::sink::{Result, SinkError};
20
21mod append_only;
22mod debezium_json;
23mod upsert;
24
25pub use append_only::AppendOnlyFormatter;
26pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter};
27use risingwave_common::catalog::Schema;
28use risingwave_common::types::DataType;
29pub use upsert::UpsertFormatter;
30
31use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
32use super::encoder::bytes::BytesEncoder;
33use super::encoder::template::TemplateEncoder;
34use super::encoder::text::TextEncoder;
35use super::encoder::{
36    DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, TimeHandlingMode,
37    TimestamptzHandlingMode,
38};
39use super::redis::{
40    CHANNEL, CHANNEL_COLUMN, KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE,
41    REDIS_VALUE_TYPE_GEO, REDIS_VALUE_TYPE_PUBSUB, REDIS_VALUE_TYPE_STRING, VALUE_FORMAT,
42};
43use crate::sink::encoder::{
44    AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode,
45};
46
/// Transforms a `StreamChunk` into a sequence of key-value pairs according to a specific format,
/// for example append-only, upsert or debezium.
pub trait SinkFormatter {
    /// Type of the encoded message key.
    type K;
    /// Type of the encoded message value.
    type V;

    /// Formats `chunk` into a sequence of `(key, value)` messages.
    ///
    /// * Key may be None so that messages are partitioned using round-robin.
    ///   For example append-only without `primary_key` (aka `downstream_pk`) set.
    /// * Value may be None so that messages with same key are removed during log compaction.
    ///   For example debezium tombstone event.
    ///
    /// Failures are reported as `Err` items of the returned iterator rather than through a
    /// separate return value.
    #[expect(clippy::type_complexity)]
    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
}
63
/// Unwraps a `Result` inside a generator body.
///
/// * `tri!(expr)` evaluates to the `Ok` value. On `Err`, it yields that `Err` from the generator
///   and then returns `()`, which ends the generator.
/// * By contrast, `?` inside a generator returns the `Err` instead of yielding it.
#[macro_export]
macro_rules! tri {
    ($expr:expr) => {
        match $expr {
            Err(error) => {
                yield Err(error);
                return;
            }
            Ok(value) => value,
        }
    };
}
78
/// All supported sink formatter instantiations, one variant per accepted
/// `(format, key encode, value encode)` combination.
///
/// Variant names read as `<Format>[<KeyEncode>]<ValueEncode>`. When the key encode is absent from
/// the name, the first type parameter gives the default key encoder. For example,
/// `AppendOnlyProto` uses `JsonEncoder` for keys.
pub enum SinkFormatterImpl {
    // append-only
    AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
    AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
    AppendOnlyBytesJson(AppendOnlyFormatter<BytesEncoder, JsonEncoder>),
    AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
    AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
    AppendOnlyBytesAvro(AppendOnlyFormatter<BytesEncoder, AvroEncoder>),
    AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
    AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
    AppendOnlyBytesProto(AppendOnlyFormatter<BytesEncoder, ProtoEncoder>),
    AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
    AppendOnlyTextTemplate(AppendOnlyFormatter<TextEncoder, TemplateEncoder>),
    AppendOnlyBytesTemplate(AppendOnlyFormatter<BytesEncoder, TemplateEncoder>),
    // upsert
    UpsertJson(UpsertFormatter<JsonEncoder, JsonEncoder>),
    UpsertTextJson(UpsertFormatter<TextEncoder, JsonEncoder>),
    UpsertBytesJson(UpsertFormatter<BytesEncoder, JsonEncoder>),
    UpsertAvro(UpsertFormatter<AvroEncoder, AvroEncoder>),
    UpsertTextAvro(UpsertFormatter<TextEncoder, AvroEncoder>),
    UpsertBytesAvro(UpsertFormatter<BytesEncoder, AvroEncoder>),
    // `UpsertFormatter<ProtoEncoder, ProtoEncoder>` is intentionally left out
    // to avoid using `ProtoEncoder` as key:
    // <https://docs.confluent.io/platform/7.7/control-center/topics/schema.html#c3-schemas-best-practices-key-value-pairs>
    UpsertTextProto(UpsertFormatter<TextEncoder, ProtoEncoder>),
    UpsertBytesProto(UpsertFormatter<BytesEncoder, ProtoEncoder>),
    UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
    UpsertTextTemplate(UpsertFormatter<TextEncoder, TemplateEncoder>),
    UpsertBytesTemplate(UpsertFormatter<BytesEncoder, TemplateEncoder>),
    // debezium
    DebeziumJson(DebeziumJsonFormatter),
}
111
/// Inputs shared by every [`EncoderBuild`] implementation.
#[derive(Debug, Clone)]
pub struct EncoderParams<'a> {
    /// Sink format/encode description; its `options` map carries encoder-specific settings.
    format_desc: &'a SinkFormatDesc,
    /// Schema of the rows being sunk.
    schema: Schema,
    /// Database name. The JSON encoder joins it with `sink_from_name` as
    /// `<db_name>.<sink_from_name>` to form the Kafka Connect schema name.
    db_name: String,
    /// Name of the relation the sink reads from (see `db_name`).
    sink_from_name: String,
    /// Topic name, passed along when fetching Avro/Protobuf schemas.
    topic: &'a str,
}
120
/// Each encoder shall be able to be built from parameters.
///
/// This is not part of `RowEncoder` trait, because that one is about how an encoder completes its
/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
/// one is about how different encoders can be selected from a common SQL interface.
pub trait EncoderBuild: Sized {
    /// Pass `pk_indices: None` for value/payload and `Some` for key. Certain encoders build
    /// differently when used as key vs value.
    ///
    /// # Errors
    /// Implementations in this module report invalid or missing options as `SinkError::Config`.
    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self>;
}
131
132impl EncoderBuild for JsonEncoder {
133    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
134        let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?;
135        let jsonb_handling_mode = JsonbHandlingMode::from_options(&b.format_desc.options)?;
136        let encoder = JsonEncoder::new(
137            b.schema,
138            pk_indices,
139            DateHandlingMode::FromCe,
140            TimestampHandlingMode::Milli,
141            timestamptz_mode,
142            TimeHandlingMode::Milli,
143            jsonb_handling_mode,
144        );
145        let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") {
146            match s.to_lowercase().parse::<bool>() {
147                Ok(true) => {
148                    let kafka_connect = KafkaConnectParams {
149                        schema_name: format!("{}.{}", b.db_name, b.sink_from_name),
150                    };
151                    encoder.with_kafka_connect(kafka_connect)
152                }
153                Ok(false) => encoder,
154                _ => {
155                    return Err(SinkError::Config(anyhow!(
156                        "schemas.enable is expected to be `true` or `false`, got {s}",
157                    )));
158                }
159            }
160        } else {
161            encoder
162        };
163        Ok(encoder)
164    }
165}
166
167impl EncoderBuild for ProtoEncoder {
168    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
169        // TODO: better to be a compile-time assert
170        assert!(pk_indices.is_none());
171        // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet.
172        let (descriptor, sid) =
173            crate::schema::protobuf::fetch_descriptor(&b.format_desc.options, b.topic, None)
174                .await
175                .map_err(|e| SinkError::Config(anyhow!(e)))?;
176        let header = match sid {
177            None => ProtoHeader::None,
178            Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
179        };
180        ProtoEncoder::new(b.schema, None, descriptor, header)
181    }
182}
183
184fn ensure_only_one_pk<'a>(
185    data_type_name: &'a str,
186    params: &'a EncoderParams<'_>,
187    pk_indices: &'a Option<Vec<usize>>,
188) -> Result<(usize, &'a Field)> {
189    let Some(pk_indices) = pk_indices else {
190        return Err(SinkError::Config(anyhow!(
191            "{}Encoder requires primary key columns to be specified",
192            data_type_name
193        )));
194    };
195    if pk_indices.len() != 1 {
196        return Err(SinkError::Config(anyhow!(
197            "KEY ENCODE {} expects only one primary key, but got {}",
198            data_type_name,
199            pk_indices.len(),
200        )));
201    }
202
203    let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| {
204        SinkError::Config(anyhow!(
205            "The primary key column index {} is out of bounds in schema {:?}",
206            pk_indices[0],
207            params.schema
208        ))
209    })?;
210
211    Ok((pk_indices[0], schema_ref))
212}
213
214impl EncoderBuild for BytesEncoder {
215    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
216        let (pk_index, schema_ref) = ensure_only_one_pk("BYTES", &params, &pk_indices)?;
217        if let DataType::Bytea = schema_ref.data_type() {
218            Ok(BytesEncoder::new(params.schema, pk_index))
219        } else {
220            Err(SinkError::Config(anyhow!(
221                "The key encode is BYTES, but the primary key column {} has type {}",
222                schema_ref.name,
223                schema_ref.data_type
224            )))
225        }
226    }
227}
228
229impl EncoderBuild for TextEncoder {
230    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
231        let (pk_index, schema_ref) = ensure_only_one_pk("TEXT", &params, &pk_indices)?;
232        match &schema_ref.data_type() {
233            DataType::Varchar
234            | DataType::Boolean
235            | DataType::Int16
236            | DataType::Int32
237            | DataType::Int64
238            | DataType::Int256
239            | DataType::Serial => {}
240            _ => {
241                // why we don't allow float as text for key encode: https://github.com/risingwavelabs/risingwave/pull/16377#discussion_r1591864960
242                return Err(SinkError::Config(anyhow!(
243                    "The key encode is TEXT, but the primary key column {} has type {}. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, big int, serial or rw_int256.",
244                    schema_ref.name,
245                    schema_ref.data_type
246                )));
247            }
248        }
249
250        Ok(Self::new(params.schema, pk_index))
251    }
252}
253
254impl EncoderBuild for AvroEncoder {
255    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
256        use crate::schema::{SchemaLoader, SchemaVersion};
257
258        let loader = SchemaLoader::from_format_options(b.topic, &b.format_desc.options)
259            .await
260            .map_err(|e| SinkError::Config(anyhow!(e)))?;
261
262        let (schema_version, avro) = match pk_indices {
263            Some(_) => loader
264                .load_key_schema()
265                .await
266                .map_err(|e| SinkError::Config(anyhow!(e)))?,
267            None => loader
268                .load_val_schema()
269                .await
270                .map_err(|e| SinkError::Config(anyhow!(e)))?,
271        };
272        AvroEncoder::new(
273            b.schema,
274            pk_indices,
275            std::sync::Arc::new(avro),
276            match schema_version {
277                SchemaVersion::Confluent(x) => AvroHeader::ConfluentSchemaRegistry(x),
278                SchemaVersion::Glue(x) => AvroHeader::GlueSchemaRegistry(x),
279            },
280        )
281    }
282}
283
284impl EncoderBuild for TemplateEncoder {
285    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
286        let redis_value_type = b
287            .format_desc
288            .options
289            .get(REDIS_VALUE_TYPE)
290            .map_or(REDIS_VALUE_TYPE_STRING, |s| s.as_str());
291        match redis_value_type {
292            REDIS_VALUE_TYPE_STRING => {
293                let option_name = match pk_indices {
294                    Some(_) => KEY_FORMAT,
295                    None => VALUE_FORMAT,
296                };
297                let template = b.format_desc.options.get(option_name).ok_or_else(|| {
298                    SinkError::Config(anyhow!("Cannot find '{option_name}',please set it."))
299                })?;
300                Ok(TemplateEncoder::new_string(
301                    b.schema,
302                    pk_indices,
303                    template.clone(),
304                ))
305            }
306            REDIS_VALUE_TYPE_GEO => match pk_indices {
307                Some(_) => {
308                    let member_name = b.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
309                        SinkError::Config(anyhow!("Cannot find `{MEMBER_NAME}`,please set it."))
310                    })?;
311                    let template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
312                        SinkError::Config(anyhow!("Cannot find `{KEY_FORMAT}`,please set it."))
313                    })?;
314                    TemplateEncoder::new_geo_key(
315                        b.schema,
316                        pk_indices,
317                        member_name,
318                        template.clone(),
319                    )
320                }
321                None => {
322                    let lat_name = b.format_desc.options.get(LAT_NAME).ok_or_else(|| {
323                        SinkError::Config(anyhow!("Cannot find `{LAT_NAME}`, please set it."))
324                    })?;
325                    let lon_name = b.format_desc.options.get(LON_NAME).ok_or_else(|| {
326                        SinkError::Config(anyhow!("Cannot find `{LON_NAME}`,please set it."))
327                    })?;
328                    TemplateEncoder::new_geo_value(b.schema, pk_indices, lat_name, lon_name)
329                }
330            },
331            REDIS_VALUE_TYPE_PUBSUB => match pk_indices {
332                Some(_) => {
333                    let channel = b.format_desc.options.get(CHANNEL).cloned();
334                    let channel_column = b.format_desc.options.get(CHANNEL_COLUMN).cloned();
335                    if (channel.is_none() && channel_column.is_none())
336                        || (channel.is_some() && channel_column.is_some())
337                    {
338                        return Err(SinkError::Config(anyhow!(
339                            "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
340                        )));
341                    }
342                    TemplateEncoder::new_pubsub_key(b.schema, pk_indices, channel, channel_column)
343                }
344                None => {
345                    let template = b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
346                        SinkError::Config(anyhow!("Cannot find '{VALUE_FORMAT}',please set it."))
347                    })?;
348                    Ok(TemplateEncoder::new_string(
349                        b.schema,
350                        pk_indices,
351                        template.clone(),
352                    ))
353                }
354            },
355            _ => Err(SinkError::Config(anyhow!(
356                "The value type {} is not supported",
357                redis_value_type
358            ))),
359        }
360    }
361}
362
/// Inputs shared by every [`FormatterBuild`] implementation.
struct FormatterParams<'a> {
    /// Parameters forwarded to the key and value encoder builders.
    builder: EncoderParams<'a>,
    /// Primary-key column indices; passed as `Some(..)` when building a key encoder.
    pk_indices: Vec<usize>,
}
367
/// Each formatter shall be able to be built from parameters.
///
/// This is not part of `SinkFormatter` trait, because that is about how a formatter completes its
/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
/// one is about how different formatters can be selected from a common SQL interface.
trait FormatterBuild: Sized {
    /// Builds the formatter, including whatever key/value encoders it needs.
    async fn build(b: FormatterParams<'_>) -> Result<Self>;
}
376
377impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<KE, VE> {
378    async fn build(b: FormatterParams<'_>) -> Result<Self> {
379        let key_encoder = match b.pk_indices.is_empty() {
380            true => None,
381            false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?),
382        };
383        let val_encoder = VE::build(b.builder, None).await?;
384        Ok(AppendOnlyFormatter::new(key_encoder, val_encoder))
385    }
386}
387
388impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
389    async fn build(b: FormatterParams<'_>) -> Result<Self> {
390        let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices))
391            .await
392            .with_context(|| "Failed to build key encoder")?;
393        let val_encoder = VE::build(b.builder, None)
394            .await
395            .with_context(|| "Failed to build value encoder")?;
396        Ok(UpsertFormatter::new(key_encoder, val_encoder))
397    }
398}
399
400impl FormatterBuild for DebeziumJsonFormatter {
401    async fn build(b: FormatterParams<'_>) -> Result<Self> {
402        assert_eq!(b.builder.format_desc.encode, SinkEncode::Json);
403
404        Ok(DebeziumJsonFormatter::new(
405            b.builder.schema,
406            b.pk_indices,
407            b.builder.db_name,
408            b.builder.sink_from_name,
409            DebeziumAdapterOpts::default(),
410        ))
411    }
412}
413
impl SinkFormatterImpl {
    /// Builds the formatter matching the sink's `(format, encode, key_encode)` combination.
    ///
    /// * `format_desc`: format, value encode, optional key encode and encoder options.
    /// * `schema`: schema of the rows being sunk.
    /// * `pk_indices`: primary-key column indices in `schema`, used to build key encoders. For
    ///   append-only formats, an empty list means no key encoder is built.
    /// * `db_name` / `sink_from_name`: forwarded to builders that embed a schema name (JSON with
    ///   `schemas.enable`, Debezium JSON).
    /// * `topic`: forwarded to encoders that fetch a schema (Avro, Protobuf).
    ///
    /// # Errors
    /// Returns `SinkError::Config` for unsupported combinations, and propagates any error raised
    /// while building the selected key/value encoders.
    pub async fn new(
        format_desc: &SinkFormatDesc,
        schema: Schema,
        pk_indices: Vec<usize>,
        db_name: String,
        sink_from_name: String,
        topic: &str,
    ) -> Result<Self> {
        use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl};
        let p = FormatterParams {
            builder: EncoderParams {
                format_desc,
                schema,
                db_name,
                sink_from_name,
                topic,
            },
            pk_indices,
        };

        // When defining `SinkFormatterImpl` we already linked each variant (eg `AppendOnlyJson`) to
        // an instantiation (eg `AppendOnlyFormatter<JsonEncoder, JsonEncoder>`) that implements the
        // trait `FormatterBuild`.
        //
        // Here we just need to match a `(format, encode)` to a variant, and rustc shall be able to
        // find the corresponding instantiation.

        // However,
        //   `Impl::AppendOnlyJson(FormatterBuild::build(p).await?)`
        // fails to be inferred without the following dummy wrapper.
        async fn build<T: FormatterBuild>(p: FormatterParams<'_>) -> Result<T> {
            T::build(p).await
        }

        // Arm order matters. The supported combinations are listed first, and the broader
        // rejection patterns at the end overlap some of them: for example,
        // `(F::AppendOnly, E::Avro, _)` would shadow the supported append-only Avro arms if
        // it came first.
        Ok(
            match (
                &format_desc.format,
                &format_desc.encode,
                &format_desc.key_encode,
            ) {
                // append-only
                (F::AppendOnly, E::Json, Some(E::Text)) => {
                    Impl::AppendOnlyTextJson(build(p).await?)
                }
                (F::AppendOnly, E::Json, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesJson(build(p).await?)
                }
                (F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?),
                (F::AppendOnly, E::Avro, Some(E::Text)) => {
                    Impl::AppendOnlyTextAvro(build(p).await?)
                }
                (F::AppendOnly, E::Avro, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesAvro(build(p).await?)
                }
                (F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?),
                (F::AppendOnly, E::Protobuf, Some(E::Text)) => {
                    Impl::AppendOnlyTextProto(build(p).await?)
                }
                (F::AppendOnly, E::Protobuf, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesProto(build(p).await?)
                }
                (F::AppendOnly, E::Protobuf, None) => Impl::AppendOnlyProto(build(p).await?),
                (F::AppendOnly, E::Template, Some(E::Text)) => {
                    Impl::AppendOnlyTextTemplate(build(p).await?)
                }
                (F::AppendOnly, E::Template, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesTemplate(build(p).await?)
                }
                (F::AppendOnly, E::Template, None) => Impl::AppendOnlyTemplate(build(p).await?),
                // upsert (Protobuf has no default-key variant, see `SinkFormatterImpl`)
                (F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?),
                (F::Upsert, E::Json, Some(E::Bytes)) => {
                    Impl::UpsertBytesJson(build(p).await?)
                }
                (F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?),
                (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?),
                (F::Upsert, E::Avro, Some(E::Bytes)) => {
                    Impl::UpsertBytesAvro(build(p).await?)
                }
                (F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?),
                (F::Upsert, E::Protobuf, Some(E::Text)) => Impl::UpsertTextProto(build(p).await?),
                (F::Upsert, E::Protobuf, Some(E::Bytes)) => {
                    Impl::UpsertBytesProto(build(p).await?)
                }
                (F::Upsert, E::Template, Some(E::Text)) => {
                    Impl::UpsertTextTemplate(build(p).await?)
                }
                (F::Upsert, E::Template, Some(E::Bytes)) => {
                    Impl::UpsertBytesTemplate(build(p).await?)
                }
                (F::Upsert, E::Template, None) => Impl::UpsertTemplate(build(p).await?),
                // debezium: JSON only, with no explicit key encode
                (F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?),
                // rejected combinations
                (F::AppendOnly | F::Upsert, E::Text, _) => {
                    return Err(SinkError::Config(anyhow!(
                        "ENCODE TEXT is only valid as key encode."
                    )));
                }
                (F::AppendOnly, E::Avro, _)
                | (F::Upsert, E::Protobuf, _)
                | (F::Debezium, E::Json, Some(_))
                | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text, _)
                | (F::AppendOnly, E::Bytes, _)
                | (F::Upsert, E::Bytes, _)
                | (F::Debezium, E::Bytes, _)
                | (_, E::Parquet, _)
                | (_, _, Some(E::Parquet))
                | (F::AppendOnly | F::Upsert, _, Some(E::Template) | Some(E::Json) | Some(E::Avro) | Some(E::Protobuf)) // reject other encode as key encode
                => {
                    return Err(SinkError::Config(anyhow!(
                        "sink format/encode/key_encode unsupported: {:?} {:?} {:?}",
                        format_desc.format,
                        format_desc.encode,
                        format_desc.key_encode
                    )));
                }
            },
        )
    }
}
532
/// Runs `$body` with `$name` bound to the formatter held by whichever `SinkFormatterImpl`
/// variant `$impl` is.
///
/// Every variant is handled, so this is for callers whose message key may be either bytes or a
/// string. `$body` is expanded once per arm, so it can be written once even though each variant
/// wraps a different concrete formatter type.
///
/// The arms name `SinkFormatterImpl` unqualified, so the caller must have it in scope.
#[macro_export]
macro_rules! dispatch_sink_formatter_impl {
    ($impl:expr, $name:ident, $body:expr) => {
        match $impl {
            // append-only
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesJson($name) => $body,
            SinkFormatterImpl::AppendOnlyAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesTemplate($name) => $body,
            // upsert
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertBytesJson($name) => $body,
            SinkFormatterImpl::UpsertAvro($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertBytesAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertBytesProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,
            SinkFormatterImpl::UpsertBytesTemplate($name) => $body,
            // debezium
            SinkFormatterImpl::DebeziumJson($name) => $body,
        }
    };
}
570
/// Runs `$body` with `$name` bound to the formatter held by `$impl`, for contexts where the
/// message key must be a string.
///
/// Variants whose key encoder is `BytesEncoder` or `AvroEncoder` (for example `UpsertBytesJson`,
/// `AppendOnlyAvro`, `UpsertAvro`) cannot produce string keys. They are expected never to reach
/// such contexts and hit `unreachable!()`. All other variants expand `$body`, once per arm.
///
/// The arms name `SinkFormatterImpl` unqualified, so the caller must have it in scope.
#[macro_export]
macro_rules! dispatch_sink_formatter_str_key_impl {
    ($impl:expr, $name:ident, $body:expr) => {
        match $impl {
            // append-only with string-capable keys
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
            // upsert with string-capable keys
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,
            // debezium
            SinkFormatterImpl::DebeziumJson($name) => $body,
            // bytes / avro keys never reach string-key contexts
            SinkFormatterImpl::AppendOnlyBytesJson(_)
            | SinkFormatterImpl::AppendOnlyAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesProto(_)
            | SinkFormatterImpl::AppendOnlyBytesTemplate(_)
            | SinkFormatterImpl::UpsertBytesJson(_)
            | SinkFormatterImpl::UpsertAvro(_)
            | SinkFormatterImpl::UpsertBytesAvro(_)
            | SinkFormatterImpl::UpsertBytesProto(_)
            | SinkFormatterImpl::UpsertBytesTemplate(_) => unreachable!(),
        }
    };
}