1use std::borrow::Cow;
16use std::collections::HashMap;
17
18use regex::{Captures, Regex};
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::row::Row;
21use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
22use thiserror_ext::AsReport;
23
24use super::{Result, RowEncoder};
25use crate::sink::SinkError;
26use crate::sink::encoder::SerTo;
27
28pub enum TemplateEncoder {
29 String(TemplateStringEncoder),
30 RedisGeoKey(TemplateRedisGeoKeyEncoder),
31 RedisGeoValue(TemplateRedisGeoValueEncoder),
32 RedisPubSubKey(TemplateRedisPubSubStreamKeyEncoder),
33 RedisStreamValue(TemplateRedisStreamValueEncoder),
34}
35impl TemplateEncoder {
36 pub fn new_string(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
37 TemplateEncoder::String(TemplateStringEncoder::new(schema, col_indices, template))
38 }
39
40 pub fn new_geo_value(
41 schema: Schema,
42 col_indices: Option<Vec<usize>>,
43 lat_name: &str,
44 lon_name: &str,
45 ) -> Result<Self> {
46 Ok(TemplateEncoder::RedisGeoValue(
47 TemplateRedisGeoValueEncoder::new(schema, col_indices, lat_name, lon_name)?,
48 ))
49 }
50
51 pub fn new_geo_key(
52 schema: Schema,
53 col_indices: Option<Vec<usize>>,
54 member_name: &str,
55 template: String,
56 ) -> Result<Self> {
57 Ok(TemplateEncoder::RedisGeoKey(
58 TemplateRedisGeoKeyEncoder::new(schema, col_indices, member_name, template)?,
59 ))
60 }
61
62 pub fn new_pubsub_stream_key(
63 schema: Schema,
64 col_indices: Option<Vec<usize>>,
65 channel: Option<String>,
66 channel_column: Option<String>,
67 ) -> Result<Self> {
68 Ok(TemplateEncoder::RedisPubSubKey(
69 TemplateRedisPubSubStreamKeyEncoder::new(schema, col_indices, channel, channel_column)?,
70 ))
71 }
72
73 pub fn new_stream_value(
74 schema: Schema,
75 col_indices: Option<Vec<usize>>,
76 key_template: String,
77 value_template: String,
78 ) -> Self {
79 TemplateEncoder::RedisStreamValue(TemplateRedisStreamValueEncoder::new(
80 schema,
81 col_indices,
82 key_template,
83 value_template,
84 ))
85 }
86}
87impl RowEncoder for TemplateEncoder {
88 type Output = TemplateEncoderOutput;
89
90 fn schema(&self) -> &Schema {
91 match self {
92 TemplateEncoder::String(encoder) => &encoder.schema,
93 TemplateEncoder::RedisGeoValue(encoder) => &encoder.schema,
94 TemplateEncoder::RedisGeoKey(encoder) => &encoder.key_encoder.schema,
95 TemplateEncoder::RedisPubSubKey(encoder) => &encoder.schema,
96 TemplateEncoder::RedisStreamValue(encoder) => &encoder.value_encoder.schema,
97 }
98 }
99
100 fn col_indices(&self) -> Option<&[usize]> {
101 match self {
102 TemplateEncoder::String(encoder) => encoder.col_indices.as_deref(),
103 TemplateEncoder::RedisGeoValue(encoder) => encoder.col_indices.as_deref(),
104 TemplateEncoder::RedisGeoKey(encoder) => encoder.key_encoder.col_indices.as_deref(),
105 TemplateEncoder::RedisPubSubKey(encoder) => encoder.col_indices.as_deref(),
106 TemplateEncoder::RedisStreamValue(encoder) => {
107 encoder.value_encoder.col_indices.as_deref()
108 }
109 }
110 }
111
112 fn encode_cols(
113 &self,
114 row: impl Row,
115 col_indices: impl Iterator<Item = usize>,
116 ) -> Result<Self::Output> {
117 match self {
118 TemplateEncoder::String(encoder) => Ok(TemplateEncoderOutput::String(
119 encoder.encode_cols(row, col_indices)?,
120 )),
121 TemplateEncoder::RedisGeoValue(encoder) => encoder.encode_cols(row, col_indices),
122 TemplateEncoder::RedisGeoKey(encoder) => encoder.encode_cols(row, col_indices),
123 TemplateEncoder::RedisPubSubKey(encoder) => encoder.encode_cols(row, col_indices),
124 TemplateEncoder::RedisStreamValue(encoder) => encoder.encode_cols(row, col_indices),
125 }
126 }
127}
128pub struct TemplateStringEncoder {
131 field_name_to_index: HashMap<String, (usize, Field)>,
132 col_indices: Option<Vec<usize>>,
133 template: String,
134 schema: Schema,
135}
136
137impl TemplateStringEncoder {
139 pub fn new(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
140 let field_name_to_index = schema
141 .fields()
142 .iter()
143 .enumerate()
144 .map(|(index, field)| (field.name.clone(), (index, field.clone())))
145 .collect();
146 Self {
147 field_name_to_index,
148 col_indices,
149 template,
150 schema,
151 }
152 }
153
154 pub fn check_string_format(format: &str, map: &HashMap<String, DataType>) -> Result<()> {
155 let re = Regex::new(r"(\\\})|(\\\{)|\{([^}]*)\}").unwrap();
157 if !re.is_match(format) {
158 return Err(SinkError::Redis(
159 "Can't find {} in key_format or value_format".to_owned(),
160 ));
161 }
162 for capture in re.captures_iter(format) {
163 if let Some(inner_content) = capture.get(3)
164 && !map.contains_key(inner_content.as_str())
165 {
166 return Err(SinkError::Redis(format!(
167 "Can't find field({:?}) in key_format or value_format",
168 inner_content.as_str()
169 )));
170 }
171 }
172 Ok(())
173 }
174
175 pub fn encode_cols(
176 &self,
177 row: impl Row,
178 col_indices: impl Iterator<Item = usize>,
179 ) -> Result<String> {
180 let s = self.template.clone();
181 let re = Regex::new(r"(\\\})|(\\\{)|\{([^}]*)\}").unwrap();
182 let col_indices: Vec<_> = col_indices.collect();
183 let replaced = re.replace_all(s.as_ref(), |caps: &Captures<'_>| {
184 if caps.get(1).is_some() {
185 Cow::Borrowed("}")
186 } else if caps.get(2).is_some() {
187 Cow::Borrowed("{")
188 } else if let Some(content) = caps.get(3) {
189 let (idx, field) = self.field_name_to_index.get(content.as_str()).unwrap();
190 if col_indices.contains(idx) {
191 let data = row.datum_at(*idx).to_text_with_type(&field.data_type);
192 Cow::Owned(data)
193 } else {
194 Cow::Borrowed("")
195 }
196 } else {
197 Cow::Borrowed("")
198 }
199 });
200 Ok(replaced.to_string())
201 }
202}
203
204pub struct TemplateRedisGeoValueEncoder {
205 schema: Schema,
206 col_indices: Option<Vec<usize>>,
207 lat_col: usize,
208 lon_col: usize,
209}
210
211impl TemplateRedisGeoValueEncoder {
212 pub fn new(
213 schema: Schema,
214 col_indices: Option<Vec<usize>>,
215 lat_name: &str,
216 lon_name: &str,
217 ) -> Result<Self> {
218 let lat_col = schema
219 .names_str()
220 .iter()
221 .position(|name| name == &lat_name)
222 .ok_or_else(|| {
223 SinkError::Redis(format!("Can't find lat column({}) in schema", lat_name))
224 })?;
225 let lon_col = schema
226 .names_str()
227 .iter()
228 .position(|name| name == &lon_name)
229 .ok_or_else(|| {
230 SinkError::Redis(format!("Can't find lon column({}) in schema", lon_name))
231 })?;
232 Ok(Self {
233 schema,
234 col_indices,
235 lat_col,
236 lon_col,
237 })
238 }
239
240 pub fn encode_cols(
241 &self,
242 row: impl Row,
243 _col_indices: impl Iterator<Item = usize>,
244 ) -> Result<TemplateEncoderOutput> {
245 let lat = into_string_from_scalar(
246 row.datum_at(self.lat_col)
247 .ok_or_else(|| SinkError::Redis("lat is null".to_owned()))?,
248 )?;
249 let lon = into_string_from_scalar(
250 row.datum_at(self.lon_col)
251 .ok_or_else(|| SinkError::Redis("lon is null".to_owned()))?,
252 )?;
253 Ok(TemplateEncoderOutput::RedisGeoValue((lat, lon)))
254 }
255}
256
257fn into_string_from_scalar(scalar: ScalarRefImpl<'_>) -> Result<String> {
258 match scalar {
259 ScalarRefImpl::Float32(ordered_float) => Ok(Into::<f32>::into(ordered_float).to_string()),
260 ScalarRefImpl::Float64(ordered_float) => Ok(Into::<f64>::into(ordered_float).to_string()),
261 ScalarRefImpl::Utf8(s) => Ok(s.to_owned()),
262 _ => Err(SinkError::Encode(
263 "Only f32 and f64 can convert to redis geo".to_owned(),
264 )),
265 }
266}
267
268pub struct TemplateRedisGeoKeyEncoder {
269 key_encoder: TemplateStringEncoder,
270 member_col: usize,
271}
272
273impl TemplateRedisGeoKeyEncoder {
274 pub fn new(
275 schema: Schema,
276 col_indices: Option<Vec<usize>>,
277 member_name: &str,
278 template: String,
279 ) -> Result<Self> {
280 let member_col = schema
281 .names_str()
282 .iter()
283 .position(|name| name == &member_name)
284 .ok_or_else(|| {
285 SinkError::Redis(format!(
286 "Can't find member column({}) in schema",
287 member_name
288 ))
289 })?;
290 let key_encoder = TemplateStringEncoder::new(schema, col_indices, template);
291 Ok(Self {
292 key_encoder,
293 member_col,
294 })
295 }
296
297 pub fn encode_cols(
298 &self,
299 row: impl Row,
300 col_indices: impl Iterator<Item = usize>,
301 ) -> Result<TemplateEncoderOutput> {
302 let member = row
303 .datum_at(self.member_col)
304 .ok_or_else(|| SinkError::Redis("member is null".to_owned()))?
305 .to_text();
306 let key = self.key_encoder.encode_cols(row, col_indices)?;
307 Ok(TemplateEncoderOutput::RedisGeoKey((key, member)))
308 }
309}
310
311pub enum TemplateRedisPubSubStreamKeyEncoderInner {
312 PubSubStreamName(String),
313 PubSubStreamColumnIndex(usize),
314}
315pub struct TemplateRedisPubSubStreamKeyEncoder {
316 inner: TemplateRedisPubSubStreamKeyEncoderInner,
317 schema: Schema,
318 col_indices: Option<Vec<usize>>,
319}
320
321impl TemplateRedisPubSubStreamKeyEncoder {
322 pub fn new(
323 schema: Schema,
324 col_indices: Option<Vec<usize>>,
325 channel: Option<String>,
326 channel_column: Option<String>,
327 ) -> Result<Self> {
328 if let Some(channel) = channel {
329 return Ok(Self {
330 inner: TemplateRedisPubSubStreamKeyEncoderInner::PubSubStreamName(channel),
331 schema,
332 col_indices,
333 });
334 }
335 if let Some(channel_column) = channel_column {
336 let channel_column_index = schema
337 .names_str()
338 .iter()
339 .position(|name| name == &channel_column)
340 .ok_or_else(|| {
341 SinkError::Redis(format!(
342 "Can't find pubsub column({}) in schema",
343 channel_column
344 ))
345 })?;
346 return Ok(Self {
347 inner: TemplateRedisPubSubStreamKeyEncoderInner::PubSubStreamColumnIndex(
348 channel_column_index,
349 ),
350 schema,
351 col_indices,
352 });
353 }
354 Err(SinkError::Redis(
355 "`channel` or `channel_column` must be set".to_owned(),
356 ))
357 }
358
359 pub fn encode_cols(
360 &self,
361 row: impl Row,
362 _col_indices: impl Iterator<Item = usize>,
363 ) -> Result<TemplateEncoderOutput> {
364 match &self.inner {
365 TemplateRedisPubSubStreamKeyEncoderInner::PubSubStreamName(channel) => {
366 Ok(TemplateEncoderOutput::RedisPubSubStreamKey(channel.clone()))
367 }
368 TemplateRedisPubSubStreamKeyEncoderInner::PubSubStreamColumnIndex(pubsub_col) => {
369 let pubsub_key = row
370 .datum_at(*pubsub_col)
371 .ok_or_else(|| SinkError::Redis("pubsub_key is null".to_owned()))?
372 .to_text();
373 Ok(TemplateEncoderOutput::RedisPubSubStreamKey(pubsub_key))
374 }
375 }
376 }
377}
378
379pub struct TemplateRedisStreamValueEncoder {
380 key_encoder: TemplateStringEncoder,
381 value_encoder: TemplateStringEncoder,
382}
383
384impl TemplateRedisStreamValueEncoder {
385 pub fn new(
386 schema: Schema,
387 col_indices: Option<Vec<usize>>,
388 key_template: String,
389 value_template: String,
390 ) -> Self {
391 let key_encoder =
392 TemplateStringEncoder::new(schema.clone(), col_indices.clone(), key_template);
393 let value_encoder = TemplateStringEncoder::new(schema, col_indices, value_template);
394 Self {
395 key_encoder,
396 value_encoder,
397 }
398 }
399
400 pub fn encode_cols(
401 &self,
402 row: impl Row,
403 col_indices: impl Iterator<Item = usize>,
404 ) -> Result<TemplateEncoderOutput> {
405 let col_indices: Vec<_> = col_indices.collect();
406 let key = self
407 .key_encoder
408 .encode_cols(&row, col_indices.clone().into_iter())?;
409 let value = self
410 .value_encoder
411 .encode_cols(row, col_indices.into_iter())?;
412 Ok(TemplateEncoderOutput::RedisStreamValue((key, value)))
413 }
414}
415
416pub enum TemplateEncoderOutput {
417 String(String),
419 RedisGeoValue((String, String)),
421 RedisGeoKey((String, String)),
423
424 RedisPubSubStreamKey(String),
425 RedisStreamValue((String, String)),
426}
427
428impl TemplateEncoderOutput {
429 pub fn into_string(self) -> Result<String> {
430 match self {
431 TemplateEncoderOutput::String(s) => Ok(s),
432 TemplateEncoderOutput::RedisGeoKey(_) => Err(SinkError::Encode(
433 "RedisGeoKey can't convert to string".to_owned(),
434 )),
435 TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
436 "RedisGeoValue can't convert to string".to_owned(),
437 )),
438 TemplateEncoderOutput::RedisPubSubStreamKey(s) => Ok(s),
439 TemplateEncoderOutput::RedisStreamValue((_, _)) => Err(SinkError::Encode(
440 "RedisStreamValue can't convert to string".to_owned(),
441 )),
442 }
443 }
444}
445
446impl SerTo<String> for TemplateEncoderOutput {
447 fn ser_to(self) -> Result<String> {
448 match self {
449 TemplateEncoderOutput::String(s) => Ok(s),
450 TemplateEncoderOutput::RedisGeoKey(_) => Err(SinkError::Encode(
451 "RedisGeoKey can't convert to string".to_owned(),
452 )),
453 TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
454 "RedisGeoValue can't convert to string".to_owned(),
455 )),
456 TemplateEncoderOutput::RedisPubSubStreamKey(s) => Ok(s),
457 TemplateEncoderOutput::RedisStreamValue((_, _)) => Err(SinkError::Encode(
458 "RedisStreamValue can't convert to string".to_owned(),
459 )),
460 }
461 }
462}
463
/// Payload variants mirroring `TemplateEncoderOutput` one-to-one.
///
/// NOTE(review): presumably consumed by the Redis sink payload writer, which is
/// not part of this file — confirm against the caller.
#[derive(Debug)]
pub enum RedisSinkPayloadWriterInput {
    /// A single string payload.
    String(String),
    /// `(lat, lon)` pair, as produced by the geo value encoder.
    RedisGeoValue((String, String)),
    /// `(key, member)` pair, as produced by the geo key encoder.
    RedisGeoKey((String, String)),
    /// Channel name, as produced by the pub/sub key encoder.
    RedisPubSubStreamKey(String),
    /// `(key, value)` pair, as produced by the stream value encoder.
    RedisStreamValue((String, String)),
}
476
477impl SerTo<RedisSinkPayloadWriterInput> for TemplateEncoderOutput {
478 fn ser_to(self) -> Result<RedisSinkPayloadWriterInput> {
479 match self {
480 TemplateEncoderOutput::String(s) => Ok(RedisSinkPayloadWriterInput::String(s)),
481 TemplateEncoderOutput::RedisGeoKey((lat, lon)) => {
482 Ok(RedisSinkPayloadWriterInput::RedisGeoKey((lat, lon)))
483 }
484 TemplateEncoderOutput::RedisGeoValue((key, member)) => {
485 Ok(RedisSinkPayloadWriterInput::RedisGeoValue((key, member)))
486 }
487 TemplateEncoderOutput::RedisPubSubStreamKey(s) => {
488 Ok(RedisSinkPayloadWriterInput::RedisPubSubStreamKey(s))
489 }
490 TemplateEncoderOutput::RedisStreamValue((key, value)) => {
491 Ok(RedisSinkPayloadWriterInput::RedisStreamValue((key, value)))
492 }
493 }
494 }
495}
496
497impl<T: SerTo<Vec<u8>>> SerTo<RedisSinkPayloadWriterInput> for T {
498 default fn ser_to(self) -> Result<RedisSinkPayloadWriterInput> {
499 let bytes = self.ser_to()?;
500 Ok(RedisSinkPayloadWriterInput::String(
501 String::from_utf8(bytes).map_err(|e| SinkError::Redis(e.to_report_string()))?,
502 ))
503 }
504}
505
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};

    use super::*;

    /// Schema used by every test: `id: Int32`, `name: Varchar`, `email: Varchar`.
    fn user_schema() -> Schema {
        Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".to_owned(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".to_owned(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "email".to_owned(),
            },
        ])
    }

    /// Column name -> data type map in the shape `check_string_format` expects.
    fn column_types(schema: &Schema) -> HashMap<String, DataType> {
        schema
            .fields()
            .iter()
            .map(|field| (field.name.clone(), field.data_type.clone()))
            .collect()
    }

    /// One row matching `user_schema`.
    fn sample_row() -> OwnedRow {
        OwnedRow::new(vec![
            Some(ScalarImpl::Int32(123)),
            Some(ScalarImpl::Utf8("John Doe".into())),
            Some(ScalarImpl::Utf8("john@example.com".into())),
        ])
    }

    #[test]
    fn test_template_format_validation() {
        let map = column_types(&user_schema());

        // Templates that must be accepted: plain placeholders, escaped braces
        // around placeholders, nested escaped braces, and backslashes that do
        // not form a brace escape.
        let valid_templates = [
            "user:{id}",
            "user:\\{{id}",
            "user:\\{{id}\\}",
            "user:\\{{id},{name}\\}",
            "user:\\{prefix{id},suffix{name}\\}",
            "user:\\{prefix{id},suffix{name},email:{email}\\}",
            "user:\\{nested\\{deeply{id}\\}\\}",
            "user:\\{outer\\{inner{id}\\},another{name}\\}",
            "user:\\{complex\\{structure\\{with{id}\\},and{name}\\},email:{email}\\}",
            "user:{id}{name}",
            "user:\\\\{id}",
            "user:\\\\\\{id}",
            "user:\\a{id}",
            "user:\\b{name}",
            "user:{id}{name}{email}",
        ];
        for template in valid_templates {
            let outcome = TemplateStringEncoder::check_string_format(template, &map);
            assert!(outcome.is_ok(), "Template '{}' should be valid", template);
        }

        // Templates that must be rejected: no braces, unknown column,
        // unbalanced braces, empty placeholder, backslash inside a placeholder.
        let invalid_templates = [
            "user:no_braces",
            "user:{invalid_column}",
            "user:{id",
            "user:id}",
            "sadsadsad{}qw4e2ewq21",
            "user:{}",
            "user:{\\id}",
        ];
        for template in invalid_templates {
            let outcome = TemplateStringEncoder::check_string_format(template, &map);
            assert!(outcome.is_err(), "Template '{}' should be invalid", template);
        }
    }

    #[test]
    fn test_template_encoding() {
        let schema = user_schema();

        // (template, expected output, column indices to encode)
        let test_cases = [
            ("user:{id}", "user:123", vec![0]),
            ("user:\\{id\\}", "user:{id}", vec![0]),
            ("user:\\{id,name\\}", "user:{id,name}", vec![0, 1]),
            (
                "user:\\{prefix{id},suffix{name}\\}",
                "user:{prefix123,suffixJohn Doe}",
                vec![0, 1],
            ),
            (
                "user:\\{nested\\{deeply{id}\\}\\}",
                "user:{nested{deeply123}}",
                vec![0],
            ),
            (
                "user:\\{outer\\{inner{id}\\},another{name}\\}",
                "user:{outer{inner123},anotherJohn Doe}",
                vec![0, 1],
            ),
            ("user:{id}{name}", "user:123John Doe", vec![0, 1]),
            ("user:\\{id\\}{name}", "user:{id}John Doe", vec![0, 1]),
            ("user:\\\\{id}", "user:\\{id}", vec![0]),
            ("user:\\\\\\{id}", "user:\\\\{id}", vec![0]),
            ("user:\\a{id}", "user:\\a123", vec![0]),
            ("user:\\b{name}", "user:\\bJohn Doe", vec![1]),
            (
                "user:{id}{name}{email}",
                "user:123John Doejohn@example.com",
                vec![0, 1, 2],
            ),
        ];

        for (template, expected, col_indices) in test_cases {
            let encoder = TemplateStringEncoder::new(
                schema.clone(),
                Some(col_indices.clone()),
                template.to_owned(),
            );

            let result = encoder
                .encode_cols(sample_row(), col_indices.into_iter())
                .unwrap();

            assert_eq!(result, expected, "Template '{}' encoding failed", template);
        }
    }

    #[test]
    fn test_complex_nested_template() {
        let schema = user_schema();
        let map = column_types(&schema);

        let complex_template = "user:\\{prefix{id},suffix{name},email:{email},nested\\{deeply{id}\\},outer\\{inner{name}\\}\\}";

        // The template must pass validation before it is encoded.
        assert!(TemplateStringEncoder::check_string_format(complex_template, &map).is_ok());

        let encoder =
            TemplateStringEncoder::new(schema, Some(vec![0, 1, 2]), complex_template.to_owned());

        let result = encoder
            .encode_cols(sample_row(), vec![0, 1, 2].into_iter())
            .unwrap();

        assert_eq!(
            result,
            "user:{prefix123,suffixJohn Doe,email:john@example.com,nested{deeply123},outer{innerJohn Doe}}"
        );
    }
}