1use anyhow::{Context, anyhow};
16use risingwave_common::array::StreamChunk;
17use risingwave_common::catalog::Field;
18
19use crate::sink::redis::REDIS_VALUE_TYPE_STREAM;
20use crate::sink::{Result, SinkError};
21
22mod append_only;
23mod debezium_json;
24mod upsert;
25
26pub use append_only::AppendOnlyFormatter;
27pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter};
28use risingwave_common::catalog::Schema;
29use risingwave_common::types::DataType;
30pub use upsert::UpsertFormatter;
31
32use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
33use super::encoder::bytes::BytesEncoder;
34use super::encoder::template::TemplateEncoder;
35use super::encoder::text::TextEncoder;
36use super::encoder::{
37 DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, TimeHandlingMode,
38 TimestamptzHandlingMode,
39};
40use super::redis::{
41 CHANNEL, CHANNEL_COLUMN, KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE,
42 REDIS_VALUE_TYPE_GEO, REDIS_VALUE_TYPE_PUBSUB, REDIS_VALUE_TYPE_STRING, STREAM, STREAM_COLUMN,
43 VALUE_FORMAT,
44};
45use crate::sink::encoder::{
46 AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode,
47};
48
/// Turns a [`StreamChunk`] into a sequence of optional key / optional value pairs.
pub trait SinkFormatter {
    /// Key type of each formatted item.
    type K;
    /// Value type of each formatted item.
    type V;

