risingwave_connector/sink/formatter/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use risingwave_common::array::StreamChunk;
17use risingwave_common::catalog::Field;
18
19use crate::sink::redis::REDIS_VALUE_TYPE_STREAM;
20use crate::sink::{Result, SinkError};
21
22mod append_only;
23mod debezium_json;
24mod upsert;
25
26pub use append_only::AppendOnlyFormatter;
27pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter};
28use risingwave_common::catalog::Schema;
29use risingwave_common::types::DataType;
30pub use upsert::UpsertFormatter;
31
32use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
33use super::encoder::bytes::BytesEncoder;
34use super::encoder::template::TemplateEncoder;
35use super::encoder::text::TextEncoder;
36use super::encoder::{
37    DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, TimeHandlingMode,
38    TimestamptzHandlingMode,
39};
40use super::redis::{
41    CHANNEL, CHANNEL_COLUMN, KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE,
42    REDIS_VALUE_TYPE_GEO, REDIS_VALUE_TYPE_PUBSUB, REDIS_VALUE_TYPE_STRING, STREAM, STREAM_COLUMN,
43    VALUE_FORMAT,
44};
45use crate::sink::encoder::{
46    AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode,
47};
48
/// Transforms a `StreamChunk` into a sequence of key-value pairs according to a specific format,
/// for example append-only, upsert or debezium.
pub trait SinkFormatter {
    /// Type of the key emitted for each formatted message.
    type K;
    /// Type of the value (payload) emitted for each formatted message.
    type V;

