1use std::collections::HashMap;
16
17use regex::Regex;
18use risingwave_common::catalog::Schema;
19use risingwave_common::row::Row;
20use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
21use thiserror_ext::AsReport;
22
23use super::{Result, RowEncoder};
24use crate::sink::SinkError;
25use crate::sink::encoder::SerTo;
26
27pub enum TemplateEncoder {
28 String(TemplateStringEncoder),
29 RedisGeoKey(TemplateRedisGeoKeyEncoder),
30 RedisGeoValue(TemplateRedisGeoValueEncoder),
31 RedisPubSubKey(TemplateRedisPubSubKeyEncoder),
32}
33impl TemplateEncoder {
34 pub fn new_string(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
35 TemplateEncoder::String(TemplateStringEncoder::new(schema, col_indices, template))
36 }
37
38 pub fn new_geo_value(
39 schema: Schema,
40 col_indices: Option<Vec<usize>>,
41 lat_name: &str,
42 lon_name: &str,
43 ) -> Result<Self> {
44 Ok(TemplateEncoder::RedisGeoValue(
45 TemplateRedisGeoValueEncoder::new(schema, col_indices, lat_name, lon_name)?,
46 ))
47 }
48
49 pub fn new_geo_key(
50 schema: Schema,
51 col_indices: Option<Vec<usize>>,
52 member_name: &str,
53 template: String,
54 ) -> Result<Self> {
55 Ok(TemplateEncoder::RedisGeoKey(
56 TemplateRedisGeoKeyEncoder::new(schema, col_indices, member_name, template)?,
57 ))
58 }
59
60 pub fn new_pubsub_key(
61 schema: Schema,
62 col_indices: Option<Vec<usize>>,
63 channel: Option<String>,
64 channel_column: Option<String>,
65 ) -> Result<Self> {
66 Ok(TemplateEncoder::RedisPubSubKey(
67 TemplateRedisPubSubKeyEncoder::new(schema, col_indices, channel, channel_column)?,
68 ))
69 }
70}
71impl RowEncoder for TemplateEncoder {
72 type Output = TemplateEncoderOutput;
73
74 fn schema(&self) -> &Schema {
75 match self {
76 TemplateEncoder::String(encoder) => &encoder.schema,
77 TemplateEncoder::RedisGeoValue(encoder) => &encoder.schema,
78 TemplateEncoder::RedisGeoKey(encoder) => &encoder.key_encoder.schema,
79 TemplateEncoder::RedisPubSubKey(encoder) => &encoder.schema,
80 }
81 }
82
83 fn col_indices(&self) -> Option<&[usize]> {
84 match self {
85 TemplateEncoder::String(encoder) => encoder.col_indices.as_deref(),
86 TemplateEncoder::RedisGeoValue(encoder) => encoder.col_indices.as_deref(),
87 TemplateEncoder::RedisGeoKey(encoder) => encoder.key_encoder.col_indices.as_deref(),
88 TemplateEncoder::RedisPubSubKey(encoder) => encoder.col_indices.as_deref(),
89 }
90 }
91
92 fn encode_cols(
93 &self,
94 row: impl Row,
95 col_indices: impl Iterator<Item = usize>,
96 ) -> Result<Self::Output> {
97 match self {
98 TemplateEncoder::String(encoder) => Ok(TemplateEncoderOutput::String(
99 encoder.encode_cols(row, col_indices)?,
100 )),
101 TemplateEncoder::RedisGeoValue(encoder) => encoder.encode_cols(row, col_indices),
102 TemplateEncoder::RedisGeoKey(encoder) => encoder.encode_cols(row, col_indices),
103 TemplateEncoder::RedisPubSubKey(encoder) => encoder.encode_cols(row, col_indices),
104 }
105 }
106}
107pub struct TemplateStringEncoder {
110 schema: Schema,
111 col_indices: Option<Vec<usize>>,
112 template: String,
113}
114
115impl TemplateStringEncoder {
117 pub fn new(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
118 Self {
119 schema,
120 col_indices,
121 template,
122 }
123 }
124
125 pub fn check_string_format(format: &str, map: &HashMap<String, DataType>) -> Result<()> {
126 let re = Regex::new(r"\{([^}]*)\}").unwrap();
130 if !re.is_match(format) {
131 return Err(SinkError::Redis(
132 "Can't find {} in key_format or value_format".to_owned(),
133 ));
134 }
135 for capture in re.captures_iter(format) {
136 if let Some(inner_content) = capture.get(1)
137 && !map.contains_key(inner_content.as_str())
138 {
139 return Err(SinkError::Redis(format!(
140 "Can't find field({:?}) in key_format or value_format",
141 inner_content.as_str()
142 )));
143 }
144 }
145 Ok(())
146 }
147
148 pub fn encode_cols(
149 &self,
150 row: impl Row,
151 col_indices: impl Iterator<Item = usize>,
152 ) -> Result<String> {
153 let mut s = self.template.clone();
154
155 for idx in col_indices {
156 let field = &self.schema[idx];
157 let name = &field.name;
158 let data = row.datum_at(idx);
159 s = s.replace(
161 &format!("{{{}}}", name),
162 &data.to_text_with_type(&field.data_type),
163 );
164 }
165 Ok(s)
166 }
167}
168
169pub struct TemplateRedisGeoValueEncoder {
170 schema: Schema,
171 col_indices: Option<Vec<usize>>,
172 lat_col: usize,
173 lon_col: usize,
174}
175
176impl TemplateRedisGeoValueEncoder {
177 pub fn new(
178 schema: Schema,
179 col_indices: Option<Vec<usize>>,
180 lat_name: &str,
181 lon_name: &str,
182 ) -> Result<Self> {
183 let lat_col = schema
184 .names_str()
185 .iter()
186 .position(|name| name == &lat_name)
187 .ok_or_else(|| {
188 SinkError::Redis(format!("Can't find lat column({}) in schema", lat_name))
189 })?;
190 let lon_col = schema
191 .names_str()
192 .iter()
193 .position(|name| name == &lon_name)
194 .ok_or_else(|| {
195 SinkError::Redis(format!("Can't find lon column({}) in schema", lon_name))
196 })?;
197 Ok(Self {
198 schema,
199 col_indices,
200 lat_col,
201 lon_col,
202 })
203 }
204
205 pub fn encode_cols(
206 &self,
207 row: impl Row,
208 _col_indices: impl Iterator<Item = usize>,
209 ) -> Result<TemplateEncoderOutput> {
210 let lat = into_string_from_scalar(
211 row.datum_at(self.lat_col)
212 .ok_or_else(|| SinkError::Redis("lat is null".to_owned()))?,
213 )?;
214 let lon = into_string_from_scalar(
215 row.datum_at(self.lon_col)
216 .ok_or_else(|| SinkError::Redis("lon is null".to_owned()))?,
217 )?;
218 Ok(TemplateEncoderOutput::RedisGeoValue((lat, lon)))
219 }
220}
221
222fn into_string_from_scalar(scalar: ScalarRefImpl<'_>) -> Result<String> {
223 match scalar {
224 ScalarRefImpl::Float32(ordered_float) => Ok(Into::<f32>::into(ordered_float).to_string()),
225 ScalarRefImpl::Float64(ordered_float) => Ok(Into::<f64>::into(ordered_float).to_string()),
226 ScalarRefImpl::Utf8(s) => Ok(s.to_owned()),
227 _ => Err(SinkError::Encode(
228 "Only f32 and f64 can convert to redis geo".to_owned(),
229 )),
230 }
231}
232
233pub struct TemplateRedisGeoKeyEncoder {
234 key_encoder: TemplateStringEncoder,
235 member_col: usize,
236}
237
238impl TemplateRedisGeoKeyEncoder {
239 pub fn new(
240 schema: Schema,
241 col_indices: Option<Vec<usize>>,
242 member_name: &str,
243 template: String,
244 ) -> Result<Self> {
245 let member_col = schema
246 .names_str()
247 .iter()
248 .position(|name| name == &member_name)
249 .ok_or_else(|| {
250 SinkError::Redis(format!(
251 "Can't find member column({}) in schema",
252 member_name
253 ))
254 })?;
255 let key_encoder = TemplateStringEncoder::new(schema, col_indices, template);
256 Ok(Self {
257 key_encoder,
258 member_col,
259 })
260 }
261
262 pub fn encode_cols(
263 &self,
264 row: impl Row,
265 col_indices: impl Iterator<Item = usize>,
266 ) -> Result<TemplateEncoderOutput> {
267 let member = row
268 .datum_at(self.member_col)
269 .ok_or_else(|| SinkError::Redis("member is null".to_owned()))?
270 .to_text()
271 .clone();
272 let key = self.key_encoder.encode_cols(row, col_indices)?;
273 Ok(TemplateEncoderOutput::RedisGeoKey((key, member)))
274 }
275}
276
/// Where a pub/sub encoder takes its channel name from.
pub enum TemplateRedisPubSubKeyEncoderInner {
    /// A fixed channel name used for every row.
    PubSubName(String),
    /// Index of the column whose text is used as the channel name.
    PubSubColumnIndex(usize),
}
281pub struct TemplateRedisPubSubKeyEncoder {
282 inner: TemplateRedisPubSubKeyEncoderInner,
283 schema: Schema,
284 col_indices: Option<Vec<usize>>,
285}
286
287impl TemplateRedisPubSubKeyEncoder {
288 pub fn new(
289 schema: Schema,
290 col_indices: Option<Vec<usize>>,
291 channel: Option<String>,
292 channel_column: Option<String>,
293 ) -> Result<Self> {
294 if let Some(channel) = channel {
295 return Ok(Self {
296 inner: TemplateRedisPubSubKeyEncoderInner::PubSubName(channel),
297 schema,
298 col_indices,
299 });
300 }
301 if let Some(channel_column) = channel_column {
302 let channel_column_index = schema
303 .names_str()
304 .iter()
305 .position(|name| name == &channel_column)
306 .ok_or_else(|| {
307 SinkError::Redis(format!(
308 "Can't find pubsub column({}) in schema",
309 channel_column
310 ))
311 })?;
312 return Ok(Self {
313 inner: TemplateRedisPubSubKeyEncoderInner::PubSubColumnIndex(channel_column_index),
314 schema,
315 col_indices,
316 });
317 }
318 Err(SinkError::Redis(
319 "`channel` or `channel_column` must be set".to_owned(),
320 ))
321 }
322
323 pub fn encode_cols(
324 &self,
325 row: impl Row,
326 _col_indices: impl Iterator<Item = usize>,
327 ) -> Result<TemplateEncoderOutput> {
328 match &self.inner {
329 TemplateRedisPubSubKeyEncoderInner::PubSubName(channel) => {
330 Ok(TemplateEncoderOutput::RedisPubSubKey(channel.clone()))
331 }
332 TemplateRedisPubSubKeyEncoderInner::PubSubColumnIndex(pubsub_col) => {
333 let pubsub_key = row
334 .datum_at(*pubsub_col)
335 .ok_or_else(|| SinkError::Redis("pubsub_key is null".to_owned()))?
336 .to_text()
337 .clone();
338 Ok(TemplateEncoderOutput::RedisPubSubKey(pubsub_key))
339 }
340 }
341 }
342}
343
/// Result of encoding one row with a [`TemplateEncoder`].
pub enum TemplateEncoderOutput {
    /// The rendered template string.
    String(String),
    /// The `(lat, lon)` pair produced by the geo value encoder.
    RedisGeoValue((String, String)),
    /// The `(key, member)` pair produced by the geo key encoder.
    RedisGeoKey((String, String)),

