1use anyhow::{Context, anyhow};
16use risingwave_common::array::StreamChunk;
17use risingwave_common::catalog::Field;
18
19use crate::sink::{Result, SinkError};
20
21mod append_only;
22mod debezium_json;
23mod upsert;
24
25pub use append_only::AppendOnlyFormatter;
26pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter};
27use risingwave_common::catalog::Schema;
28use risingwave_common::types::DataType;
29pub use upsert::UpsertFormatter;
30
31use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
32use super::encoder::bytes::BytesEncoder;
33use super::encoder::template::TemplateEncoder;
34use super::encoder::text::TextEncoder;
35use super::encoder::{
36 DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, TimeHandlingMode,
37 TimestamptzHandlingMode,
38};
39use super::redis::{
40 CHANNEL, CHANNEL_COLUMN, KEY_FORMAT, LAT_NAME, LON_NAME, MEMBER_NAME, REDIS_VALUE_TYPE,
41 REDIS_VALUE_TYPE_GEO, REDIS_VALUE_TYPE_PUBSUB, REDIS_VALUE_TYPE_STRING, VALUE_FORMAT,
42};
43use crate::sink::encoder::{
44 AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode,
45};
46
/// Turns a [`StreamChunk`] into a sequence of optional key / optional value pairs.
///
/// Implementations pair a key encoder and a value encoder; the concrete output types are
/// exposed as the associated types [`SinkFormatter::K`] and [`SinkFormatter::V`].
pub trait SinkFormatter {
    /// Type of an encoded key.
    type K;
    /// Type of an encoded value.
    type V;