    /// Formats `chunk` into an iterator of fallible `(key, value)` pairs.
    ///
    /// * Key may be None so that messages are partitioned using round-robin.
    ///   For example append-only without `primary_key` (aka `downstream_pk`) set.
    /// * Value may be None so that messages with same key are removed during log compaction.
    ///   For example debezium tombstone event.
    #[expect(clippy::type_complexity)]
    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
}
65
/// Unwraps a `Result` inside a generator body.
///
/// * On `Ok` the macro evaluates to the wrapped value.
/// * On `Err` the macro yields the `Err` and then returns `()`, whereas `?` in a generator
///   would return the `Err` instead of yielding it.
#[macro_export]
macro_rules! tri {
    ($expr:expr) => {
        match $expr {
            Err(failure) => {
                yield Err(failure);
                return;
            }
            Ok(value) => value,
        }
    };
}
80
/// All supported formatter instantiations, one variant per combination.
///
/// Each variant wraps a concrete formatter type. For `AppendOnlyFormatter<KE, VE>` and
/// `UpsertFormatter<KE, VE>` the first type parameter is the key encoder and the second is the
/// value encoder.
pub enum SinkFormatterImpl {
    // append-only
    AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
    AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
    AppendOnlyBytesJson(AppendOnlyFormatter<BytesEncoder, JsonEncoder>),
    AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
    AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
    AppendOnlyBytesAvro(AppendOnlyFormatter<BytesEncoder, AvroEncoder>),
    AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
    AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
    AppendOnlyBytesProto(AppendOnlyFormatter<BytesEncoder, ProtoEncoder>),
    AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
    AppendOnlyTextTemplate(AppendOnlyFormatter<TextEncoder, TemplateEncoder>),
    AppendOnlyBytesTemplate(AppendOnlyFormatter<BytesEncoder, TemplateEncoder>),
    AppendOnlyBytes(AppendOnlyFormatter<BytesEncoder, BytesEncoder>),
    // upsert
    UpsertJson(UpsertFormatter<JsonEncoder, JsonEncoder>),
    UpsertTextJson(UpsertFormatter<TextEncoder, JsonEncoder>),
    UpsertBytesJson(UpsertFormatter<BytesEncoder, JsonEncoder>),
    UpsertAvro(UpsertFormatter<AvroEncoder, AvroEncoder>),
    UpsertTextAvro(UpsertFormatter<TextEncoder, AvroEncoder>),
    UpsertBytesAvro(UpsertFormatter<BytesEncoder, AvroEncoder>),
    // `UpsertFormatter<ProtoEncoder, ProtoEncoder>` is intentionally left out
    // to avoid using `ProtoEncoder` as key:
    // <https://docs.confluent.io/platform/7.7/control-center/topics/schema.html#c3-schemas-best-practices-key-value-pairs>
    UpsertTextProto(UpsertFormatter<TextEncoder, ProtoEncoder>),
    UpsertBytesProto(UpsertFormatter<BytesEncoder, ProtoEncoder>),
    UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
    UpsertTextTemplate(UpsertFormatter<TextEncoder, TemplateEncoder>),
    UpsertBytesTemplate(UpsertFormatter<BytesEncoder, TemplateEncoder>),
    // debezium
    DebeziumJson(DebeziumJsonFormatter),
}
114
/// Common inputs handed to every `EncoderBuild::build` implementation.
#[derive(Debug, Clone)]
pub struct EncoderParams<'a> {
    // Sink format description; encoders read their settings from `format_desc.options`.
    format_desc: &'a SinkFormatDesc,
    // Schema passed on to the encoder constructors.
    schema: Schema,
    // Combined with `sink_from_name` as `{db_name}.{sink_from_name}` for the Kafka Connect
    // schema name of the JSON encoder; also forwarded to the debezium formatter.
    db_name: String,
    sink_from_name: String,
    // Passed to the protobuf descriptor fetch and to the Avro schema loader.
    topic: &'a str,
}
123
/// Each encoder shall be able to be built from parameters.
///
/// This is not part of `RowEncoder` trait, because that one is about how an encoder completes its
/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
/// one is about how different encoders can be selected from a common SQL interface.
pub trait EncoderBuild: Sized {
    /// Builds the encoder from the shared `params`.
    ///
    /// Pass `pk_indices: None` for value/payload and `Some` for key. Certain encoders build
    /// differently when used as key vs value.
    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self>;
}
134
135impl EncoderBuild for JsonEncoder {
136    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
137        let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?;
138        let jsonb_handling_mode = JsonbHandlingMode::from_options(&b.format_desc.options)?;
139        let encoder = JsonEncoder::new(
140            b.schema,
141            pk_indices,
142            DateHandlingMode::FromCe,
143            TimestampHandlingMode::Milli,
144            timestamptz_mode,
145            TimeHandlingMode::Milli,
146            jsonb_handling_mode,
147        );
148        let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") {
149            match s.to_lowercase().parse::<bool>() {
150                Ok(true) => {
151                    let kafka_connect = KafkaConnectParams {
152                        schema_name: format!("{}.{}", b.db_name, b.sink_from_name),
153                    };
154                    encoder.with_kafka_connect(kafka_connect)
155                }
156                Ok(false) => encoder,
157                _ => {
158                    return Err(SinkError::Config(anyhow!(
159                        "schemas.enable is expected to be `true` or `false`, got {s}",
160                    )));
161                }
162            }
163        } else {
164            encoder
165        };
166        Ok(encoder)
167    }
168}
169
170impl EncoderBuild for ProtoEncoder {
171    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
172        // TODO: better to be a compile-time assert
173        assert!(pk_indices.is_none());
174        // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet.
175        let (descriptor, sid) =
176            crate::schema::protobuf::fetch_descriptor(&b.format_desc.options, b.topic, None)
177                .await
178                .map_err(|e| SinkError::Config(anyhow!(e)))?;
179        let header = match sid {
180            None => ProtoHeader::None,
181            Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
182        };
183        ProtoEncoder::new(b.schema, None, descriptor, header)
184    }
185}
186
187fn ensure_only_one_pk<'a>(
188    data_type_name: &'a str,
189    params: &'a EncoderParams<'_>,
190    pk_indices: &'a Option<Vec<usize>>,
191) -> Result<(usize, &'a Field)> {
192    let Some(pk_indices) = pk_indices else {
193        return Err(SinkError::Config(anyhow!(
194            "{}Encoder requires primary key columns to be specified",
195            data_type_name
196        )));
197    };
198    if pk_indices.len() != 1 {
199        return Err(SinkError::Config(anyhow!(
200            "KEY ENCODE {} expects only one primary key, but got {}",
201            data_type_name,
202            pk_indices.len(),
203        )));
204    }
205
206    let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| {
207        SinkError::Config(anyhow!(
208            "The primary key column index {} is out of bounds in schema {:?}",
209            pk_indices[0],
210            params.schema
211        ))
212    })?;
213
214    Ok((pk_indices[0], schema_ref))
215}
216
217impl EncoderBuild for BytesEncoder {
218    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
219        match pk_indices {
220            // This is being used as a key encoder
221            Some(_) => {
222                let (pk_index, schema_ref) = ensure_only_one_pk("BYTES", &params, &pk_indices)?;
223                if let DataType::Bytea = schema_ref.data_type() {
224                    Ok(BytesEncoder::new(params.schema, pk_index))
225                } else {
226                    Err(SinkError::Config(anyhow!(
227                        "The key encode is BYTES, but the primary key column {} has type {}",
228                        schema_ref.name,
229                        schema_ref.data_type
230                    )))
231                }
232            }
233            // This is being used as a value encoder
234            None => {
235                // Ensure the schema has exactly one column and it's of type BYTEA
236                if params.schema.len() != 1 {
237                    return Err(SinkError::Config(anyhow!(
238                        "ENCODE BYTES requires exactly one column, got {} columns",
239                        params.schema.len()
240                    )));
241                }
242
243                let field = &params.schema.fields[0];
244                if let DataType::Bytea = field.data_type {
245                    Ok(BytesEncoder::new(params.schema, 0))
246                } else {
247                    Err(SinkError::Config(anyhow!(
248                        "ENCODE BYTES requires the column to be of type BYTEA, but got type {}",
249                        field.data_type
250                    )))
251                }
252            }
253        }
254    }
255}
256
257impl EncoderBuild for TextEncoder {
258    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
259        let (pk_index, schema_ref) = ensure_only_one_pk("TEXT", &params, &pk_indices)?;
260        match &schema_ref.data_type() {
261            DataType::Varchar
262            | DataType::Boolean
263            | DataType::Int16
264            | DataType::Int32
265            | DataType::Int64
266            | DataType::Int256
267            | DataType::Serial => {}
268            _ => {
269                // why we don't allow float as text for key encode: https://github.com/risingwavelabs/risingwave/pull/16377#discussion_r1591864960
270                return Err(SinkError::Config(anyhow!(
271                    "The key encode is TEXT, but the primary key column {} has type {}. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, big int, serial or rw_int256.",
272                    schema_ref.name,
273                    schema_ref.data_type
274                )));
275            }
276        }
277
278        Ok(Self::new(params.schema, pk_index))
279    }
280}
281
282impl EncoderBuild for AvroEncoder {
283    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
284        use crate::schema::{SchemaLoader, SchemaVersion};
285
286        let loader = SchemaLoader::from_format_options(b.topic, &b.format_desc.options)
287            .await
288            .map_err(|e| SinkError::Config(anyhow!(e)))?;
289
290        let (schema_version, avro) = match pk_indices {
291            Some(_) => loader
292                .load_key_schema()
293                .await
294                .map_err(|e| SinkError::Config(anyhow!(e)))?,
295            None => loader
296                .load_val_schema()
297                .await
298                .map_err(|e| SinkError::Config(anyhow!(e)))?,
299        };
300        AvroEncoder::new(
301            b.schema,
302            pk_indices,
303            std::sync::Arc::new(avro),
304            match schema_version {
305                SchemaVersion::Confluent(x) => AvroHeader::ConfluentSchemaRegistry(x),
306                SchemaVersion::Glue(x) => AvroHeader::GlueSchemaRegistry(x),
307            },
308        )
309    }
310}
311
312impl EncoderBuild for TemplateEncoder {
313    async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
314        let redis_value_type = b
315            .format_desc
316            .options
317            .get(REDIS_VALUE_TYPE)
318            .map_or(REDIS_VALUE_TYPE_STRING, |s| s.as_str());
319        match redis_value_type {
320            REDIS_VALUE_TYPE_STRING => {
321                let option_name = match pk_indices {
322                    Some(_) => KEY_FORMAT,
323                    None => VALUE_FORMAT,
324                };
325                let template = b.format_desc.options.get(option_name).ok_or_else(|| {
326                    SinkError::Config(anyhow!("Cannot find '{option_name}',please set it."))
327                })?;
328                Ok(TemplateEncoder::new_string(
329                    b.schema,
330                    pk_indices,
331                    template.clone(),
332                ))
333            }
334            REDIS_VALUE_TYPE_GEO => match pk_indices {
335                Some(_) => {
336                    let member_name = b.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
337                        SinkError::Config(anyhow!("Cannot find `{MEMBER_NAME}`,please set it."))
338                    })?;
339                    let template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
340                        SinkError::Config(anyhow!("Cannot find `{KEY_FORMAT}`,please set it."))
341                    })?;
342                    TemplateEncoder::new_geo_key(
343                        b.schema,
344                        pk_indices,
345                        member_name,
346                        template.clone(),
347                    )
348                }
349                None => {
350                    let lat_name = b.format_desc.options.get(LAT_NAME).ok_or_else(|| {
351                        SinkError::Config(anyhow!("Cannot find `{LAT_NAME}`, please set it."))
352                    })?;
353                    let lon_name = b.format_desc.options.get(LON_NAME).ok_or_else(|| {
354                        SinkError::Config(anyhow!("Cannot find `{LON_NAME}`,please set it."))
355                    })?;
356                    TemplateEncoder::new_geo_value(b.schema, pk_indices, lat_name, lon_name)
357                }
358            },
359            REDIS_VALUE_TYPE_PUBSUB => match pk_indices {
360                Some(_) => {
361                    let channel = b.format_desc.options.get(CHANNEL).cloned();
362                    let channel_column = b.format_desc.options.get(CHANNEL_COLUMN).cloned();
363                    if (channel.is_none() && channel_column.is_none())
364                        || (channel.is_some() && channel_column.is_some())
365                    {
366                        return Err(SinkError::Config(anyhow!(
367                            "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
368                        )));
369                    }
370                    TemplateEncoder::new_pubsub_stream_key(
371                        b.schema,
372                        pk_indices,
373                        channel,
374                        channel_column,
375                    )
376                }
377                None => {
378                    let template = b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
379                        SinkError::Config(anyhow!("Cannot find '{VALUE_FORMAT}',please set it."))
380                    })?;
381                    Ok(TemplateEncoder::new_string(
382                        b.schema,
383                        pk_indices,
384                        template.clone(),
385                    ))
386                }
387            },
388            REDIS_VALUE_TYPE_STREAM => match pk_indices {
389                Some(_) => {
390                    let stream = b.format_desc.options.get(STREAM).cloned();
391                    let stream_column = b.format_desc.options.get(STREAM_COLUMN).cloned();
392                    if (stream.is_none() && stream_column.is_none())
393                        || (stream.is_some() && stream_column.is_some())
394                    {
395                        return Err(SinkError::Config(anyhow!(
396                            "`{STREAM}` and `{STREAM_COLUMN}` only one can be set"
397                        )));
398                    }
399                    TemplateEncoder::new_pubsub_stream_key(
400                        b.schema,
401                        pk_indices,
402                        stream,
403                        stream_column,
404                    )
405                }
406                None => {
407                    let value_template =
408                        b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
409                            SinkError::Config(anyhow!(
410                                "Cannot find '{VALUE_FORMAT}',please set it."
411                            ))
412                        })?;
413                    let key_template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
414                        SinkError::Config(anyhow!("Cannot find '{KEY_FORMAT}',please set it."))
415                    })?;
416
417                    Ok(TemplateEncoder::new_stream_value(
418                        b.schema,
419                        pk_indices,
420                        key_template.clone(),
421                        value_template.clone(),
422                    ))
423                }
424            },
425            _ => Err(SinkError::Config(anyhow!(
426                "The value type {} is not supported",
427                redis_value_type
428            ))),
429        }
430    }
431}
432
/// Inputs handed to every `FormatterBuild::build` implementation.
struct FormatterParams<'a> {
    // Parameters forwarded to the key and value encoder builders.
    builder: EncoderParams<'a>,
    // Primary key column indices; passed as `Some(..)` when building a key encoder.
    pk_indices: Vec<usize>,
}
437
/// Each formatter shall be able to be built from parameters.
///
/// This is not part of `SinkFormatter` trait, because that is about how a formatter completes its
/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
/// one is about how different formatters can be selected from a common SQL interface.
trait FormatterBuild: Sized {
    /// Builds the formatter from the encoder parameters and primary key indices in `b`.
    async fn build(b: FormatterParams<'_>) -> Result<Self>;
}
446
447impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<KE, VE> {
448    async fn build(b: FormatterParams<'_>) -> Result<Self> {
449        let key_encoder = match b.pk_indices.is_empty() {
450            true => None,
451            false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?),
452        };
453        let val_encoder = VE::build(b.builder, None).await?;
454        Ok(AppendOnlyFormatter::new(key_encoder, val_encoder))
455    }
456}
457
458impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
459    async fn build(b: FormatterParams<'_>) -> Result<Self> {
460        let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices))
461            .await
462            .with_context(|| "Failed to build key encoder")?;
463        let val_encoder = VE::build(b.builder, None)
464            .await
465            .with_context(|| "Failed to build value encoder")?;
466        Ok(UpsertFormatter::new(key_encoder, val_encoder))
467    }
468}
469
470impl FormatterBuild for DebeziumJsonFormatter {
471    async fn build(b: FormatterParams<'_>) -> Result<Self> {
472        assert_eq!(b.builder.format_desc.encode, SinkEncode::Json);
473
474        Ok(DebeziumJsonFormatter::new(
475            b.builder.schema,
476            b.pk_indices,
477            b.builder.db_name,
478            b.builder.sink_from_name,
479            DebeziumAdapterOpts::default(),
480        ))
481    }
482}
483
484impl SinkFormatterImpl {
485    pub async fn new(
486        format_desc: &SinkFormatDesc,
487        schema: Schema,
488        pk_indices: Vec<usize>,
489        db_name: String,
490        sink_from_name: String,
491        topic: &str,
492    ) -> Result<Self> {
493        use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl};
494        let p = FormatterParams {
495            builder: EncoderParams {
496                format_desc,
497                schema,
498                db_name,
499                sink_from_name,
500                topic,
501            },
502            pk_indices,
503        };
504
505        // When defining `SinkFormatterImpl` we already linked each variant (eg `AppendOnlyJson`) to
506        // an instantiation (eg `AppendOnlyFormatter<JsonEncoder, JsonEncoder>`) that implements the
507        // trait `FormatterBuild`.
508        //
509        // Here we just need to match a `(format, encode)` to a variant, and rustc shall be able to
510        // find the corresponding instantiation.
511
512        // However,
513        //   `Impl::AppendOnlyJson(FormatterBuild::build(p).await?)`
514        // fails to be inferred without the following dummy wrapper.
515        async fn build<T: FormatterBuild>(p: FormatterParams<'_>) -> Result<T> {
516            T::build(p).await
517        }
518
519        Ok(
520            match (
521                &format_desc.format,
522                &format_desc.encode,
523                &format_desc.key_encode,
524            ) {
525                (F::AppendOnly, E::Json, Some(E::Text)) => {
526                    Impl::AppendOnlyTextJson(build(p).await?)
527                }
528                (F::AppendOnly, E::Json, Some(E::Bytes)) => {
529                    Impl::AppendOnlyBytesJson(build(p).await?)
530                }
531                (F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?),
532                (F::AppendOnly, E::Avro, Some(E::Text)) => {
533                    Impl::AppendOnlyTextAvro(build(p).await?)
534                }
535                (F::AppendOnly, E::Avro, Some(E::Bytes)) => {
536                    Impl::AppendOnlyBytesAvro(build(p).await?)
537                }
538                (F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?),
539                (F::AppendOnly, E::Protobuf, Some(E::Text)) => {
540                    Impl::AppendOnlyTextProto(build(p).await?)
541                }
542                (F::AppendOnly, E::Protobuf, Some(E::Bytes)) => {
543                    Impl::AppendOnlyBytesProto(build(p).await?)
544                }
545                (F::AppendOnly, E::Protobuf, None) => Impl::AppendOnlyProto(build(p).await?),
546                (F::AppendOnly, E::Template, Some(E::Text)) => {
547                    Impl::AppendOnlyTextTemplate(build(p).await?)
548                }
549                (F::AppendOnly, E::Template, Some(E::Bytes)) => {
550                    Impl::AppendOnlyBytesTemplate(build(p).await?)
551                }
552                (F::AppendOnly, E::Template, None) => Impl::AppendOnlyTemplate(build(p).await?),
553                (F::AppendOnly, E::Bytes, None) => Impl::AppendOnlyBytes(build(p).await?),
554                (F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?),
555                (F::Upsert, E::Json, Some(E::Bytes)) => {
556                    Impl::UpsertBytesJson(build(p).await?)
557                }
558                (F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?),
559                (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?),
560                (F::Upsert, E::Avro, Some(E::Bytes)) => {
561                    Impl::UpsertBytesAvro(build(p).await?)
562                }
563                (F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?),
564                (F::Upsert, E::Protobuf, Some(E::Text)) => Impl::UpsertTextProto(build(p).await?),
565                (F::Upsert, E::Protobuf, Some(E::Bytes)) => {
566                    Impl::UpsertBytesProto(build(p).await?)
567                }
568                (F::Upsert, E::Template, Some(E::Text)) => {
569                    Impl::UpsertTextTemplate(build(p).await?)
570                }
571                (F::Upsert, E::Template, Some(E::Bytes)) => {
572                    Impl::UpsertBytesTemplate(build(p).await?)
573                }
574                (F::Upsert, E::Template, None) => Impl::UpsertTemplate(build(p).await?),
575                (F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?),
576                (F::AppendOnly | F::Upsert, E::Text, _) => {
577                    return Err(SinkError::Config(anyhow!(
578                        "ENCODE TEXT is only valid as key encode."
579                    )));
580                }
581                (F::AppendOnly, E::Avro, _)
582                | (F::Upsert, E::Protobuf, _)
583                | (F::Upsert, E::Bytes, _)
584                | (F::Debezium, E::Json, Some(_))
585                | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text | E::Bytes, _)
586                | (_, E::Parquet, _)
587                | (_, _, Some(E::Parquet))
588                | (F::AppendOnly, E::Bytes, Some(_))
589                | (F::AppendOnly | F::Upsert, _, Some(E::Template) | Some(E::Json) | Some(E::Avro) | Some(E::Protobuf)) // reject other encode as key encode
590                => {
591                    return Err(SinkError::Config(anyhow!(
592                        "sink format/encode/key_encode unsupported: {:?} {:?} {:?}",
593                        format_desc.format,
594                        format_desc.encode,
595                        format_desc.key_encode
596                    )));
597                }
598            },
599        )
600    }
601}
602
/// Dispatches over every `SinkFormatterImpl` variant.
/// Used when the message key can be either bytes or string.
///
/// Matches on the formatter implementation (`$impl`), binds the wrapped formatter to `$name`,
/// and evaluates `$body` with that binding in scope.
#[macro_export]
macro_rules! dispatch_sink_formatter_impl {
    ($impl:expr, $name:ident, $body:expr) => {
        match $impl {
            // append-only
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesJson($name) => $body,
            SinkFormatterImpl::AppendOnlyAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyBytes($name) => $body,

            // upsert
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertBytesJson($name) => $body,
            SinkFormatterImpl::UpsertAvro($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertBytesAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertBytesProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,
            SinkFormatterImpl::UpsertBytesTemplate($name) => $body,

            // debezium
            SinkFormatterImpl::DebeziumJson($name) => $body,
        }
    };
}
641
/// Dispatches over the `SinkFormatterImpl` variants that are valid where the message key must
/// be a string.
///
/// Works like `dispatch_sink_formatter_impl`: the wrapped formatter is bound to `$name` and
/// `$body` is evaluated with it. The variants listed in the last arm (those with an Avro or
/// bytes key encoder) hit `unreachable!()` instead, since they should never occur in contexts
/// requiring string keys. An optional trailing attribute is applied to the generated `match`.
#[macro_export]
macro_rules! dispatch_sink_formatter_str_key_impl {
    ($impl:expr, $name:ident, $body:expr $(,$attr:meta)?) => {
        $(#[$attr])?
        match $impl {
            // append-only
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,

            // upsert
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,

            // debezium
            SinkFormatterImpl::DebeziumJson($name) => $body,

            // never expected where a string key is required
            SinkFormatterImpl::AppendOnlyBytesJson(_)
            | SinkFormatterImpl::AppendOnlyAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesProto(_)
            | SinkFormatterImpl::AppendOnlyBytesTemplate(_)
            | SinkFormatterImpl::AppendOnlyBytes(_)
            | SinkFormatterImpl::UpsertBytesJson(_)
            | SinkFormatterImpl::UpsertAvro(_)
            | SinkFormatterImpl::UpsertBytesAvro(_)
            | SinkFormatterImpl::UpsertBytesProto(_)
            | SinkFormatterImpl::UpsertBytesTemplate(_) => unreachable!(),
        }
    };
}