    /// The pub/sub channel name.
    RedisPubSubKey(String),
}
354
355impl TemplateEncoderOutput {
356 pub fn into_string(self) -> Result<String> {
357 match self {
358 TemplateEncoderOutput::String(s) => Ok(s),
359 TemplateEncoderOutput::RedisGeoKey(_) => Err(SinkError::Encode(
360 "RedisGeoKey can't convert to string".to_owned(),
361 )),
362 TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
363 "RedisGeoVelue can't convert to string".to_owned(),
364 )),
365 TemplateEncoderOutput::RedisPubSubKey(s) => Ok(s),
366 }
367 }
368}
369
370impl SerTo<String> for TemplateEncoderOutput {
371 fn ser_to(self) -> Result<String> {
372 match self {
373 TemplateEncoderOutput::String(s) => Ok(s),
374 TemplateEncoderOutput::RedisGeoKey(_) => Err(SinkError::Encode(
375 "RedisGeoKey can't convert to string".to_owned(),
376 )),
377 TemplateEncoderOutput::RedisGeoValue(_) => Err(SinkError::Encode(
378 "RedisGeoVelue can't convert to string".to_owned(),
379 )),
380 TemplateEncoderOutput::RedisPubSubKey(s) => Ok(s),
381 }
382 }
383}
384
/// Input accepted by the redis sink payload writer.
///
/// The variants mirror those of `TemplateEncoderOutput` one-to-one.
#[derive(Debug)]
pub enum RedisSinkPayloadWriterInput {
    /// A plain string payload.
    String(String),
    /// A pair of strings for a geo value.
    RedisGeoValue((String, String)),
    /// A pair of strings for a geo key.
    RedisGeoKey((String, String)),
    /// A pub/sub channel name.
    RedisPubSubKey(String),
}
396
397impl SerTo<RedisSinkPayloadWriterInput> for TemplateEncoderOutput {
398 fn ser_to(self) -> Result<RedisSinkPayloadWriterInput> {
399 match self {
400 TemplateEncoderOutput::String(s) => Ok(RedisSinkPayloadWriterInput::String(s)),
401 TemplateEncoderOutput::RedisGeoKey((lat, lon)) => {
402 Ok(RedisSinkPayloadWriterInput::RedisGeoKey((lat, lon)))
403 }
404 TemplateEncoderOutput::RedisGeoValue((key, member)) => {
405 Ok(RedisSinkPayloadWriterInput::RedisGeoValue((key, member)))
406 }
407 TemplateEncoderOutput::RedisPubSubKey(s) => {
408 Ok(RedisSinkPayloadWriterInput::RedisPubSubKey(s))
409 }
410 }
411 }
412}
413
414impl<T: SerTo<Vec<u8>>> SerTo<RedisSinkPayloadWriterInput> for T {
415 default fn ser_to(self) -> Result<RedisSinkPayloadWriterInput> {
416 let bytes = self.ser_to()?;
417 Ok(RedisSinkPayloadWriterInput::String(
418 String::from_utf8(bytes).map_err(|e| SinkError::Redis(e.to_report_string()))?,
419 ))
420 }
421}