    /// Formats `chunk`, returning an iterator with one `Result` per produced item.
    ///
    /// An `Ok` item carries an optional key and an optional value; an `Err` item reports a
    /// failure. NOTE(review): what a `None` key or value means is decided by the implementors,
    /// which are not visible in this file.
    #[expect(clippy::type_complexity)]
    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
}
65
/// Unwraps a `Result` inside a `yield`-capable body.
///
/// Evaluates to the `Ok` value. On `Err`, the error is yielded as `Err(err)` and the enclosing
/// body returns. Because the expansion contains `yield`, the macro can only be used where `yield`
/// is valid.
#[macro_export]
macro_rules! tri {
    ($expr:expr) => {
        match $expr {
            Ok(val) => val,
            Err(err) => {
                // Surface the error to the consumer, then stop producing items.
                yield Err(err);
                return;
            }
        }
    };
}
80
/// Closed set of formatter / encoder combinations that [`SinkFormatterImpl::new`] can build.
///
/// For `AppendOnlyFormatter<K, V>` and `UpsertFormatter<K, V>`, `K` is the key encoder and `V`
/// is the value encoder. Note that no variant uses `ProtoEncoder` as the key encoder:
/// `AppendOnlyProto` pairs it with a `JsonEncoder` key, and there is no plain `UpsertProto`.
pub enum SinkFormatterImpl {
    // Append-only formatters.
    AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
    AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
    AppendOnlyBytesJson(AppendOnlyFormatter<BytesEncoder, JsonEncoder>),
    AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
    AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
    AppendOnlyBytesAvro(AppendOnlyFormatter<BytesEncoder, AvroEncoder>),
    AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
    AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
    AppendOnlyBytesProto(AppendOnlyFormatter<BytesEncoder, ProtoEncoder>),
    AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
    AppendOnlyTextTemplate(AppendOnlyFormatter<TextEncoder, TemplateEncoder>),
    AppendOnlyBytesTemplate(AppendOnlyFormatter<BytesEncoder, TemplateEncoder>),
    AppendOnlyBytes(AppendOnlyFormatter<BytesEncoder, BytesEncoder>),
    // Upsert formatters.
    UpsertJson(UpsertFormatter<JsonEncoder, JsonEncoder>),
    UpsertTextJson(UpsertFormatter<TextEncoder, JsonEncoder>),
    UpsertBytesJson(UpsertFormatter<BytesEncoder, JsonEncoder>),
    UpsertAvro(UpsertFormatter<AvroEncoder, AvroEncoder>),
    UpsertTextAvro(UpsertFormatter<TextEncoder, AvroEncoder>),
    UpsertBytesAvro(UpsertFormatter<BytesEncoder, AvroEncoder>),
    UpsertTextProto(UpsertFormatter<TextEncoder, ProtoEncoder>),
    UpsertBytesProto(UpsertFormatter<BytesEncoder, ProtoEncoder>),
    UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
    UpsertTextTemplate(UpsertFormatter<TextEncoder, TemplateEncoder>),
    UpsertBytesTemplate(UpsertFormatter<BytesEncoder, TemplateEncoder>),
    // Debezium formatter.
    DebeziumJson(DebeziumJsonFormatter),
}
114
/// Inputs shared by every [`EncoderBuild`] implementation.
///
/// The struct is `Clone` because a formatter builds two encoders (key and value) from the same
/// parameters.
#[derive(Debug, Clone)]
pub struct EncoderParams<'a> {
    /// Sink format description; the encoders in this file read its `options` map.
    format_desc: &'a SinkFormatDesc,
    /// Schema handed to the encoder being built.
    schema: Schema,
    /// Database name; combined with `sink_from_name` into the Kafka Connect schema name by the
    /// JSON encoder and forwarded to the Debezium formatter.
    db_name: String,
    /// Name of the object the sink reads from; see `db_name` for how it is used.
    sink_from_name: String,
    /// Topic name passed to the protobuf descriptor fetcher and the Avro schema loader.
    topic: &'a str,
}
123
/// Constructs an encoder from [`EncoderParams`].
pub trait EncoderBuild: Sized {
    /// Builds the encoder.
    ///
    /// The formatter builders in this file pass `Some(pk_indices)` when building a key encoder
    /// and `None` when building a value encoder.
    ///
    /// # Errors
    ///
    /// Returns an error when the encoder cannot be built from the given parameters.
    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self>;
}
134
135impl EncoderBuild for JsonEncoder {
136 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
137 let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?;
138 let jsonb_handling_mode = JsonbHandlingMode::from_options(&b.format_desc.options)?;
139 let encoder = JsonEncoder::new(
140 b.schema,
141 pk_indices,
142 DateHandlingMode::FromCe,
143 TimestampHandlingMode::Milli,
144 timestamptz_mode,
145 TimeHandlingMode::Milli,
146 jsonb_handling_mode,
147 );
148 let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") {
149 match s.to_lowercase().parse::<bool>() {
150 Ok(true) => {
151 let kafka_connect = KafkaConnectParams {
152 schema_name: format!("{}.{}", b.db_name, b.sink_from_name),
153 };
154 encoder.with_kafka_connect(kafka_connect)
155 }
156 Ok(false) => encoder,
157 _ => {
158 return Err(SinkError::Config(anyhow!(
159 "schemas.enable is expected to be `true` or `false`, got {s}",
160 )));
161 }
162 }
163 } else {
164 encoder
165 };
166 Ok(encoder)
167 }
168}
169
170impl EncoderBuild for ProtoEncoder {
171 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
172 assert!(pk_indices.is_none());
174 let (descriptor, sid) =
176 crate::schema::protobuf::fetch_descriptor(&b.format_desc.options, b.topic, None)
177 .await
178 .map_err(|e| SinkError::Config(anyhow!(e)))?;
179 let header = match sid {
180 None => ProtoHeader::None,
181 Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
182 };
183 ProtoEncoder::new(b.schema, None, descriptor, header)
184 }
185}
186
187fn ensure_only_one_pk<'a>(
188 data_type_name: &'a str,
189 params: &'a EncoderParams<'_>,
190 pk_indices: &'a Option<Vec<usize>>,
191) -> Result<(usize, &'a Field)> {
192 let Some(pk_indices) = pk_indices else {
193 return Err(SinkError::Config(anyhow!(
194 "{}Encoder requires primary key columns to be specified",
195 data_type_name
196 )));
197 };
198 if pk_indices.len() != 1 {
199 return Err(SinkError::Config(anyhow!(
200 "KEY ENCODE {} expects only one primary key, but got {}",
201 data_type_name,
202 pk_indices.len(),
203 )));
204 }
205
206 let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| {
207 SinkError::Config(anyhow!(
208 "The primary key column index {} is out of bounds in schema {:?}",
209 pk_indices[0],
210 params.schema
211 ))
212 })?;
213
214 Ok((pk_indices[0], schema_ref))
215}
216
217impl EncoderBuild for BytesEncoder {
218 async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
219 match pk_indices {
220 Some(_) => {
222 let (pk_index, schema_ref) = ensure_only_one_pk("BYTES", ¶ms, &pk_indices)?;
223 if let DataType::Bytea = schema_ref.data_type() {
224 Ok(BytesEncoder::new(params.schema, pk_index))
225 } else {
226 Err(SinkError::Config(anyhow!(
227 "The key encode is BYTES, but the primary key column {} has type {}",
228 schema_ref.name,
229 schema_ref.data_type
230 )))
231 }
232 }
233 None => {
235 if params.schema.len() != 1 {
237 return Err(SinkError::Config(anyhow!(
238 "ENCODE BYTES requires exactly one column, got {} columns",
239 params.schema.len()
240 )));
241 }
242
243 let field = ¶ms.schema.fields[0];
244 if let DataType::Bytea = field.data_type {
245 Ok(BytesEncoder::new(params.schema, 0))
246 } else {
247 Err(SinkError::Config(anyhow!(
248 "ENCODE BYTES requires the column to be of type BYTEA, but got type {}",
249 field.data_type
250 )))
251 }
252 }
253 }
254 }
255}
256
257impl EncoderBuild for TextEncoder {
258 async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
259 let (pk_index, schema_ref) = ensure_only_one_pk("TEXT", ¶ms, &pk_indices)?;
260 match &schema_ref.data_type() {
261 DataType::Varchar
262 | DataType::Boolean
263 | DataType::Int16
264 | DataType::Int32
265 | DataType::Int64
266 | DataType::Int256
267 | DataType::Serial => {}
268 _ => {
269 return Err(SinkError::Config(anyhow!(
271 "The key encode is TEXT, but the primary key column {} has type {}. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, big int, serial or rw_int256.",
272 schema_ref.name,
273 schema_ref.data_type
274 )));
275 }
276 }
277
278 Ok(Self::new(params.schema, pk_index))
279 }
280}
281
282impl EncoderBuild for AvroEncoder {
283 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
284 use crate::schema::{SchemaLoader, SchemaVersion};
285
286 let loader = SchemaLoader::from_format_options(b.topic, &b.format_desc.options)
287 .await
288 .map_err(|e| SinkError::Config(anyhow!(e)))?;
289
290 let (schema_version, avro) = match pk_indices {
291 Some(_) => loader
292 .load_key_schema()
293 .await
294 .map_err(|e| SinkError::Config(anyhow!(e)))?,
295 None => loader
296 .load_val_schema()
297 .await
298 .map_err(|e| SinkError::Config(anyhow!(e)))?,
299 };
300 AvroEncoder::new(
301 b.schema,
302 pk_indices,
303 std::sync::Arc::new(avro),
304 match schema_version {
305 SchemaVersion::Confluent(x) => AvroHeader::ConfluentSchemaRegistry(x),
306 SchemaVersion::Glue(x) => AvroHeader::GlueSchemaRegistry(x),
307 },
308 )
309 }
310}
311
312impl EncoderBuild for TemplateEncoder {
313 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
314 let redis_value_type = b
315 .format_desc
316 .options
317 .get(REDIS_VALUE_TYPE)
318 .map_or(REDIS_VALUE_TYPE_STRING, |s| s.as_str());
319 match redis_value_type {
320 REDIS_VALUE_TYPE_STRING => {
321 let option_name = match pk_indices {
322 Some(_) => KEY_FORMAT,
323 None => VALUE_FORMAT,
324 };
325 let template = b.format_desc.options.get(option_name).ok_or_else(|| {
326 SinkError::Config(anyhow!("Cannot find '{option_name}',please set it."))
327 })?;
328 Ok(TemplateEncoder::new_string(
329 b.schema,
330 pk_indices,
331 template.clone(),
332 ))
333 }
334 REDIS_VALUE_TYPE_GEO => match pk_indices {
335 Some(_) => {
336 let member_name = b.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
337 SinkError::Config(anyhow!("Cannot find `{MEMBER_NAME}`,please set it."))
338 })?;
339 let template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
340 SinkError::Config(anyhow!("Cannot find `{KEY_FORMAT}`,please set it."))
341 })?;
342 TemplateEncoder::new_geo_key(
343 b.schema,
344 pk_indices,
345 member_name,
346 template.clone(),
347 )
348 }
349 None => {
350 let lat_name = b.format_desc.options.get(LAT_NAME).ok_or_else(|| {
351 SinkError::Config(anyhow!("Cannot find `{LAT_NAME}`, please set it."))
352 })?;
353 let lon_name = b.format_desc.options.get(LON_NAME).ok_or_else(|| {
354 SinkError::Config(anyhow!("Cannot find `{LON_NAME}`,please set it."))
355 })?;
356 TemplateEncoder::new_geo_value(b.schema, pk_indices, lat_name, lon_name)
357 }
358 },
359 REDIS_VALUE_TYPE_PUBSUB => match pk_indices {
360 Some(_) => {
361 let channel = b.format_desc.options.get(CHANNEL).cloned();
362 let channel_column = b.format_desc.options.get(CHANNEL_COLUMN).cloned();
363 if (channel.is_none() && channel_column.is_none())
364 || (channel.is_some() && channel_column.is_some())
365 {
366 return Err(SinkError::Config(anyhow!(
367 "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
368 )));
369 }
370 TemplateEncoder::new_pubsub_stream_key(
371 b.schema,
372 pk_indices,
373 channel,
374 channel_column,
375 )
376 }
377 None => {
378 let template = b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
379 SinkError::Config(anyhow!("Cannot find '{VALUE_FORMAT}',please set it."))
380 })?;
381 Ok(TemplateEncoder::new_string(
382 b.schema,
383 pk_indices,
384 template.clone(),
385 ))
386 }
387 },
388 REDIS_VALUE_TYPE_STREAM => match pk_indices {
389 Some(_) => {
390 let stream = b.format_desc.options.get(STREAM).cloned();
391 let stream_column = b.format_desc.options.get(STREAM_COLUMN).cloned();
392 if (stream.is_none() && stream_column.is_none())
393 || (stream.is_some() && stream_column.is_some())
394 {
395 return Err(SinkError::Config(anyhow!(
396 "`{STREAM}` and `{STREAM_COLUMN}` only one can be set"
397 )));
398 }
399 TemplateEncoder::new_pubsub_stream_key(
400 b.schema,
401 pk_indices,
402 stream,
403 stream_column,
404 )
405 }
406 None => {
407 let value_template =
408 b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
409 SinkError::Config(anyhow!(
410 "Cannot find '{VALUE_FORMAT}',please set it."
411 ))
412 })?;
413 let key_template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
414 SinkError::Config(anyhow!("Cannot find '{KEY_FORMAT}',please set it."))
415 })?;
416
417 Ok(TemplateEncoder::new_stream_value(
418 b.schema,
419 pk_indices,
420 key_template.clone(),
421 value_template.clone(),
422 ))
423 }
424 },
425 _ => Err(SinkError::Config(anyhow!(
426 "The value type {} is not supported",
427 redis_value_type
428 ))),
429 }
430 }
431}
432
/// Inputs for building a formatter: the shared encoder parameters plus the primary key columns.
struct FormatterParams<'a> {
    /// Parameters handed (cloned where needed) to the key and value encoder builders.
    builder: EncoderParams<'a>,
    /// Primary key column indices. An empty list makes the append-only formatter skip the key
    /// encoder.
    pk_indices: Vec<usize>,
}
437
/// Constructs a formatter from [`FormatterParams`].
trait FormatterBuild: Sized {
    /// Consumes `b` and returns the constructed formatter.
    ///
    /// # Errors
    ///
    /// Returns an error when the formatter (or one of its encoders) cannot be built.
    async fn build(b: FormatterParams<'_>) -> Result<Self>;
}
446
447impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<KE, VE> {
448 async fn build(b: FormatterParams<'_>) -> Result<Self> {
449 let key_encoder = match b.pk_indices.is_empty() {
450 true => None,
451 false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?),
452 };
453 let val_encoder = VE::build(b.builder, None).await?;
454 Ok(AppendOnlyFormatter::new(key_encoder, val_encoder))
455 }
456}
457
458impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
459 async fn build(b: FormatterParams<'_>) -> Result<Self> {
460 let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices))
461 .await
462 .with_context(|| "Failed to build key encoder")?;
463 let val_encoder = VE::build(b.builder, None)
464 .await
465 .with_context(|| "Failed to build value encoder")?;
466 Ok(UpsertFormatter::new(key_encoder, val_encoder))
467 }
468}
469
470impl FormatterBuild for DebeziumJsonFormatter {
471 async fn build(b: FormatterParams<'_>) -> Result<Self> {
472 assert_eq!(b.builder.format_desc.encode, SinkEncode::Json);
473
474 Ok(DebeziumJsonFormatter::new(
475 b.builder.schema,
476 b.pk_indices,
477 b.builder.db_name,
478 b.builder.sink_from_name,
479 DebeziumAdapterOpts::default(),
480 ))
481 }
482}
483
484impl SinkFormatterImpl {
485 pub async fn new(
486 format_desc: &SinkFormatDesc,
487 schema: Schema,
488 pk_indices: Vec<usize>,
489 db_name: String,
490 sink_from_name: String,
491 topic: &str,
492 ) -> Result<Self> {
493 use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl};
494 let p = FormatterParams {
495 builder: EncoderParams {
496 format_desc,
497 schema,
498 db_name,
499 sink_from_name,
500 topic,
501 },
502 pk_indices,
503 };
504
505 async fn build<T: FormatterBuild>(p: FormatterParams<'_>) -> Result<T> {
516 T::build(p).await
517 }
518
519 Ok(
520 match (
521 &format_desc.format,
522 &format_desc.encode,
523 &format_desc.key_encode,
524 ) {
525 (F::AppendOnly, E::Json, Some(E::Text)) => {
526 Impl::AppendOnlyTextJson(build(p).await?)
527 }
528 (F::AppendOnly, E::Json, Some(E::Bytes)) => {
529 Impl::AppendOnlyBytesJson(build(p).await?)
530 }
531 (F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?),
532 (F::AppendOnly, E::Avro, Some(E::Text)) => {
533 Impl::AppendOnlyTextAvro(build(p).await?)
534 }
535 (F::AppendOnly, E::Avro, Some(E::Bytes)) => {
536 Impl::AppendOnlyBytesAvro(build(p).await?)
537 }
538 (F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?),
539 (F::AppendOnly, E::Protobuf, Some(E::Text)) => {
540 Impl::AppendOnlyTextProto(build(p).await?)
541 }
542 (F::AppendOnly, E::Protobuf, Some(E::Bytes)) => {
543 Impl::AppendOnlyBytesProto(build(p).await?)
544 }
545 (F::AppendOnly, E::Protobuf, None) => Impl::AppendOnlyProto(build(p).await?),
546 (F::AppendOnly, E::Template, Some(E::Text)) => {
547 Impl::AppendOnlyTextTemplate(build(p).await?)
548 }
549 (F::AppendOnly, E::Template, Some(E::Bytes)) => {
550 Impl::AppendOnlyBytesTemplate(build(p).await?)
551 }
552 (F::AppendOnly, E::Template, None) => Impl::AppendOnlyTemplate(build(p).await?),
553 (F::AppendOnly, E::Bytes, None) => Impl::AppendOnlyBytes(build(p).await?),
554 (F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?),
555 (F::Upsert, E::Json, Some(E::Bytes)) => {
556 Impl::UpsertBytesJson(build(p).await?)
557 }
558 (F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?),
559 (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?),
560 (F::Upsert, E::Avro, Some(E::Bytes)) => {
561 Impl::UpsertBytesAvro(build(p).await?)
562 }
563 (F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?),
564 (F::Upsert, E::Protobuf, Some(E::Text)) => Impl::UpsertTextProto(build(p).await?),
565 (F::Upsert, E::Protobuf, Some(E::Bytes)) => {
566 Impl::UpsertBytesProto(build(p).await?)
567 }
568 (F::Upsert, E::Template, Some(E::Text)) => {
569 Impl::UpsertTextTemplate(build(p).await?)
570 }
571 (F::Upsert, E::Template, Some(E::Bytes)) => {
572 Impl::UpsertBytesTemplate(build(p).await?)
573 }
574 (F::Upsert, E::Template, None) => Impl::UpsertTemplate(build(p).await?),
575 (F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?),
576 (F::AppendOnly | F::Upsert, E::Text, _) => {
577 return Err(SinkError::Config(anyhow!(
578 "ENCODE TEXT is only valid as key encode."
579 )));
580 }
581 (F::AppendOnly, E::Avro, _)
582 | (F::Upsert, E::Protobuf, _)
583 | (F::Upsert, E::Bytes, _)
584 | (F::Debezium, E::Json, Some(_))
585 | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text | E::Bytes, _)
586 | (_, E::Parquet, _)
587 | (_, _, Some(E::Parquet))
588 | (F::AppendOnly, E::Bytes, Some(_))
589 | (F::AppendOnly | F::Upsert, _, Some(E::Template) | Some(E::Json) | Some(E::Avro) | Some(E::Protobuf)) => {
591 return Err(SinkError::Config(anyhow!(
592 "sink format/encode/key_encode unsupported: {:?} {:?} {:?}",
593 format_desc.format,
594 format_desc.encode,
595 format_desc.key_encode
596 )));
597 }
598 },
599 )
600 }
601}
602
/// Dispatches over every `SinkFormatterImpl` variant.
///
/// Expands to a `match` on `$impl` that binds the variant's inner formatter to `$name` and
/// evaluates `$body` in each arm, so `$body` is instantiated once per concrete formatter type.
/// `SinkFormatterImpl` must be in scope at the call site.
#[macro_export]
macro_rules! dispatch_sink_formatter_impl {
    ($impl:expr, $name:ident, $body:expr) => {
        match $impl {
            // Append-only formatters.
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesJson($name) => $body,
            SinkFormatterImpl::AppendOnlyAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyBytes($name) => $body,

            // Upsert formatters.
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertBytesJson($name) => $body,
            SinkFormatterImpl::UpsertAvro($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertBytesAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertBytesProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,
            SinkFormatterImpl::UpsertBytesTemplate($name) => $body,

            // Debezium formatter.
            SinkFormatterImpl::DebeziumJson($name) => $body,
        }
    };
}
641
/// Dispatches over the `SinkFormatterImpl` variants that are accepted by string-key callers.
///
/// Expands to a `match` on `$impl`, optionally annotated with the attribute `$attr`. Variants
/// whose key encoder is neither `BytesEncoder` nor `AvroEncoder` bind their inner formatter to
/// `$name` and evaluate `$body`; all remaining variants expand to `unreachable!()`, so passing
/// one of them panics at runtime. `SinkFormatterImpl` must be in scope at the call site.
#[macro_export]
macro_rules! dispatch_sink_formatter_str_key_impl {
    ($impl:expr, $name:ident, $body:expr $(,$attr:meta)?) => {
        $(#[$attr])?
        match $impl {
            // Append-only formatters.
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,

            // Upsert formatters.
            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,

            // Debezium formatter.
            SinkFormatterImpl::DebeziumJson($name) => $body,

            // Bytes- or Avro-keyed formatters are never expected here.
            SinkFormatterImpl::AppendOnlyBytesJson(_)
            | SinkFormatterImpl::AppendOnlyAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesAvro(_)
            | SinkFormatterImpl::AppendOnlyBytesProto(_)
            | SinkFormatterImpl::AppendOnlyBytesTemplate(_)
            | SinkFormatterImpl::AppendOnlyBytes(_)
            | SinkFormatterImpl::UpsertBytesJson(_)
            | SinkFormatterImpl::UpsertAvro(_)
            | SinkFormatterImpl::UpsertBytesAvro(_)
            | SinkFormatterImpl::UpsertBytesProto(_)
            | SinkFormatterImpl::UpsertBytesTemplate(_) => unreachable!(),
        }
    };
}