1use anyhow::{Context, anyhow};
16use risingwave_common::array::StreamChunk;
17use risingwave_common::catalog::Field;
18
19use crate::sink::redis::REDIS_VALUE_TYPE_STREAM;
20use crate::sink::{Result, SinkError};
21
22mod append_only;
23mod debezium_json;
24mod upsert;
25
26pub use append_only::AppendOnlyFormatter;
27pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter};
28use risingwave_common::catalog::Schema;
29use risingwave_common::types::DataType;
30pub use upsert::UpsertFormatter;
31
32use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
33use super::encoder::bytes::BytesEncoder;
34use super::encoder::template::TemplateEncoder;
35use super::encoder::text::TextEncoder;
36use super::encoder::{
37 DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, TimeHandlingMode,
38 TimestamptzHandlingMode,
39};
40use super::redis::{
41 CHANNEL, CHANNEL_COLUMN, KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE,
42 REDIS_VALUE_TYPE_GEO, REDIS_VALUE_TYPE_PUBSUB, REDIS_VALUE_TYPE_STRING, STREAM, STREAM_COLUMN,
43 VALUE_FORMAT,
44};
45use crate::sink::encoder::{
46 AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode,
47};
48
49pub trait SinkFormatter {
52 type K;
53 type V;
54
55 #[expect(clippy::type_complexity)]
60 fn format_chunk(
61 &self,
62 chunk: &StreamChunk,
63 ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
64}
65
/// `?`-like helper for bodies that use `yield`: evaluates to the `Ok` payload,
/// or yields the error as `Err(..)` and returns from the enclosing body.
#[macro_export]
macro_rules! tri {
    ($fallible:expr) => {
        match $fallible {
            Err(failure) => {
                yield Err(failure);
                return;
            }
            Ok(unwrapped) => unwrapped,
        }
    };
}
80
81pub enum SinkFormatterImpl {
82 AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
84 AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
85 AppendOnlyBytesJson(AppendOnlyFormatter<BytesEncoder, JsonEncoder>),
86 AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
87 AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
88 AppendOnlyBytesAvro(AppendOnlyFormatter<BytesEncoder, AvroEncoder>),
89 AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
90 AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
91 AppendOnlyBytesProto(AppendOnlyFormatter<BytesEncoder, ProtoEncoder>),
92 AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
93 AppendOnlyTextTemplate(AppendOnlyFormatter<TextEncoder, TemplateEncoder>),
94 AppendOnlyBytesTemplate(AppendOnlyFormatter<BytesEncoder, TemplateEncoder>),
95 UpsertJson(UpsertFormatter<JsonEncoder, JsonEncoder>),
97 UpsertTextJson(UpsertFormatter<TextEncoder, JsonEncoder>),
98 UpsertBytesJson(UpsertFormatter<BytesEncoder, JsonEncoder>),
99 UpsertAvro(UpsertFormatter<AvroEncoder, AvroEncoder>),
100 UpsertTextAvro(UpsertFormatter<TextEncoder, AvroEncoder>),
101 UpsertBytesAvro(UpsertFormatter<BytesEncoder, AvroEncoder>),
102 UpsertTextProto(UpsertFormatter<TextEncoder, ProtoEncoder>),
106 UpsertBytesProto(UpsertFormatter<BytesEncoder, ProtoEncoder>),
107 UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
108 UpsertTextTemplate(UpsertFormatter<TextEncoder, TemplateEncoder>),
109 UpsertBytesTemplate(UpsertFormatter<BytesEncoder, TemplateEncoder>),
110 DebeziumJson(DebeziumJsonFormatter),
112}
113
114#[derive(Debug, Clone)]
115pub struct EncoderParams<'a> {
116 format_desc: &'a SinkFormatDesc,
117 schema: Schema,
118 db_name: String,
119 sink_from_name: String,
120 topic: &'a str,
121}
122
123pub trait EncoderBuild: Sized {
129 async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self>;
132}
133
134impl EncoderBuild for JsonEncoder {
135 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
136 let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?;
137 let jsonb_handling_mode = JsonbHandlingMode::from_options(&b.format_desc.options)?;
138 let encoder = JsonEncoder::new(
139 b.schema,
140 pk_indices,
141 DateHandlingMode::FromCe,
142 TimestampHandlingMode::Milli,
143 timestamptz_mode,
144 TimeHandlingMode::Milli,
145 jsonb_handling_mode,
146 );
147 let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") {
148 match s.to_lowercase().parse::<bool>() {
149 Ok(true) => {
150 let kafka_connect = KafkaConnectParams {
151 schema_name: format!("{}.{}", b.db_name, b.sink_from_name),
152 };
153 encoder.with_kafka_connect(kafka_connect)
154 }
155 Ok(false) => encoder,
156 _ => {
157 return Err(SinkError::Config(anyhow!(
158 "schemas.enable is expected to be `true` or `false`, got {s}",
159 )));
160 }
161 }
162 } else {
163 encoder
164 };
165 Ok(encoder)
166 }
167}
168
169impl EncoderBuild for ProtoEncoder {
170 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
171 assert!(pk_indices.is_none());
173 let (descriptor, sid) =
175 crate::schema::protobuf::fetch_descriptor(&b.format_desc.options, b.topic, None)
176 .await
177 .map_err(|e| SinkError::Config(anyhow!(e)))?;
178 let header = match sid {
179 None => ProtoHeader::None,
180 Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
181 };
182 ProtoEncoder::new(b.schema, None, descriptor, header)
183 }
184}
185
186fn ensure_only_one_pk<'a>(
187 data_type_name: &'a str,
188 params: &'a EncoderParams<'_>,
189 pk_indices: &'a Option<Vec<usize>>,
190) -> Result<(usize, &'a Field)> {
191 let Some(pk_indices) = pk_indices else {
192 return Err(SinkError::Config(anyhow!(
193 "{}Encoder requires primary key columns to be specified",
194 data_type_name
195 )));
196 };
197 if pk_indices.len() != 1 {
198 return Err(SinkError::Config(anyhow!(
199 "KEY ENCODE {} expects only one primary key, but got {}",
200 data_type_name,
201 pk_indices.len(),
202 )));
203 }
204
205 let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| {
206 SinkError::Config(anyhow!(
207 "The primary key column index {} is out of bounds in schema {:?}",
208 pk_indices[0],
209 params.schema
210 ))
211 })?;
212
213 Ok((pk_indices[0], schema_ref))
214}
215
216impl EncoderBuild for BytesEncoder {
217 async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
218 let (pk_index, schema_ref) = ensure_only_one_pk("BYTES", ¶ms, &pk_indices)?;
219 if let DataType::Bytea = schema_ref.data_type() {
220 Ok(BytesEncoder::new(params.schema, pk_index))
221 } else {
222 Err(SinkError::Config(anyhow!(
223 "The key encode is BYTES, but the primary key column {} has type {}",
224 schema_ref.name,
225 schema_ref.data_type
226 )))
227 }
228 }
229}
230
231impl EncoderBuild for TextEncoder {
232 async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
233 let (pk_index, schema_ref) = ensure_only_one_pk("TEXT", ¶ms, &pk_indices)?;
234 match &schema_ref.data_type() {
235 DataType::Varchar
236 | DataType::Boolean
237 | DataType::Int16
238 | DataType::Int32
239 | DataType::Int64
240 | DataType::Int256
241 | DataType::Serial => {}
242 _ => {
243 return Err(SinkError::Config(anyhow!(
245 "The key encode is TEXT, but the primary key column {} has type {}. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, big int, serial or rw_int256.",
246 schema_ref.name,
247 schema_ref.data_type
248 )));
249 }
250 }
251
252 Ok(Self::new(params.schema, pk_index))
253 }
254}
255
256impl EncoderBuild for AvroEncoder {
257 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
258 use crate::schema::{SchemaLoader, SchemaVersion};
259
260 let loader = SchemaLoader::from_format_options(b.topic, &b.format_desc.options)
261 .await
262 .map_err(|e| SinkError::Config(anyhow!(e)))?;
263
264 let (schema_version, avro) = match pk_indices {
265 Some(_) => loader
266 .load_key_schema()
267 .await
268 .map_err(|e| SinkError::Config(anyhow!(e)))?,
269 None => loader
270 .load_val_schema()
271 .await
272 .map_err(|e| SinkError::Config(anyhow!(e)))?,
273 };
274 AvroEncoder::new(
275 b.schema,
276 pk_indices,
277 std::sync::Arc::new(avro),
278 match schema_version {
279 SchemaVersion::Confluent(x) => AvroHeader::ConfluentSchemaRegistry(x),
280 SchemaVersion::Glue(x) => AvroHeader::GlueSchemaRegistry(x),
281 },
282 )
283 }
284}
285
286impl EncoderBuild for TemplateEncoder {
287 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
288 let redis_value_type = b
289 .format_desc
290 .options
291 .get(REDIS_VALUE_TYPE)
292 .map_or(REDIS_VALUE_TYPE_STRING, |s| s.as_str());
293 match redis_value_type {
294 REDIS_VALUE_TYPE_STRING => {
295 let option_name = match pk_indices {
296 Some(_) => KEY_FORMAT,
297 None => VALUE_FORMAT,
298 };
299 let template = b.format_desc.options.get(option_name).ok_or_else(|| {
300 SinkError::Config(anyhow!("Cannot find '{option_name}',please set it."))
301 })?;
302 Ok(TemplateEncoder::new_string(
303 b.schema,
304 pk_indices,
305 template.clone(),
306 ))
307 }
308 REDIS_VALUE_TYPE_GEO => match pk_indices {
309 Some(_) => {
310 let member_name = b.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
311 SinkError::Config(anyhow!("Cannot find `{MEMBER_NAME}`,please set it."))
312 })?;
313 let template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
314 SinkError::Config(anyhow!("Cannot find `{KEY_FORMAT}`,please set it."))
315 })?;
316 TemplateEncoder::new_geo_key(
317 b.schema,
318 pk_indices,
319 member_name,
320 template.clone(),
321 )
322 }
323 None => {
324 let lat_name = b.format_desc.options.get(LAT_NAME).ok_or_else(|| {
325 SinkError::Config(anyhow!("Cannot find `{LAT_NAME}`, please set it."))
326 })?;
327 let lon_name = b.format_desc.options.get(LON_NAME).ok_or_else(|| {
328 SinkError::Config(anyhow!("Cannot find `{LON_NAME}`,please set it."))
329 })?;
330 TemplateEncoder::new_geo_value(b.schema, pk_indices, lat_name, lon_name)
331 }
332 },
333 REDIS_VALUE_TYPE_PUBSUB => match pk_indices {
334 Some(_) => {
335 let channel = b.format_desc.options.get(CHANNEL).cloned();
336 let channel_column = b.format_desc.options.get(CHANNEL_COLUMN).cloned();
337 if (channel.is_none() && channel_column.is_none())
338 || (channel.is_some() && channel_column.is_some())
339 {
340 return Err(SinkError::Config(anyhow!(
341 "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
342 )));
343 }
344 TemplateEncoder::new_pubsub_stream_key(
345 b.schema,
346 pk_indices,
347 channel,
348 channel_column,
349 )
350 }
351 None => {
352 let template = b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
353 SinkError::Config(anyhow!("Cannot find '{VALUE_FORMAT}',please set it."))
354 })?;
355 Ok(TemplateEncoder::new_string(
356 b.schema,
357 pk_indices,
358 template.clone(),
359 ))
360 }
361 },
362 REDIS_VALUE_TYPE_STREAM => match pk_indices {
363 Some(_) => {
364 let stream = b.format_desc.options.get(STREAM).cloned();
365 let stream_column = b.format_desc.options.get(STREAM_COLUMN).cloned();
366 if (stream.is_none() && stream_column.is_none())
367 || (stream.is_some() && stream_column.is_some())
368 {
369 return Err(SinkError::Config(anyhow!(
370 "`{STREAM}` and `{STREAM_COLUMN}` only one can be set"
371 )));
372 }
373 TemplateEncoder::new_pubsub_stream_key(
374 b.schema,
375 pk_indices,
376 stream,
377 stream_column,
378 )
379 }
380 None => {
381 let value_template =
382 b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
383 SinkError::Config(anyhow!(
384 "Cannot find '{VALUE_FORMAT}',please set it."
385 ))
386 })?;
387 let key_template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
388 SinkError::Config(anyhow!("Cannot find '{KEY_FORMAT}',please set it."))
389 })?;
390
391 Ok(TemplateEncoder::new_stream_value(
392 b.schema,
393 pk_indices,
394 key_template.clone(),
395 value_template.clone(),
396 ))
397 }
398 },
399 _ => Err(SinkError::Config(anyhow!(
400 "The value type {} is not supported",
401 redis_value_type
402 ))),
403 }
404 }
405}
406
407struct FormatterParams<'a> {
408 builder: EncoderParams<'a>,
409 pk_indices: Vec<usize>,
410}
411
412trait FormatterBuild: Sized {
418 async fn build(b: FormatterParams<'_>) -> Result<Self>;
419}
420
421impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<KE, VE> {
422 async fn build(b: FormatterParams<'_>) -> Result<Self> {
423 let key_encoder = match b.pk_indices.is_empty() {
424 true => None,
425 false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?),
426 };
427 let val_encoder = VE::build(b.builder, None).await?;
428 Ok(AppendOnlyFormatter::new(key_encoder, val_encoder))
429 }
430}
431
432impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
433 async fn build(b: FormatterParams<'_>) -> Result<Self> {
434 let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices))
435 .await
436 .with_context(|| "Failed to build key encoder")?;
437 let val_encoder = VE::build(b.builder, None)
438 .await
439 .with_context(|| "Failed to build value encoder")?;
440 Ok(UpsertFormatter::new(key_encoder, val_encoder))
441 }
442}
443
444impl FormatterBuild for DebeziumJsonFormatter {
445 async fn build(b: FormatterParams<'_>) -> Result<Self> {
446 assert_eq!(b.builder.format_desc.encode, SinkEncode::Json);
447
448 Ok(DebeziumJsonFormatter::new(
449 b.builder.schema,
450 b.pk_indices,
451 b.builder.db_name,
452 b.builder.sink_from_name,
453 DebeziumAdapterOpts::default(),
454 ))
455 }
456}
457
458impl SinkFormatterImpl {
459 pub async fn new(
460 format_desc: &SinkFormatDesc,
461 schema: Schema,
462 pk_indices: Vec<usize>,
463 db_name: String,
464 sink_from_name: String,
465 topic: &str,
466 ) -> Result<Self> {
467 use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl};
468 let p = FormatterParams {
469 builder: EncoderParams {
470 format_desc,
471 schema,
472 db_name,
473 sink_from_name,
474 topic,
475 },
476 pk_indices,
477 };
478
479 async fn build<T: FormatterBuild>(p: FormatterParams<'_>) -> Result<T> {
490 T::build(p).await
491 }
492
493 Ok(
494 match (
495 &format_desc.format,
496 &format_desc.encode,
497 &format_desc.key_encode,
498 ) {
499 (F::AppendOnly, E::Json, Some(E::Text)) => {
500 Impl::AppendOnlyTextJson(build(p).await?)
501 }
502 (F::AppendOnly, E::Json, Some(E::Bytes)) => {
503 Impl::AppendOnlyBytesJson(build(p).await?)
504 }
505 (F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?),
506 (F::AppendOnly, E::Avro, Some(E::Text)) => {
507 Impl::AppendOnlyTextAvro(build(p).await?)
508 }
509 (F::AppendOnly, E::Avro, Some(E::Bytes)) => {
510 Impl::AppendOnlyBytesAvro(build(p).await?)
511 }
512 (F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?),
513 (F::AppendOnly, E::Protobuf, Some(E::Text)) => {
514 Impl::AppendOnlyTextProto(build(p).await?)
515 }
516 (F::AppendOnly, E::Protobuf, Some(E::Bytes)) => {
517 Impl::AppendOnlyBytesProto(build(p).await?)
518 }
519 (F::AppendOnly, E::Protobuf, None) => Impl::AppendOnlyProto(build(p).await?),
520 (F::AppendOnly, E::Template, Some(E::Text)) => {
521 Impl::AppendOnlyTextTemplate(build(p).await?)
522 }
523 (F::AppendOnly, E::Template, Some(E::Bytes)) => {
524 Impl::AppendOnlyBytesTemplate(build(p).await?)
525 }
526 (F::AppendOnly, E::Template, None) => Impl::AppendOnlyTemplate(build(p).await?),
527 (F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?),
528 (F::Upsert, E::Json, Some(E::Bytes)) => {
529 Impl::UpsertBytesJson(build(p).await?)
530 }
531 (F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?),
532 (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?),
533 (F::Upsert, E::Avro, Some(E::Bytes)) => {
534 Impl::UpsertBytesAvro(build(p).await?)
535 }
536 (F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?),
537 (F::Upsert, E::Protobuf, Some(E::Text)) => Impl::UpsertTextProto(build(p).await?),
538 (F::Upsert, E::Protobuf, Some(E::Bytes)) => {
539 Impl::UpsertBytesProto(build(p).await?)
540 }
541 (F::Upsert, E::Template, Some(E::Text)) => {
542 Impl::UpsertTextTemplate(build(p).await?)
543 }
544 (F::Upsert, E::Template, Some(E::Bytes)) => {
545 Impl::UpsertBytesTemplate(build(p).await?)
546 }
547 (F::Upsert, E::Template, None) => Impl::UpsertTemplate(build(p).await?),
548 (F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?),
549 (F::AppendOnly | F::Upsert, E::Text, _) => {
550 return Err(SinkError::Config(anyhow!(
551 "ENCODE TEXT is only valid as key encode."
552 )));
553 }
554 (F::AppendOnly, E::Avro, _)
555 | (F::Upsert, E::Protobuf, _)
556 | (F::Debezium, E::Json, Some(_))
557 | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text, _)
558 | (F::AppendOnly, E::Bytes, _)
559 | (F::Upsert, E::Bytes, _)
560 | (F::Debezium, E::Bytes, _)
561 | (_, E::Parquet, _)
562 | (_, _, Some(E::Parquet))
563 | (F::AppendOnly | F::Upsert, _, Some(E::Template) | Some(E::Json) | Some(E::Avro) | Some(E::Protobuf)) => {
565 return Err(SinkError::Config(anyhow!(
566 "sink format/encode/key_encode unsupported: {:?} {:?} {:?}",
567 format_desc.format,
568 format_desc.encode,
569 format_desc.key_encode
570 )));
571 }
572 },
573 )
574 }
575}
576
/// Matches on every `SinkFormatterImpl` variant, binds the wrapped formatter to
/// `$name` and evaluates `$body` with it.
///
/// `$body` is expanded once per variant, so it is type-checked against each
/// concrete formatter type.
#[macro_export]
macro_rules! dispatch_sink_formatter_impl {
    ($impl:expr, $name:ident, $body:expr) => {
        match $impl {
            // Append-only formatters.
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesJson($name) => $body,
            SinkFormatterImpl::AppendOnlyAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesTemplate($name) => $body,

            // Upsert formatters.
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertBytesJson($name) => $body,
            SinkFormatterImpl::UpsertAvro($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertBytesAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertBytesProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,
            SinkFormatterImpl::UpsertBytesTemplate($name) => $body,

            // Debezium formatter.
            SinkFormatterImpl::DebeziumJson($name) => $body,
        }
    };
}
614
/// Like `dispatch_sink_formatter_impl!`, but only for a subset of the variants.
///
/// Variants whose key encoder is `BytesEncoder` or `AvroEncoder` are not dispatched:
/// reaching one of them panics through `unreachable!()`. An optional trailing
/// attribute is applied to the generated `match`.
#[macro_export]
macro_rules! dispatch_sink_formatter_str_key_impl {
    ($impl:expr, $name:ident, $body:expr $(,$attr:meta)?) => {
        $(#[$attr])?
        match $impl {
            // Append-only formatters that are dispatched.
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,

            // Upsert formatters that are dispatched.
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,

            // Debezium formatter.
            SinkFormatterImpl::DebeziumJson($name) => $body,

            // Bytes-keyed and Avro-keyed formatters are never expected here.
            SinkFormatterImpl::AppendOnlyBytesJson(_)
            | SinkFormatterImpl::AppendOnlyAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesProto(_)
            | SinkFormatterImpl::AppendOnlyBytesTemplate(_)
            | SinkFormatterImpl::UpsertBytesJson(_)
            | SinkFormatterImpl::UpsertAvro(_)
            | SinkFormatterImpl::UpsertBytesAvro(_)
            | SinkFormatterImpl::UpsertBytesProto(_)
            | SinkFormatterImpl::UpsertBytesTemplate(_) => unreachable!(),
        }
    };
}