    /// Formats `chunk`, lazily yielding one `Result` per produced item.
    ///
    /// Each successful item is a `(key, value)` pair in which either half may be `None`; which
    /// rows produce which combination is decided by the concrete formatter.
    #[expect(clippy::type_complexity)]
    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
}
63
/// Unwraps a `Result` inside a body that supports `yield`.
///
/// On `Ok(v)` the macro evaluates to `v`. On `Err(e)` it yields `Err(e)` as the next item and
/// then returns from the enclosing body, which ends the generated sequence.
#[macro_export]
macro_rules! tri {
    ($result:expr) => {
        match $result {
            Err(error) => {
                yield Err(error);
                return;
            }
            Ok(value) => value,
        }
    };
}
78
/// Every supported combination of sink `FORMAT`, value `ENCODE` and optional `KEY ENCODE`.
///
/// Variants are named `<Format>[<KeyEncode>]<ValueEncode>`. When the key-encode part is
/// omitted, the key encoder is the first generic argument of the wrapped formatter. For
/// example, `AppendOnlyProto` uses a `JsonEncoder` for keys and a `ProtoEncoder` for values.
/// Instances are created by `SinkFormatterImpl::new`.
pub enum SinkFormatterImpl {
    // --- Append-only formatters ---
    AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
    AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
    AppendOnlyBytesJson(AppendOnlyFormatter<BytesEncoder, JsonEncoder>),
    AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
    AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
    AppendOnlyBytesAvro(AppendOnlyFormatter<BytesEncoder, AvroEncoder>),
    AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
    AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
    AppendOnlyBytesProto(AppendOnlyFormatter<BytesEncoder, ProtoEncoder>),
    AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
    AppendOnlyTextTemplate(AppendOnlyFormatter<TextEncoder, TemplateEncoder>),
    AppendOnlyBytesTemplate(AppendOnlyFormatter<BytesEncoder, TemplateEncoder>),
    // --- Upsert formatters ---
    UpsertJson(UpsertFormatter<JsonEncoder, JsonEncoder>),
    UpsertTextJson(UpsertFormatter<TextEncoder, JsonEncoder>),
    UpsertBytesJson(UpsertFormatter<BytesEncoder, JsonEncoder>),
    UpsertAvro(UpsertFormatter<AvroEncoder, AvroEncoder>),
    UpsertTextAvro(UpsertFormatter<TextEncoder, AvroEncoder>),
    UpsertBytesAvro(UpsertFormatter<BytesEncoder, AvroEncoder>),
    // Upsert + protobuf only exists with an explicit TEXT or BYTES key encode.
    UpsertTextProto(UpsertFormatter<TextEncoder, ProtoEncoder>),
    UpsertBytesProto(UpsertFormatter<BytesEncoder, ProtoEncoder>),
    UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
    UpsertTextTemplate(UpsertFormatter<TextEncoder, TemplateEncoder>),
    UpsertBytesTemplate(UpsertFormatter<BytesEncoder, TemplateEncoder>),
    // --- Debezium formatter (JSON only) ---
    DebeziumJson(DebeziumJsonFormatter),
}
111
/// Inputs shared by every [`EncoderBuild`] implementation.
///
/// It is cloned when the same parameters are needed for both the key encoder and the value
/// encoder.
#[derive(Debug, Clone)]
pub struct EncoderParams<'a> {
    /// The sink's format/encode description; its `options` map carries encoder settings.
    format_desc: &'a SinkFormatDesc,
    /// Schema of the rows being encoded.
    schema: Schema,
    /// Database name; combined with `sink_from_name` for the JSON Kafka Connect schema name.
    db_name: String,
    /// Name used with `db_name` for the Kafka Connect schema name; also passed to the
    /// Debezium formatter.
    sink_from_name: String,
    /// Target topic; used by the Avro and Protobuf encoders when loading their schemas.
    topic: &'a str,
}
120
/// Constructs an encoder from [`EncoderParams`].
pub trait EncoderBuild: Sized {
    /// Builds the encoder.
    ///
    /// The formatters in this module pass `Some(pk_indices)` when building a key encoder and
    /// `None` when building a value encoder; several implementations branch on that.
    async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self>;
}
131
132impl EncoderBuild for JsonEncoder {
133 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
134 let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?;
135 let jsonb_handling_mode = JsonbHandlingMode::from_options(&b.format_desc.options)?;
136 let encoder = JsonEncoder::new(
137 b.schema,
138 pk_indices,
139 DateHandlingMode::FromCe,
140 TimestampHandlingMode::Milli,
141 timestamptz_mode,
142 TimeHandlingMode::Milli,
143 jsonb_handling_mode,
144 );
145 let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") {
146 match s.to_lowercase().parse::<bool>() {
147 Ok(true) => {
148 let kafka_connect = KafkaConnectParams {
149 schema_name: format!("{}.{}", b.db_name, b.sink_from_name),
150 };
151 encoder.with_kafka_connect(kafka_connect)
152 }
153 Ok(false) => encoder,
154 _ => {
155 return Err(SinkError::Config(anyhow!(
156 "schemas.enable is expected to be `true` or `false`, got {s}",
157 )));
158 }
159 }
160 } else {
161 encoder
162 };
163 Ok(encoder)
164 }
165}
166
167impl EncoderBuild for ProtoEncoder {
168 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
169 assert!(pk_indices.is_none());
171 let (descriptor, sid) =
173 crate::schema::protobuf::fetch_descriptor(&b.format_desc.options, b.topic, None)
174 .await
175 .map_err(|e| SinkError::Config(anyhow!(e)))?;
176 let header = match sid {
177 None => ProtoHeader::None,
178 Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
179 };
180 ProtoEncoder::new(b.schema, None, descriptor, header)
181 }
182}
183
184fn ensure_only_one_pk<'a>(
185 data_type_name: &'a str,
186 params: &'a EncoderParams<'_>,
187 pk_indices: &'a Option<Vec<usize>>,
188) -> Result<(usize, &'a Field)> {
189 let Some(pk_indices) = pk_indices else {
190 return Err(SinkError::Config(anyhow!(
191 "{}Encoder requires primary key columns to be specified",
192 data_type_name
193 )));
194 };
195 if pk_indices.len() != 1 {
196 return Err(SinkError::Config(anyhow!(
197 "KEY ENCODE {} expects only one primary key, but got {}",
198 data_type_name,
199 pk_indices.len(),
200 )));
201 }
202
203 let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| {
204 SinkError::Config(anyhow!(
205 "The primary key column index {} is out of bounds in schema {:?}",
206 pk_indices[0],
207 params.schema
208 ))
209 })?;
210
211 Ok((pk_indices[0], schema_ref))
212}
213
214impl EncoderBuild for BytesEncoder {
215 async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
216 let (pk_index, schema_ref) = ensure_only_one_pk("BYTES", ¶ms, &pk_indices)?;
217 if let DataType::Bytea = schema_ref.data_type() {
218 Ok(BytesEncoder::new(params.schema, pk_index))
219 } else {
220 Err(SinkError::Config(anyhow!(
221 "The key encode is BYTES, but the primary key column {} has type {}",
222 schema_ref.name,
223 schema_ref.data_type
224 )))
225 }
226 }
227}
228
229impl EncoderBuild for TextEncoder {
230 async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
231 let (pk_index, schema_ref) = ensure_only_one_pk("TEXT", ¶ms, &pk_indices)?;
232 match &schema_ref.data_type() {
233 DataType::Varchar
234 | DataType::Boolean
235 | DataType::Int16
236 | DataType::Int32
237 | DataType::Int64
238 | DataType::Int256
239 | DataType::Serial => {}
240 _ => {
241 return Err(SinkError::Config(anyhow!(
243 "The key encode is TEXT, but the primary key column {} has type {}. The key encode TEXT requires the primary key column to be of type varchar, bool, small int, int, big int, serial or rw_int256.",
244 schema_ref.name,
245 schema_ref.data_type
246 )));
247 }
248 }
249
250 Ok(Self::new(params.schema, pk_index))
251 }
252}
253
254impl EncoderBuild for AvroEncoder {
255 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
256 use crate::schema::{SchemaLoader, SchemaVersion};
257
258 let loader = SchemaLoader::from_format_options(b.topic, &b.format_desc.options)
259 .await
260 .map_err(|e| SinkError::Config(anyhow!(e)))?;
261
262 let (schema_version, avro) = match pk_indices {
263 Some(_) => loader
264 .load_key_schema()
265 .await
266 .map_err(|e| SinkError::Config(anyhow!(e)))?,
267 None => loader
268 .load_val_schema()
269 .await
270 .map_err(|e| SinkError::Config(anyhow!(e)))?,
271 };
272 AvroEncoder::new(
273 b.schema,
274 pk_indices,
275 std::sync::Arc::new(avro),
276 match schema_version {
277 SchemaVersion::Confluent(x) => AvroHeader::ConfluentSchemaRegistry(x),
278 SchemaVersion::Glue(x) => AvroHeader::GlueSchemaRegistry(x),
279 },
280 )
281 }
282}
283
284impl EncoderBuild for TemplateEncoder {
285 async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
286 let redis_value_type = b
287 .format_desc
288 .options
289 .get(REDIS_VALUE_TYPE)
290 .map_or(REDIS_VALUE_TYPE_STRING, |s| s.as_str());
291 match redis_value_type {
292 REDIS_VALUE_TYPE_STRING => {
293 let option_name = match pk_indices {
294 Some(_) => KEY_FORMAT,
295 None => VALUE_FORMAT,
296 };
297 let template = b.format_desc.options.get(option_name).ok_or_else(|| {
298 SinkError::Config(anyhow!("Cannot find '{option_name}',please set it."))
299 })?;
300 Ok(TemplateEncoder::new_string(
301 b.schema,
302 pk_indices,
303 template.clone(),
304 ))
305 }
306 REDIS_VALUE_TYPE_GEO => match pk_indices {
307 Some(_) => {
308 let member_name = b.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
309 SinkError::Config(anyhow!("Cannot find `{MEMBER_NAME}`,please set it."))
310 })?;
311 let template = b.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
312 SinkError::Config(anyhow!("Cannot find `{KEY_FORMAT}`,please set it."))
313 })?;
314 TemplateEncoder::new_geo_key(
315 b.schema,
316 pk_indices,
317 member_name,
318 template.clone(),
319 )
320 }
321 None => {
322 let lat_name = b.format_desc.options.get(LAT_NAME).ok_or_else(|| {
323 SinkError::Config(anyhow!("Cannot find `{LAT_NAME}`, please set it."))
324 })?;
325 let lon_name = b.format_desc.options.get(LON_NAME).ok_or_else(|| {
326 SinkError::Config(anyhow!("Cannot find `{LON_NAME}`,please set it."))
327 })?;
328 TemplateEncoder::new_geo_value(b.schema, pk_indices, lat_name, lon_name)
329 }
330 },
331 REDIS_VALUE_TYPE_PUBSUB => match pk_indices {
332 Some(_) => {
333 let channel = b.format_desc.options.get(CHANNEL).cloned();
334 let channel_column = b.format_desc.options.get(CHANNEL_COLUMN).cloned();
335 if (channel.is_none() && channel_column.is_none())
336 || (channel.is_some() && channel_column.is_some())
337 {
338 return Err(SinkError::Config(anyhow!(
339 "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
340 )));
341 }
342 TemplateEncoder::new_pubsub_key(b.schema, pk_indices, channel, channel_column)
343 }
344 None => {
345 let template = b.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
346 SinkError::Config(anyhow!("Cannot find '{VALUE_FORMAT}',please set it."))
347 })?;
348 Ok(TemplateEncoder::new_string(
349 b.schema,
350 pk_indices,
351 template.clone(),
352 ))
353 }
354 },
355 _ => Err(SinkError::Config(anyhow!(
356 "The value type {} is not supported",
357 redis_value_type
358 ))),
359 }
360 }
361}
362
/// Inputs for building a formatter through [`FormatterBuild`].
struct FormatterParams<'a> {
    /// Parameters forwarded (cloned when needed) to the key and value encoder builders.
    builder: EncoderParams<'a>,
    /// Primary-key column indices. The append-only formatter treats an empty list as "no key
    /// encoder".
    pk_indices: Vec<usize>,
}
367
/// Constructs a formatter (and its encoders) from [`FormatterParams`].
trait FormatterBuild: Sized {
    /// Builds the formatter, propagating any error raised while building its encoders.
    async fn build(b: FormatterParams<'_>) -> Result<Self>;
}
376
377impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<KE, VE> {
378 async fn build(b: FormatterParams<'_>) -> Result<Self> {
379 let key_encoder = match b.pk_indices.is_empty() {
380 true => None,
381 false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?),
382 };
383 let val_encoder = VE::build(b.builder, None).await?;
384 Ok(AppendOnlyFormatter::new(key_encoder, val_encoder))
385 }
386}
387
388impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
389 async fn build(b: FormatterParams<'_>) -> Result<Self> {
390 let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices))
391 .await
392 .with_context(|| "Failed to build key encoder")?;
393 let val_encoder = VE::build(b.builder, None)
394 .await
395 .with_context(|| "Failed to build value encoder")?;
396 Ok(UpsertFormatter::new(key_encoder, val_encoder))
397 }
398}
399
400impl FormatterBuild for DebeziumJsonFormatter {
401 async fn build(b: FormatterParams<'_>) -> Result<Self> {
402 assert_eq!(b.builder.format_desc.encode, SinkEncode::Json);
403
404 Ok(DebeziumJsonFormatter::new(
405 b.builder.schema,
406 b.pk_indices,
407 b.builder.db_name,
408 b.builder.sink_from_name,
409 DebeziumAdapterOpts::default(),
410 ))
411 }
412}
413
impl SinkFormatterImpl {
    /// Builds the formatter matching the sink's format, value encode and key encode.
    ///
    /// Dispatches on `(format, encode, key_encode)` from `format_desc` and builds the
    /// corresponding formatter together with its key/value encoders. `schema`, `pk_indices`,
    /// `db_name`, `sink_from_name` and `topic` are forwarded to the encoder builders.
    ///
    /// # Errors
    ///
    /// - Returns [`SinkError::Config`] for unsupported combinations, including `ENCODE TEXT`
    ///   used as the value encode.
    /// - Propagates any error from building the formatter's encoders.
    pub async fn new(
        format_desc: &SinkFormatDesc,
        schema: Schema,
        pk_indices: Vec<usize>,
        db_name: String,
        sink_from_name: String,
        topic: &str,
    ) -> Result<Self> {
        use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl};
        let p = FormatterParams {
            builder: EncoderParams {
                format_desc,
                schema,
                db_name,
                sink_from_name,
                topic,
            },
            pk_indices,
        };

