risingwave_connector/sink/encoder/
template.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use regex::Regex;
18use risingwave_common::catalog::Schema;
19use risingwave_common::row::Row;
20use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
21use thiserror_ext::AsReport;
22
23use super::{Result, RowEncoder};
24use crate::sink::SinkError;
25use crate::sink::encoder::SerTo;
26
27pub enum TemplateEncoder {
28    String(TemplateStringEncoder),
29    RedisGeoKey(TemplateRedisGeoKeyEncoder),
30    RedisGeoValue(TemplateRedisGeoValueEncoder),
31    RedisPubSubKey(TemplateRedisPubSubKeyEncoder),
32}
33impl TemplateEncoder {
34    pub fn new_string(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
35        TemplateEncoder::String(TemplateStringEncoder::new(schema, col_indices, template))
36    }
37
38    pub fn new_geo_value(
39        schema: Schema,
40        col_indices: Option<Vec<usize>>,
41        lat_name: &str,
42        lon_name: &str,
43    ) -> Result<Self> {
44        Ok(TemplateEncoder::RedisGeoValue(
45            TemplateRedisGeoValueEncoder::new(schema, col_indices, lat_name, lon_name)?,
46        ))
47    }
48
49    pub fn new_geo_key(
50        schema: Schema,
51        col_indices: Option<Vec<usize>>,
52        member_name: &str,
53        template: String,
54    ) -> Result<Self> {
55        Ok(TemplateEncoder::RedisGeoKey(
56            TemplateRedisGeoKeyEncoder::new(schema, col_indices, member_name, template)?,
57        ))
58    }
59
60    pub fn new_pubsub_key(
61        schema: Schema,
62        col_indices: Option<Vec<usize>>,
63        channel: Option<String>,
64        channel_column: Option<String>,
65    ) -> Result<Self> {
66        Ok(TemplateEncoder::RedisPubSubKey(
67            TemplateRedisPubSubKeyEncoder::new(schema, col_indices, channel, channel_column)?,
68        ))
69    }
70}
71impl RowEncoder for TemplateEncoder {
72    type Output = TemplateEncoderOutput;
73
74    fn schema(&self) -> &Schema {
75        match self {
76            TemplateEncoder::String(encoder) => &encoder.schema,
77            TemplateEncoder::RedisGeoValue(encoder) => &encoder.schema,
78            TemplateEncoder::RedisGeoKey(encoder) => &encoder.key_encoder.schema,
79            TemplateEncoder::RedisPubSubKey(encoder) => &encoder.schema,
80        }
81    }
82
83    fn col_indices(&self) -> Option<&[usize]> {
84        match self {
85            TemplateEncoder::String(encoder) => encoder.col_indices.as_deref(),
86            TemplateEncoder::RedisGeoValue(encoder) => encoder.col_indices.as_deref(),
87            TemplateEncoder::RedisGeoKey(encoder) => encoder.key_encoder.col_indices.as_deref(),
88            TemplateEncoder::RedisPubSubKey(encoder) => encoder.col_indices.as_deref(),
89        }
90    }
91
92    fn encode_cols(
93        &self,
94        row: impl Row,
95        col_indices: impl Iterator<Item = usize>,
96    ) -> Result<Self::Output> {
97        match self {
98            TemplateEncoder::String(encoder) => Ok(TemplateEncoderOutput::String(
99                encoder.encode_cols(row, col_indices)?,
100            )),
101            TemplateEncoder::RedisGeoValue(encoder) => encoder.encode_cols(row, col_indices),
102            TemplateEncoder::RedisGeoKey(encoder) => encoder.encode_cols(row, col_indices),
103            TemplateEncoder::RedisPubSubKey(encoder) => encoder.encode_cols(row, col_indices),
104        }
105    }
106}
107/// Encode a row according to a specified string template `user_id:{user_id}`.
108/// Data is encoded to string with [`ToText`].
109pub struct TemplateStringEncoder {
110    schema: Schema,
111    col_indices: Option<Vec<usize>>,
112    template: String,
113}
114
115/// todo! improve the performance.
116impl TemplateStringEncoder {
117    pub fn new(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
118        Self {
119            schema,
120            col_indices,
121            template,
122        }
123    }
124
125    pub fn check_string_format(format: &str, map: &HashMap<String, DataType>) -> Result<()> {
126        // We will check if the string inside {} corresponds to a column name in rw.
127        // In other words, the content within {} should exclusively consist of column names from rw,
128        // which means '{{column_name}}' or '{{column_name1},{column_name2}}' would be incorrect.
129        let re = Regex::new(r"\{([^}]*)\}").unwrap();
130        if !re.is_match(format) {
131            return Err(SinkError::Redis(
132                "Can't find {} in key_format or value_format".to_owned(),
133            ));
134        }
135        for capture in re.captures_iter(format) {
136            if let Some(inner_content) = capture.get(1)
137                && !map.contains_key(inner_content.as_str())
138            {
139                return Err(SinkError::Redis(format!(
140                    "Can't find field({:?}) in key_format or value_format",
141                    inner_content.as_str()
142                )));
143            }
144        }
145        Ok(())
146    }
147
148    pub fn encode_cols(
149        &self,
150        row: impl Row,
151        col_indices: impl Iterator<Item = usize>,
152    ) -> Result<String> {
153        let mut s = self.template.clone();
154
155        for idx in col_indices {
156            let field = &self.schema[idx];
157            let name = &field.name;
158            let data = row.datum_at(idx);
159            // TODO: timestamptz ToText also depends on TimeZone
160            s = s.replace(
161                &format!("{{{}}}", name),
162                &data.to_text_with_type(&field.data_type),
163            );
164        }
165        Ok(s)
166    }
167}
168
169pub struct TemplateRedisGeoValueEncoder {
170    schema: Schema,
171    col_indices: Option<Vec<usize>>,
172    lat_col: usize,
173    lon_col: usize,
174}
175
176impl TemplateRedisGeoValueEncoder {
177    pub fn new(
178        schema: Schema,
179        col_indices: Option<Vec<usize>>,
180        lat_name: &str,
181        lon_name: &str,
182    ) -> Result<Self> {
183        let lat_col = schema
184            .names_str()
185            .iter()
186            .position(|name| name == &lat_name)
187            .ok_or_else(|| {
188                SinkError::Redis(format!("Can't find lat column({}) in schema", lat_name))
189            })?;
190        let lon_col = schema
191            .names_str()
192            .iter()
193            .position(|name| name == &lon_name)
194            .ok_or_else(|| {
195                SinkError::Redis(format!("Can't find lon column({}) in schema", lon_name))
196            })?;
197        Ok(Self {
198            schema,
199            col_indices,
200            lat_col,
201            lon_col,
202        })
203    }
204
205    pub fn encode_cols(
206        &self,
207        row: impl Row,
208        _col_indices: impl Iterator<Item = usize>,
209    ) -> Result<TemplateEncoderOutput> {
210        let lat = into_string_from_scalar(
211            row.datum_at(self.lat_col)
212                .ok_or_else(|| SinkError::Redis("lat is null".to_owned()))?,
213        )?;
214        let lon = into_string_from_scalar(
215            row.datum_at(self.lon_col)
216                .ok_or_else(|| SinkError::Redis("lon is null".to_owned()))?,
217        )?;
218        Ok(TemplateEncoderOutput::RedisGeoValue((lat, lon)))
219    }
220}
221
222fn into_string_from_scalar(scalar: ScalarRefImpl<'_>) -> Result<String> {
223    match scalar {
224        ScalarRefImpl::Float32(ordered_float) => Ok(Into::<f32>::into(ordered_float).to_string()),
225        ScalarRefImpl::Float64(ordered_float) => Ok(Into::<f64>::into(ordered_float).to_string()),
226        ScalarRefImpl::Utf8(s) => Ok(s.to_owned()),
227        _ => Err(SinkError::Encode(
228            "Only f32 and f64 can convert to redis geo".to_owned(),
229        )),
230    }
231}
232
233pub struct TemplateRedisGeoKeyEncoder {
234    key_encoder: TemplateStringEncoder,
235    member_col: usize,
236}
237
238impl TemplateRedisGeoKeyEncoder {
239    pub fn new(
240        schema: Schema,
241        col_indices: Option<Vec<usize>>,
242        member_name: &str,
243        template: String,
244    ) -> Result<Self> {
245        let member_col = schema
246            .names_str()
247            .iter()
248            .position(|name| name == &member_name)
249            .ok_or_else(|| {
250                SinkError::Redis(format!(
251                    "Can't find member column({}) in schema",
252                    member_name
253                ))
254            })?;
255        let key_encoder = TemplateStringEncoder::new(schema, col_indices, template);
256        Ok(Self {
257            key_encoder,
258            member_col,
259        })
260    }
261
262    pub fn encode_cols(
263        &self,
264        row: impl Row,
265        col_indices: impl Iterator<Item = usize>,
266    ) -> Result<TemplateEncoderOutput> {
267        let member = row
268            .datum_at(self.member_col)
269            .ok_or_else(|| SinkError::Redis("member is null".to_owned()))?
270            .to_text()
271            .clone();
272        let key = self.key_encoder.encode_cols(row, col_indices)?;
273        Ok(TemplateEncoderOutput::RedisGeoKey((key, member)))
274    }
275}
276
277pub enum TemplateRedisPubSubKeyEncoderInner {
278    PubSubName(String),
279    PubSubColumnIndex(usize),
280}
281pub struct TemplateRedisPubSubKeyEncoder {
282    inner: TemplateRedisPubSubKeyEncoderInner,
283    schema: Schema,
284    col_indices: Option<Vec<usize>>,
285}
286
287impl TemplateRedisPubSubKeyEncoder {
288    pub fn new(
289        schema: Schema,
290        col_indices: Option<Vec<usize>>,
291        channel: Option<String>,
292        channel_column: Option<String>,
293    ) -> Result<Self> {
294        if let Some(channel) = channel {
295            return Ok(Self {
296                inner: TemplateRedisPubSubKeyEncoderInner::PubSubName(channel),
297                schema,
298                col_indices,
299            });
300        }
301        if let Some(channel_column) = channel_column {
302            let channel_column_index = schema
303                .names_str()
304                .iter()
305                .position(|name| name == &channel_column)
306                .ok_or_else(|| {
307                    SinkError::Redis(format!(
308                        "Can't find pubsub column({}) in schema",
309                        channel_column
310                    ))
311                })?;
312            return Ok(Self {
313                inner: TemplateRedisPubSubKeyEncoderInner::PubSubColumnIndex(channel_column_index),
314                schema,
315                col_indices,
316            });
317        }
318        Err(SinkError::Redis(
319            "`channel` or `channel_column` must be set".to_owned(),
320        ))
321    }
322
323    pub fn encode_cols(
324        &self,
325        row: impl Row,
326        _col_indices: impl Iterator<Item = usize>,
327    ) -> Result<TemplateEncoderOutput> {
328        match &self.inner {
329            TemplateRedisPubSubKeyEncoderInner::PubSubName(channel) => {
330                Ok(TemplateEncoderOutput::RedisPubSubKey(channel.clone()))
331            }
332            TemplateRedisPubSubKeyEncoderInner::PubSubColumnIndex(pubsub_col) => {
333                let pubsub_key = row
334                    .datum_at(*pubsub_col)
335                    .ok_or_else(|| SinkError::Redis("pubsub_key is null".to_owned()))?
336                    .to_text()
337                    .clone();
338                Ok(TemplateEncoderOutput::RedisPubSubKey(pubsub_key))
339            }
340        }
341    }
342}
343
/// Output of the template encoders.
#[derive(Debug)]
pub enum TemplateEncoderOutput {
    /// String formatted according to the template.
    String(String),
    /// The value of redis's geospatial, as a `(latitude, longitude)` pair.
    RedisGeoValue((String, String)),
    /// The key of redis's geospatial, as a `(key, member)` pair.
    RedisGeoKey((String, String)),

