risingwave_connector/sink/encoder/
template.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::collections::HashMap;
17
18use regex::{Captures, Regex};
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::row::Row;
21use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
22use thiserror_ext::AsReport;
23
24use super::{Result, RowEncoder};
25use crate::sink::SinkError;
26use crate::sink::encoder::SerTo;
27
28pub enum TemplateEncoder {
29    String(TemplateStringEncoder),
30    RedisGeoKey(TemplateRedisGeoKeyEncoder),
31    RedisGeoValue(TemplateRedisGeoValueEncoder),
32    RedisPubSubKey(TemplateRedisPubSubStreamKeyEncoder),
33    RedisStreamValue(TemplateRedisStreamValueEncoder),
34}
35impl TemplateEncoder {
36    pub fn new_string(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
37        TemplateEncoder::String(TemplateStringEncoder::new(schema, col_indices, template))
38    }
39
40    pub fn new_geo_value(
41        schema: Schema,
42        col_indices: Option<Vec<usize>>,
43        lat_name: &str,
44        lon_name: &str,
45    ) -> Result<Self> {
46        Ok(TemplateEncoder::RedisGeoValue(
47            TemplateRedisGeoValueEncoder::new(schema, col_indices, lat_name, lon_name)?,
48        ))
49    }
50
51    pub fn new_geo_key(
52        schema: Schema,
53        col_indices: Option<Vec<usize>>,
54        member_name: &str,
55        template: String,
56    ) -> Result<Self> {
57        Ok(TemplateEncoder::RedisGeoKey(
58            TemplateRedisGeoKeyEncoder::new(schema, col_indices, member_name, template)?,
59        ))
60    }
61
62    pub fn new_pubsub_stream_key(
63        schema: Schema,
64        col_indices: Option<Vec<usize>>,
65        channel: Option<String>,
66        channel_column: Option<String>,
67    ) -> Result<Self> {
68        Ok(TemplateEncoder::RedisPubSubKey(
69            TemplateRedisPubSubStreamKeyEncoder::new(schema, col_indices, channel, channel_column)?,
70        ))
71    }
72
73    pub fn new_stream_value(
74        schema: Schema,
75        col_indices: Option<Vec<usize>>,
76        key_template: String,
77        value_template: String,
78    ) -> Self {
79        TemplateEncoder::RedisStreamValue(TemplateRedisStreamValueEncoder::new(
80            schema,
81            col_indices,
82            key_template,
83            value_template,
84        ))
85    }
86}
87impl RowEncoder for TemplateEncoder {
88    type Output = TemplateEncoderOutput;
89
90    fn schema(&self) -> &Schema {
91        match self {
92            TemplateEncoder::String(encoder) => &encoder.schema,
93            TemplateEncoder::RedisGeoValue(encoder) => &encoder.schema,
94            TemplateEncoder::RedisGeoKey(encoder) => &encoder.key_encoder.schema,
95            TemplateEncoder::RedisPubSubKey(encoder) => &encoder.schema,
96            TemplateEncoder::RedisStreamValue(encoder) => &encoder.value_encoder.schema,
97        }
98    }
99
100    fn col_indices(&self) -> Option<&[usize]> {
101        match self {
102            TemplateEncoder::String(encoder) => encoder.col_indices.as_deref(),
103            TemplateEncoder::RedisGeoValue(encoder) => encoder.col_indices.as_deref(),
104            TemplateEncoder::RedisGeoKey(encoder) => encoder.key_encoder.col_indices.as_deref(),
105            TemplateEncoder::RedisPubSubKey(encoder) => encoder.col_indices.as_deref(),
106            TemplateEncoder::RedisStreamValue(encoder) => {
107                encoder.value_encoder.col_indices.as_deref()
108            }
109        }
110    }
111
112    fn encode_cols(
113        &self,
114        row: impl Row,
115        col_indices: impl Iterator<Item = usize>,
116    ) -> Result<Self::Output> {
117        match self {
118            TemplateEncoder::String(encoder) => Ok(TemplateEncoderOutput::String(
119                encoder.encode_cols(row, col_indices)?,
120            )),
121            TemplateEncoder::RedisGeoValue(encoder) => encoder.encode_cols(row, col_indices),
122            TemplateEncoder::RedisGeoKey(encoder) => encoder.encode_cols(row, col_indices),
123            TemplateEncoder::RedisPubSubKey(encoder) => encoder.encode_cols(row, col_indices),
124            TemplateEncoder::RedisStreamValue(encoder) => encoder.encode_cols(row, col_indices),
125        }
126    }
127}
128/// Encode a row according to a specified string template `user_id:{user_id}`.
129/// Data is encoded to string with [`ToText`].
130pub struct TemplateStringEncoder {
131    field_name_to_index: HashMap<String, (usize, Field)>,
132    col_indices: Option<Vec<usize>>,
133    template: String,
134    schema: Schema,
135}
136
137/// todo! improve the performance.
138impl TemplateStringEncoder {
139    pub fn new(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
140        let field_name_to_index = schema
141            .fields()
142            .iter()
143            .enumerate()
144            .map(|(index, field)| (field.name.clone(), (index, field.clone())))
145            .collect();
146        Self {
147            field_name_to_index,
148            col_indices,
149            template,
150            schema,
151        }
152    }
153
154    pub fn check_string_format(format: &str, map: &HashMap<String, DataType>) -> Result<()> {
155        // We will check if the string inside {} corresponds to a column name in rw.
156        let re = Regex::new(r"(\\\})|(\\\{)|\{([^}]*)\}").unwrap();
157        if !re.is_match(format) {
158            return Err(SinkError::Redis(
159                "Can't find {} in key_format or value_format".to_owned(),
160            ));
161        }
162        for capture in re.captures_iter(format) {
163            if let Some(inner_content) = capture.get(3)
164                && !map.contains_key(inner_content.as_str())
165            {
166                return Err(SinkError::Redis(format!(
167                    "Can't find field({:?}) in key_format or value_format",
168                    inner_content.as_str()
169                )));
170            }
171        }
172        Ok(())
173    }
174
175    pub fn encode_cols(
176        &self,
177        row: impl Row,
178        col_indices: impl Iterator<Item = usize>,
179    ) -> Result<String> {
180        let s = self.template.clone();
181        let re = Regex::new(r"(\\\})|(\\\{)|\{([^}]*)\}").unwrap();
182        let col_indices: Vec<_> = col_indices.collect();
183        let replaced = re.replace_all(s.as_ref(), |caps: &Captures<'_>| {
184            if caps.get(1).is_some() {
185                Cow::Borrowed("}")
186            } else if caps.get(2).is_some() {
187                Cow::Borrowed("{")
188            } else if let Some(content) = caps.get(3) {
189                let (idx, field) = self.field_name_to_index.get(content.as_str()).unwrap();
190                if col_indices.contains(idx) {
191                    let data = row.datum_at(*idx).to_text_with_type(&field.data_type);
192                    Cow::Owned(data)
193                } else {
194                    Cow::Borrowed("")
195                }
196            } else {
197                Cow::Borrowed("")
198            }
199        });
200        Ok(replaced.to_string())
201    }
202}
203
204pub struct TemplateRedisGeoValueEncoder {
205    schema: Schema,
206    col_indices: Option<Vec<usize>>,
207    lat_col: usize,
208    lon_col: usize,
209}
210
211impl TemplateRedisGeoValueEncoder {
212    pub fn new(
213        schema: Schema,
214        col_indices: Option<Vec<usize>>,
215        lat_name: &str,
216        lon_name: &str,
217    ) -> Result<Self> {
218        let lat_col = schema
219            .names_str()
220            .iter()
221            .position(|name| name == &lat_name)
222            .ok_or_else(|| {
223                SinkError::Redis(format!("Can't find lat column({}) in schema", lat_name))
224            })?;
225        let lon_col = schema
226            .names_str()
227            .iter()
228            .position(|name| name == &lon_name)
229            .ok_or_else(|| {
230                SinkError::Redis(format!("Can't find lon column({}) in schema", lon_name))
231            })?;
232        Ok(Self {
233            schema,
234            col_indices,
235            lat_col,
236            lon_col,
237        })
238    }
239
240    pub fn encode_cols(
241        &self,
242        row: impl Row,
243        _col_indices: impl Iterator<Item = usize>,
244    ) -> Result<TemplateEncoderOutput> {
245        let lat = into_string_from_scalar(
246            row.datum_at(self.lat_col)
247                .ok_or_else(|| SinkError::Redis("lat is null".to_owned()))?,
248        )?;
249        let lon = into_string_from_scalar(
250            row.datum_at(self.lon_col)
251                .ok_or_else(|| SinkError::Redis("lon is null".to_owned()))?,
252        )?;
253        Ok(TemplateEncoderOutput::RedisGeoValue((lat, lon)))
254    }
255}
256
257fn into_string_from_scalar(scalar: ScalarRefImpl<'_>) -> Result<String> {
258    match scalar {
259        ScalarRefImpl::Float32(ordered_float) => Ok(Into::<f32>::into(ordered_float).to_string()),
260        ScalarRefImpl::Float64(ordered_float) => Ok(Into::<f64>::into(ordered_float).to_string()),
261        ScalarRefImpl::Utf8(s) => Ok(s.to_owned()),
262        _ => Err(SinkError::Encode(
263            "Only f32 and f64 can convert to redis geo".to_owned(),
264        )),
265    }
266}
267
268pub struct TemplateRedisGeoKeyEncoder {
269    key_encoder: TemplateStringEncoder,
270    member_col: usize,
271}
272
273impl TemplateRedisGeoKeyEncoder {
274    pub fn new(
275        schema: Schema,
276        col_indices: Option<Vec<usize>>,
277        member_name: &str,
278        template: String,
279    ) -> Result<Self> {
280        let member_col = schema
281            .names_str()
282            .iter()
283            .position(|name| name == &member_name)
284            .ok_or_else(|| {
285                SinkError::Redis(format!(
286                    "Can't find member column({}) in schema",
287                    member_name
288                ))
289            })?;
290        let key_encoder = TemplateStringEncoder::new(schema, col_indices, template);
291        Ok(Self {
292            key_encoder,
293            member_col,
294        })
295    }
296
297    pub fn encode_cols(
298        &self,
299        row: impl Row,
300        col_indices: impl Iterator<Item = usize>,
301    ) -> Result<TemplateEncoderOutput> {
302        let member = row
303            .datum_at(self.member_col)
304            .ok_or_else(|| SinkError::Redis("member is null".to_owned()))?
305            .to_text();
306        let key = self.key_encoder.encode_cols(row, col_indices)?;
307        Ok(TemplateEncoderOutput::RedisGeoKey((key, member)))
308    }
309}
310
311pub enum TemplateRedisPubSubStreamKeyEncoderInner {
312    PubSubStreamName(String),
313    PubSubStreamColumnIndex(usize),
314}
315pub struct TemplateRedisPubSubStreamKeyEncoder {
316    inner: TemplateRedisPubSubStreamKeyEncoderInner,
317    schema: Schema,
318    col_indices: Option<Vec<usize>>,
319}
320
321impl TemplateRedisPubSubStreamKeyEncoder {
322    pub fn new(
323        schema: Schema,
324        col_indices: Option<Vec<usize>>,
325        channel: Option<String>,
326        channel_column: Option<String>,
327    ) -> Result<Self> {
328        if let Some(channel) = channel {
329            return Ok(Self {
330                inner: TemplateRedisPubSubStreamKeyEncoderInner::PubSubStreamName(channel),
331                schema,
332                col_indices,
333            });
334        }
335        if let Some(channel_column) = channel_column {
336            let channel_column_index = schema
337                .names_str()
338                .iter()
339                .position(|name| name == &channel_column)
340                .ok_or_else(|| {
341                    SinkError::Redis(format!(
342                        "Can't find pubsub column({}) in schema",
343                        channel_column
344                    ))
345                })?;
346            return Ok(Self {
347                inner: TemplateRedisPubSubStreamKeyEncoderInner::PubSubStreamColumnIndex(
348                    channel_column_index,
349                ),
350                schema,
351                col_indices,
352            });
353        }
354        Err(SinkError::Redis(
355            "`channel` or `channel_column` must be set".to_owned(),
356        ))
357    }
358
359    pub fn encode_cols(
360        &self,
361        row: impl Row,
362        _col_indices: impl Iterator<Item = usize>,
363    ) -> Result<TemplateEncoderOutput> {
364        match &self.inner {
365            TemplateRedisPubSubStreamKeyEncoderInner::PubSubStreamName(channel) => {
366                Ok(TemplateEncoderOutput::RedisPubSubStreamKey(channel.clone()))
367            }
368            TemplateRedisPubSubStreamKeyEncoderInner::PubSubStreamColumnIndex(pubsub_col) => {
369                let pubsub_key = row
370                    .datum_at(*pubsub_col)
371                    .ok_or_else(|| SinkError::Redis("pubsub_key is null".to_owned()))?
372                    .to_text();
373                Ok(TemplateEncoderOutput::RedisPubSubStreamKey(pubsub_key))
374            }
375        }
376    }
377}
378
379pub struct TemplateRedisStreamValueEncoder {
380    key_encoder: TemplateStringEncoder,
381    value_encoder: TemplateStringEncoder,
382}
383
384impl TemplateRedisStreamValueEncoder {
385    pub fn new(
386        schema: Schema,
387        col_indices: Option<Vec<usize>>,
388        key_template: String,
389        value_template: String,
390    ) -> Self {
391        let key_encoder =
392            TemplateStringEncoder::new(schema.clone(), col_indices.clone(), key_template);
393        let value_encoder = TemplateStringEncoder::new(schema, col_indices, value_template);
394        Self {
395            key_encoder,
396            value_encoder,
397        }
398    }
399
400    pub fn encode_cols(
401        &self,
402        row: impl Row,
403        col_indices: impl Iterator<Item = usize>,
404    ) -> Result<TemplateEncoderOutput> {
405        let col_indices: Vec<_> = col_indices.collect();
406        let key = self
407            .key_encoder
408            .encode_cols(&row, col_indices.clone().into_iter())?;
409        let value = self
410            .value_encoder
411            .encode_cols(row, col_indices.into_iter())?;
412        Ok(TemplateEncoderOutput::RedisStreamValue((key, value)))
413    }
414}
415
416pub enum TemplateEncoderOutput {
417    // String formatted according to the template
418    String(String),
419    // The value of redis's geospatial, including longitude and latitude
420    RedisGeoValue((String, String)),
421    // The key of redis's geospatial, including redis's key and member
422    RedisGeoKey((String, String)),
423
424    RedisPubSubStreamKey(String),
425    RedisStreamValue((String, String)),
426}
427
428impl TemplateEncoderOutput {
429    pub fn into_string(self) -> Result<String> {
430        match self {
431            TemplateEncoderOutput::String(s) => Ok(s),
432            TemplateEncoderOutput::RedisGeoKey(_) => Err(SinkError::Encode(
433                "RedisGeoKey can't convert to string".to_owned(),
434            )),
435            TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
436                "RedisGeoValue can't convert to string".to_owned(),
437            )),
438            TemplateEncoderOutput::RedisPubSubStreamKey(s) => Ok(s),
439            TemplateEncoderOutput::RedisStreamValue((_, _)) => Err(SinkError::Encode(
440                "RedisStreamValue can't convert to string".to_owned(),
441            )),
442        }
443    }
444}
445
446impl SerTo<String> for TemplateEncoderOutput {
447    fn ser_to(self) -> Result<String> {
448        match self {
449            TemplateEncoderOutput::String(s) => Ok(s),
450            TemplateEncoderOutput::RedisGeoKey(_) => Err(SinkError::Encode(
451                "RedisGeoKey can't convert to string".to_owned(),
452            )),
453            TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
454                "RedisGeoValue can't convert to string".to_owned(),
455            )),
456            TemplateEncoderOutput::RedisPubSubStreamKey(s) => Ok(s),
457            TemplateEncoderOutput::RedisStreamValue((_, _)) => Err(SinkError::Encode(
458                "RedisStreamValue can't convert to string".to_owned(),
459            )),
460        }
461    }
462}
463
464/// The enum of inputs to `RedisSinkPayloadWriter`
465#[derive(Debug)]
466pub enum RedisSinkPayloadWriterInput {
467    // Json and String will be convert to string
468    String(String),
469    // The value of redis's geospatial, including longitude and latitude
470    RedisGeoValue((String, String)),
471    // The key of redis's geospatial, including redis's key and member
472    RedisGeoKey((String, String)),
473    RedisPubSubStreamKey(String),
474    RedisStreamValue((String, String)),
475}
476
477impl SerTo<RedisSinkPayloadWriterInput> for TemplateEncoderOutput {
478    fn ser_to(self) -> Result<RedisSinkPayloadWriterInput> {
479        match self {
480            TemplateEncoderOutput::String(s) => Ok(RedisSinkPayloadWriterInput::String(s)),
481            TemplateEncoderOutput::RedisGeoKey((lat, lon)) => {
482                Ok(RedisSinkPayloadWriterInput::RedisGeoKey((lat, lon)))
483            }
484            TemplateEncoderOutput::RedisGeoValue((key, member)) => {
485                Ok(RedisSinkPayloadWriterInput::RedisGeoValue((key, member)))
486            }
487            TemplateEncoderOutput::RedisPubSubStreamKey(s) => {
488                Ok(RedisSinkPayloadWriterInput::RedisPubSubStreamKey(s))
489            }
490            TemplateEncoderOutput::RedisStreamValue((key, value)) => {
491                Ok(RedisSinkPayloadWriterInput::RedisStreamValue((key, value)))
492            }
493        }
494    }
495}
496
497impl<T: SerTo<Vec<u8>>> SerTo<RedisSinkPayloadWriterInput> for T {
498    default fn ser_to(self) -> Result<RedisSinkPayloadWriterInput> {
499        let bytes = self.ser_to()?;
500        Ok(RedisSinkPayloadWriterInput::String(
501            String::from_utf8(bytes).map_err(|e| SinkError::Redis(e.to_report_string()))?,
502        ))
503    }
504}
505
506#[cfg(test)]
507mod tests {
508    use risingwave_common::catalog::{Field, Schema};
509    use risingwave_common::row::OwnedRow;
510    use risingwave_common::types::{DataType, ScalarImpl};
511
512    use super::*;
513
514    #[test]
515    fn test_template_format_validation() {
516        // Create a schema with test columns
517        let schema = Schema::new(vec![
518            Field {
519                data_type: DataType::Int32,
520                name: "id".to_owned(),
521            },
522            Field {
523                data_type: DataType::Varchar,
524                name: "name".to_owned(),
525            },
526            Field {
527                data_type: DataType::Varchar,
528                name: "email".to_owned(),
529            },
530        ]);
531
532        // Create a map of column names to their data types
533        let mut map = HashMap::new();
534        for field in schema.fields() {
535            map.insert(field.name.clone(), field.data_type.clone());
536        }
537
538        // Test various template formats
539        let valid_templates = vec![
540            "user:{id}",
541            "user:\\{{id}",
542            "user:\\{{id}\\}",
543            "user:\\{{id},{name}\\}",
544            "user:\\{prefix{id},suffix{name}\\}",
545            "user:\\{prefix{id},suffix{name},email:{email}\\}",
546            "user:\\{nested\\{deeply{id}\\}\\}",
547            "user:\\{outer\\{inner{id}\\},another{name}\\}",
548            "user:\\{complex\\{structure\\{with{id}\\},and{name}\\},email:{email}\\}",
549            "user:{id}{name}",
550            "user:\\\\{id}",
551            "user:\\\\\\{id}",
552            "user:\\a{id}",
553            "user:\\b{name}",
554            "user:{id}{name}{email}",
555        ];
556
557        for template in valid_templates {
558            // Validate the template format
559            assert!(
560                TemplateStringEncoder::check_string_format(template, &map).is_ok(),
561                "Template '{}' should be valid",
562                template
563            );
564        }
565
566        // Test invalid templates
567        let invalid_templates = vec![
568            "user:no_braces",        // No braces
569            "user:{invalid_column}", // Non-existent column
570            "user:{id",              // Unclosed brace
571            "user:id}",              // Unopened brace
572            "sadsadsad{}qw4e2ewq21", // Empty braces
573            "user:{}",
574            "user:{\\id}",
575        ];
576
577        for template in invalid_templates {
578            // Validate the template format
579            assert!(
580                TemplateStringEncoder::check_string_format(template, &map).is_err(),
581                "Template '{}' should be invalid",
582                template
583            );
584        }
585    }
586
587    #[test]
588    fn test_template_encoding() {
589        // Create a schema with test columns
590        let schema = Schema::new(vec![
591            Field {
592                data_type: DataType::Int32,
593                name: "id".to_owned(),
594            },
595            Field {
596                data_type: DataType::Varchar,
597                name: "name".to_owned(),
598            },
599            Field {
600                data_type: DataType::Varchar,
601                name: "email".to_owned(),
602            },
603        ]);
604
605        // Test cases with different template formats
606        let test_cases = vec![
607            ("user:{id}", "user:123", vec![0]),
608            ("user:\\{id\\}", "user:{id}", vec![0]),
609            ("user:\\{id,name\\}", "user:{id,name}", vec![0, 1]),
610            (
611                "user:\\{prefix{id},suffix{name}\\}",
612                "user:{prefix123,suffixJohn Doe}",
613                vec![0, 1],
614            ),
615            (
616                "user:\\{nested\\{deeply{id}\\}\\}",
617                "user:{nested{deeply123}}",
618                vec![0],
619            ),
620            (
621                "user:\\{outer\\{inner{id}\\},another{name}\\}",
622                "user:{outer{inner123},anotherJohn Doe}",
623                vec![0, 1],
624            ),
625            ("user:{id}{name}", "user:123John Doe", vec![0, 1]),
626            ("user:\\{id\\}{name}", "user:{id}John Doe", vec![0, 1]),
627            ("user:\\\\{id}", "user:\\{id}", vec![0]),
628            ("user:\\\\\\{id}", "user:\\\\{id}", vec![0]),
629            ("user:\\a{id}", "user:\\a123", vec![0]),
630            ("user:\\b{name}", "user:\\bJohn Doe", vec![1]),
631            (
632                "user:{id}{name}{email}",
633                "user:123John Doejohn@example.com",
634                vec![0, 1, 2],
635            ),
636        ];
637
638        for (template, expected, col_indices) in test_cases {
639            // Create an encoder with the template
640            let encoder = TemplateStringEncoder::new(
641                schema.clone(),
642                Some(col_indices.clone()),
643                template.to_owned(),
644            );
645
646            // Create a test row
647            let row = OwnedRow::new(vec![
648                Some(ScalarImpl::Int32(123)),
649                Some(ScalarImpl::Utf8("John Doe".into())),
650                Some(ScalarImpl::Utf8("john@example.com".into())),
651            ]);
652
653            // Encode the row
654            let result = encoder.encode_cols(row, col_indices.into_iter()).unwrap();
655
656            // Check the result
657            assert_eq!(result, expected, "Template '{}' encoding failed", template);
658        }
659    }
660
661    #[test]
662    fn test_complex_nested_template() {
663        // Create a schema with test columns
664        let schema = Schema::new(vec![
665            Field {
666                data_type: DataType::Int32,
667                name: "id".to_owned(),
668            },
669            Field {
670                data_type: DataType::Varchar,
671                name: "name".to_owned(),
672            },
673            Field {
674                data_type: DataType::Varchar,
675                name: "email".to_owned(),
676            },
677        ]);
678
679        // Create a map of column names to their data types
680        let mut map = HashMap::new();
681        for field in schema.fields() {
682            map.insert(field.name.clone(), field.data_type.clone());
683        }
684
685        // Test a very complex nested template
686        let complex_template = "user:\\{prefix{id},suffix{name},email:{email},nested\\{deeply{id}\\},outer\\{inner{name}\\}\\}";
687
688        // Validate the template format
689        assert!(TemplateStringEncoder::check_string_format(complex_template, &map).is_ok());
690
691        // Create an encoder with the template
692        let encoder = TemplateStringEncoder::new(
693            schema,
694            Some(vec![0, 1, 2]), // Include all columns
695            complex_template.to_owned(),
696        );
697
698        // Create a test row
699        let row = OwnedRow::new(vec![
700            Some(ScalarImpl::Int32(123)),
701            Some(ScalarImpl::Utf8("John Doe".into())),
702            Some(ScalarImpl::Utf8("john@example.com".into())),
703        ]);
704
705        // Encode the row
706        let result = encoder.encode_cols(row, vec![0, 1, 2].into_iter()).unwrap();
707
708        // Check that all column values are in the result
709        assert_eq!(
710            result,
711            "user:{prefix123,suffixJohn Doe,email:john@example.com,nested{deeply123},outer{innerJohn Doe}}"
712        );
713    }
714}