        // Generic helper so each match arm can build whatever formatter type its
        // `SinkFormatterImpl` variant wraps; `T` is inferred from the variant constructor.
        async fn build<T: FormatterBuild>(p: FormatterParams<'_>) -> Result<T> {
            T::build(p).await
        }

        Ok(
            match (
                &format_desc.format,
                &format_desc.encode,
                &format_desc.key_encode,
            ) {
                // Append-only formats. Without an explicit key encode, the key encoder is the
                // default listed on the corresponding enum variant.
                (F::AppendOnly, E::Json, Some(E::Text)) => {
                    Impl::AppendOnlyTextJson(build(p).await?)
                }
                (F::AppendOnly, E::Json, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesJson(build(p).await?)
                }
                (F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?),
                (F::AppendOnly, E::Avro, Some(E::Text)) => {
                    Impl::AppendOnlyTextAvro(build(p).await?)
                }
                (F::AppendOnly, E::Avro, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesAvro(build(p).await?)
                }
                (F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?),
                (F::AppendOnly, E::Protobuf, Some(E::Text)) => {
                    Impl::AppendOnlyTextProto(build(p).await?)
                }
                (F::AppendOnly, E::Protobuf, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesProto(build(p).await?)
                }
                (F::AppendOnly, E::Protobuf, None) => Impl::AppendOnlyProto(build(p).await?),
                (F::AppendOnly, E::Template, Some(E::Text)) => {
                    Impl::AppendOnlyTextTemplate(build(p).await?)
                }
                (F::AppendOnly, E::Template, Some(E::Bytes)) => {
                    Impl::AppendOnlyBytesTemplate(build(p).await?)
                }
                (F::AppendOnly, E::Template, None) => Impl::AppendOnlyTemplate(build(p).await?),
                // Upsert formats.
                (F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?),
                (F::Upsert, E::Json, Some(E::Bytes)) => {
                    Impl::UpsertBytesJson(build(p).await?)
                }
                (F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?),
                (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?),
                (F::Upsert, E::Avro, Some(E::Bytes)) => {
                    Impl::UpsertBytesAvro(build(p).await?)
                }
                (F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?),
                // Upsert + protobuf requires an explicit TEXT or BYTES key encode; any other
                // key encode (including none) falls through to the error arm below.
                (F::Upsert, E::Protobuf, Some(E::Text)) => Impl::UpsertTextProto(build(p).await?),
                (F::Upsert, E::Protobuf, Some(E::Bytes)) => {
                    Impl::UpsertBytesProto(build(p).await?)
                }
                (F::Upsert, E::Template, Some(E::Text)) => {
                    Impl::UpsertTextTemplate(build(p).await?)
                }
                (F::Upsert, E::Template, Some(E::Bytes)) => {
                    Impl::UpsertBytesTemplate(build(p).await?)
                }
                (F::Upsert, E::Template, None) => Impl::UpsertTemplate(build(p).await?),
                // Debezium supports JSON only, with no key encode.
                (F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?),
                // TEXT gets a dedicated message because it is only usable as a key encode.
                (F::AppendOnly | F::Upsert, E::Text, _) => {
                    return Err(SinkError::Config(anyhow!(
                        "ENCODE TEXT is only valid as key encode."
                    )));
                }
                // Every remaining combination is rejected. Sub-patterns may overlap arms above;
                // only the combinations not matched earlier reach this arm.
                (F::AppendOnly, E::Avro, _)
                | (F::Upsert, E::Protobuf, _)
                | (F::Debezium, E::Json, Some(_))
                | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text, _)
                | (F::AppendOnly, E::Bytes, _)
                | (F::Upsert, E::Bytes, _)
                | (F::Debezium, E::Bytes, _)
                | (_, E::Parquet, _)
                | (_, _, Some(E::Parquet))
                | (F::AppendOnly | F::Upsert, _, Some(E::Template) | Some(E::Json) | Some(E::Avro) | Some(E::Protobuf)) => {
                    return Err(SinkError::Config(anyhow!(
                        "sink format/encode/key_encode unsupported: {:?} {:?} {:?}",
                        format_desc.format,
                        format_desc.encode,
                        format_desc.key_encode
                    )));
                }
            },
        )
    }
}
532
/// Matches on every [`SinkFormatterImpl`] variant, binds the wrapped formatter to `$name`, and
/// evaluates `$body` in each arm.
///
/// `$body` must therefore type-check for every formatter type in the enum. The macro names
/// `SinkFormatterImpl` unqualified, so it must be in scope at the call site.
#[macro_export]
macro_rules! dispatch_sink_formatter_impl {
    ($impl:expr, $name:ident, $body:expr) => {
        match $impl {
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesProto($name) => $body,

            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertBytesJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertAvro($name) => $body,
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertBytesAvro($name) => $body,
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertBytesProto($name) => $body,
            SinkFormatterImpl::DebeziumJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesTemplate($name) => $body,
            SinkFormatterImpl::UpsertBytesTemplate($name) => $body,
        }
    };
}
570
/// Like `dispatch_sink_formatter_impl!`, but evaluates `$body` only for variants whose key
/// encoder is not a `BytesEncoder` or an `AvroEncoder`.
///
/// Variants with a `BytesEncoder` or `AvroEncoder` key hit `unreachable!()` and panic. The
/// caller must guarantee such variants are never passed; presumably this is used where the
/// key must be string-like — confirm at call sites. `SinkFormatterImpl` must be in scope at
/// the call site.
#[macro_export]
macro_rules! dispatch_sink_formatter_str_key_impl {
    ($impl:expr, $name:ident, $body:expr) => {
        match $impl {
            SinkFormatterImpl::AppendOnlyJson($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesJson(_) => unreachable!(),
            SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
            SinkFormatterImpl::AppendOnlyAvro(_) => unreachable!(),
            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesAvro(_) => unreachable!(),
            SinkFormatterImpl::AppendOnlyProto($name) => $body,
            SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesProto(_) => unreachable!(),

            SinkFormatterImpl::UpsertJson($name) => $body,
            SinkFormatterImpl::UpsertTextJson($name) => $body,
            SinkFormatterImpl::UpsertAvro(_) => unreachable!(),
            SinkFormatterImpl::UpsertTextAvro($name) => $body,
            SinkFormatterImpl::UpsertBytesAvro(_) => unreachable!(),
            SinkFormatterImpl::UpsertTextProto($name) => $body,
            SinkFormatterImpl::UpsertBytesProto(_) => unreachable!(),
            SinkFormatterImpl::DebeziumJson($name) => $body,
            SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
            SinkFormatterImpl::UpsertTextTemplate($name) => $body,
            SinkFormatterImpl::UpsertBytesJson(_) => unreachable!(),
            SinkFormatterImpl::UpsertTemplate($name) => $body,
            SinkFormatterImpl::AppendOnlyBytesTemplate(_) => unreachable!(),
            SinkFormatterImpl::UpsertBytesTemplate(_) => unreachable!(),
        }
    };
}