    /// The redis pub/sub channel name.
    RedisPubSubKey(String),
}
354
355impl TemplateEncoderOutput {
356    pub fn into_string(self) -> Result<String> {
357        match self {
358            TemplateEncoderOutput::String(s) => Ok(s),
359            TemplateEncoderOutput::RedisGeoKey(_) => Err(SinkError::Encode(
360                "RedisGeoKey can't convert to string".to_owned(),
361            )),
362            TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
363                "RedisGeoVelue can't convert to string".to_owned(),
364            )),
365            TemplateEncoderOutput::RedisPubSubKey(s) => Ok(s),
366        }
367    }
368}
369
370impl SerTo<String> for TemplateEncoderOutput {
371    fn ser_to(self) -> Result<String> {
372        match self {
373            TemplateEncoderOutput::String(s) => Ok(s),
374            TemplateEncoderOutput::RedisGeoKey(_) => Err(SinkError::Encode(
375                "RedisGeoKey can't convert to string".to_owned(),
376            )),
377            TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
378                "RedisGeoVelue can't convert to string".to_owned(),
379            )),
380            TemplateEncoderOutput::RedisPubSubKey(s) => Ok(s),
381        }
382    }
383}
384
/// The enum of inputs to `RedisSinkPayloadWriter`
#[derive(Debug)]
pub enum RedisSinkPayloadWriterInput {
    /// Json and String outputs are converted to a string.
    String(String),
    /// The value of redis's geospatial: a latitude and a longitude.
    RedisGeoValue((String, String)),
    /// The key of redis's geospatial: redis's key and member.
    RedisGeoKey((String, String)),
    /// The redis pub/sub channel name.
    RedisPubSubKey(String),
}
396
397impl SerTo<RedisSinkPayloadWriterInput> for TemplateEncoderOutput {
398    fn ser_to(self) -> Result<RedisSinkPayloadWriterInput> {
399        match self {
400            TemplateEncoderOutput::String(s) => Ok(RedisSinkPayloadWriterInput::String(s)),
401            TemplateEncoderOutput::RedisGeoKey((lat, lon)) => {
402                Ok(RedisSinkPayloadWriterInput::RedisGeoKey((lat, lon)))
403            }
404            TemplateEncoderOutput::RedisGeoValue((key, member)) => {
405                Ok(RedisSinkPayloadWriterInput::RedisGeoValue((key, member)))
406            }
407            TemplateEncoderOutput::RedisPubSubKey(s) => {
408                Ok(RedisSinkPayloadWriterInput::RedisPubSubKey(s))
409            }
410        }
411    }
412}
413
414impl<T: SerTo<Vec<u8>>> SerTo<RedisSinkPayloadWriterInput> for T {
415    default fn ser_to(self) -> Result<RedisSinkPayloadWriterInput> {
416        let bytes = self.ser_to()?;
417        Ok(RedisSinkPayloadWriterInput::String(
418            String::from_utf8(bytes).map_err(|e| SinkError::Redis(e.to_report_string()))?,
419        ))
420    }
421}