risingwave_connector/sink/
redis.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use phf::phf_set;
20use redis::aio::MultiplexedConnection;
21use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline};
22use redis::{Client as RedisClient, Pipeline};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use risingwave_common::types::DataType;
26use serde::Deserialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use with_options::WithOptions;
30
31use super::catalog::SinkFormatDesc;
32use super::encoder::template::{RedisSinkPayloadWriterInput, TemplateStringEncoder};
33use super::formatter::SinkFormatterImpl;
34use super::writer::FormattedSink;
35use super::{SinkError, SinkParam};
36use crate::dispatch_sink_formatter_str_key_impl;
37use crate::enforce_secret::EnforceSecret;
38use crate::error::ConnectorResult;
39use crate::sink::log_store::DeliveryFutureManagerAddFuture;
40use crate::sink::writer::{
41    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
42};
43use crate::sink::{Result, Sink, SinkWriterParam};
44
// ---- Connector identity ----------------------------------------------------

/// Connector name of the Redis sink.
pub const REDIS_SINK: &str = "redis";

// ---- Template options shared by several value types ------------------------

/// Format option holding the template used to render the Redis key.
pub const KEY_FORMAT: &str = "key_format";
/// Format option holding the template used to render the Redis value.
pub const VALUE_FORMAT: &str = "value_format";

// ---- Value type selection --------------------------------------------------

/// Format option selecting how a row is written to Redis.
/// Validation treats a missing option like [`REDIS_VALUE_TYPE_STRING`].
pub const REDIS_VALUE_TYPE: &str = "redis_value_type";
/// `redis_value_type` value for plain string writes.
pub const REDIS_VALUE_TYPE_STRING: &str = "string";
/// `redis_value_type` value for geospatial writes.
pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial";
/// `redis_value_type` value for pub/sub writes.
pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub";
/// `redis_value_type` value for stream writes.
pub const REDIS_VALUE_TYPE_STREAM: &str = "stream";

// ---- Geospatial options ----------------------------------------------------

/// Format option naming the column that holds the longitude.
pub const LON_NAME: &str = "longitude";
/// Format option naming the column that holds the latitude.
pub const LAT_NAME: &str = "latitude";
/// Format option naming the column that holds the geo member.
pub const MEMBER_NAME: &str = "member";

// ---- Pub/sub options (validation requires exactly one of the two) ----------

/// Format option holding a channel name.
pub const CHANNEL: &str = "channel";
/// Format option naming the column that holds the channel.
pub const CHANNEL_COLUMN: &str = "channel_column";

// ---- Stream options (validation requires exactly one of the two) -----------

/// Format option holding a stream name.
/// Note that this is the same literal as [`REDIS_VALUE_TYPE_STREAM`].
pub const STREAM: &str = "stream";
/// Format option naming the column that holds the stream.
pub const STREAM_COLUMN: &str = "stream_column";
60
61#[derive(Deserialize, Debug, Clone, WithOptions)]
62pub struct RedisCommon {
63    #[serde(rename = "redis.url")]
64    pub url: String,
65}
66
67impl EnforceSecret for RedisCommon {
68    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
69        "redis.url"
70    };
71}
72
73pub enum RedisPipe {
74    Cluster(ClusterPipeline),
75    Single(Pipeline),
76}
77impl RedisPipe {
78    pub async fn query<T: redis::FromRedisValue>(
79        &self,
80        conn: &mut RedisConn,
81    ) -> ConnectorResult<T> {
82        match (self, conn) {
83            (RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?),
84            (RedisPipe::Single(pipe), RedisConn::Single(conn)) => {
85                Ok(pipe.query_async(conn).await?)
86            }
87            _ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_owned()).into()),
88        }
89    }
90
91    pub fn clear(&mut self) {
92        match self {
93            RedisPipe::Cluster(pipe) => pipe.clear(),
94            RedisPipe::Single(pipe) => pipe.clear(),
95        }
96    }
97
98    pub fn set(
99        &mut self,
100        k: RedisSinkPayloadWriterInput,
101        v: RedisSinkPayloadWriterInput,
102    ) -> Result<()> {
103        match self {
104            RedisPipe::Cluster(pipe) => match (k, v) {
105                (
106                    RedisSinkPayloadWriterInput::String(k),
107                    RedisSinkPayloadWriterInput::String(v),
108                ) => {
109                    pipe.set(k, v);
110                }
111                (
112                    RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
113                    RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
114                ) => {
115                    pipe.geo_add(key, (lon, lat, member));
116                }
117                (
118                    RedisSinkPayloadWriterInput::RedisPubSubStreamKey(key),
119                    RedisSinkPayloadWriterInput::String(v),
120                ) => {
121                    pipe.publish(key, v);
122                }
123                (
124                    RedisSinkPayloadWriterInput::RedisPubSubStreamKey(key),
125                    RedisSinkPayloadWriterInput::RedisStreamValue((field, value)),
126                ) => {
127                    pipe.xadd(key, "*", &[(&field, &value)]);
128                }
129                _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
130            },
131            RedisPipe::Single(pipe) => match (k, v) {
132                (
133                    RedisSinkPayloadWriterInput::String(k),
134                    RedisSinkPayloadWriterInput::String(v),
135                ) => {
136                    pipe.set(k, v);
137                }
138                (
139                    RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
140                    RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
141                ) => {
142                    pipe.geo_add(key, (lon, lat, member));
143                }
144                (
145                    RedisSinkPayloadWriterInput::RedisPubSubStreamKey(key),
146                    RedisSinkPayloadWriterInput::String(v),
147                ) => {
148                    pipe.publish(key, v);
149                }
150                (
151                    RedisSinkPayloadWriterInput::RedisPubSubStreamKey(key),
152                    RedisSinkPayloadWriterInput::RedisStreamValue((field, value)),
153                ) => {
154                    pipe.xadd(key, "*", &[(&field, &value)]);
155                }
156                _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
157            },
158        };
159        Ok(())
160    }
161
162    pub fn del(&mut self, k: RedisSinkPayloadWriterInput) -> Result<()> {
163        match self {
164            RedisPipe::Cluster(pipe) => match k {
165                RedisSinkPayloadWriterInput::String(k) => {
166                    pipe.del(k);
167                }
168                RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
169                    pipe.zrem(key, member);
170                }
171                _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
172            },
173            RedisPipe::Single(pipe) => match k {
174                RedisSinkPayloadWriterInput::String(k) => {
175                    pipe.del(k);
176                }
177                RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
178                    pipe.zrem(key, member);
179                }
180                _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
181            },
182        };
183        Ok(())
184    }
185}
186pub enum RedisConn {
187    // Redis deployed as a cluster, clusters with only one node should also use this conn
188    Cluster(ClusterConnection),
189    // Redis is not deployed as a cluster
190    Single(MultiplexedConnection),
191}
192
193impl RedisCommon {
194    pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> {
195        match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
196            Ok(v) => {
197                if let Value::Array(list) = v {
198                    let list = list
199                        .into_iter()
200                        .map(|s| {
201                            if let Value::String(s) = s {
202                                Ok(s)
203                            } else {
204                                Err(SinkError::Redis(
205                                    "redis.url must be array of string".to_owned(),
206                                )
207                                .into())
208                            }
209                        })
210                        .collect::<ConnectorResult<Vec<String>>>()?;
211
212                    let client = ClusterClient::new(list)?;
213                    Ok((
214                        RedisConn::Cluster(client.get_connection()?),
215                        RedisPipe::Cluster(redis::cluster::cluster_pipe()),
216                    ))
217                } else {
218                    Err(SinkError::Redis("redis.url must be array or string".to_owned()).into())
219                }
220            }
221            Err(_) => {
222                let client = RedisClient::open(self.url.clone())?;
223                Ok((
224                    RedisConn::Single(client.get_multiplexed_async_connection().await?),
225                    RedisPipe::Single(redis::pipe()),
226                ))
227            }
228        }
229    }
230}
231
232#[serde_as]
233#[derive(Clone, Debug, Deserialize, WithOptions)]
234pub struct RedisConfig {
235    #[serde(flatten)]
236    pub common: RedisCommon,
237}
238
239impl EnforceSecret for RedisConfig {
240    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
241        for prop in prop_iter {
242            RedisCommon::enforce_one(prop)?;
243        }
244        Ok(())
245    }
246}
247
248impl RedisConfig {
249    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
250        let config =
251            serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
252                .map_err(|e| SinkError::Config(anyhow!(e)))?;
253        Ok(config)
254    }
255}
256
257#[derive(Debug)]
258pub struct RedisSink {
259    config: RedisConfig,
260    schema: Schema,
261    pk_indices: Vec<usize>,
262    format_desc: SinkFormatDesc,
263    db_name: String,
264    sink_from_name: String,
265}
266
267impl EnforceSecret for RedisSink {
268    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
269        for prop in prop_iter {
270            RedisConfig::enforce_one(prop)?;
271        }
272        Ok(())
273    }
274}
275
276#[async_trait]
277impl TryFrom<SinkParam> for RedisSink {
278    type Error = SinkError;
279
280    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
281        let Some(pk_indices) = param.downstream_pk.clone() else {
282            return Err(SinkError::Config(anyhow!(
283                "Redis Sink Primary Key must be specified."
284            )));
285        };
286        let config = RedisConfig::from_btreemap(param.properties.clone())?;
287        Ok(Self {
288            config,
289            schema: param.schema(),
290            pk_indices,
291            format_desc: param
292                .format_desc
293                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
294            db_name: param.db_name,
295            sink_from_name: param.sink_from_name,
296        })
297    }
298}
299
300impl Sink for RedisSink {
301    type LogSinker = AsyncTruncateLogSinkerOf<RedisSinkWriter>;
302
303    const SINK_NAME: &'static str = "redis";
304
305    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
306        Ok(RedisSinkWriter::new(
307            self.config.clone(),
308            self.schema.clone(),
309            self.pk_indices.clone(),
310            &self.format_desc,
311            self.db_name.clone(),
312            self.sink_from_name.clone(),
313        )
314        .await?
315        .into_log_sinker(usize::MAX))
316    }
317
318    async fn validate(&self) -> Result<()> {
319        let all_map: HashMap<String, DataType> = self
320            .schema
321            .fields()
322            .iter()
323            .map(|f| (f.name.clone(), f.data_type.clone()))
324            .collect();
325        let pk_map: HashMap<String, DataType> = self
326            .schema
327            .fields()
328            .iter()
329            .enumerate()
330            .filter(|(k, _)| self.pk_indices.contains(k))
331            .map(|(_, v)| (v.name.clone(), v.data_type.clone()))
332            .collect();
333        if matches!(
334            self.format_desc.encode,
335            super::catalog::SinkEncode::Template
336        ) {
337            match self
338                .format_desc
339                .options
340                .get(REDIS_VALUE_TYPE)
341                .map(|s| s.as_str())
342            {
343                // if not set, default to string
344                Some(REDIS_VALUE_TYPE_STRING) | None => {
345                    let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
346                        SinkError::Config(anyhow!(
347                            "Cannot find '{KEY_FORMAT}', please set it or use JSON"
348                        ))
349                    })?;
350                    TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
351                    let value_format =
352                        self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
353                            SinkError::Config(anyhow!(
354                                "Cannot find `{VALUE_FORMAT}`, please set it or use JSON"
355                            ))
356                        })?;
357                    TemplateStringEncoder::check_string_format(value_format, &all_map)?;
358                }
359                Some(REDIS_VALUE_TYPE_GEO) => {
360                    let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
361                        SinkError::Config(anyhow!(
362                            "Cannot find '{KEY_FORMAT}', please set it or use JSON"
363                        ))
364                    })?;
365                    TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
366
367                    let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| {
368                        SinkError::Config(anyhow!(
369                            "Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
370                        ))
371                    })?;
372                    let lat_name = self.format_desc.options.get(LAT_NAME).ok_or_else(|| {
373                        SinkError::Config(anyhow!(
374                            "Cannot find `{LAT_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
375                        ))
376                    })?;
377                    let member_name = self.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
378                        SinkError::Config(anyhow!(
379                            "Cannot find `{MEMBER_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
380                        ))
381                    })?;
382                    if let Some(lon_type) = all_map.get(lon_name)
383                        && (lon_type == &DataType::Float64
384                            || lon_type == &DataType::Float32
385                            || lon_type == &DataType::Varchar)
386                    {
387                        // do nothing
388                    } else {
389                        return Err(SinkError::Config(anyhow!(
390                            "`{LON_NAME}` must be set to `float64` or `float32` or `varchar`"
391                        )));
392                    }
393                    if let Some(lat_type) = all_map.get(lat_name)
394                        && (lat_type == &DataType::Float64
395                            || lat_type == &DataType::Float32
396                            || lat_type == &DataType::Varchar)
397                    {
398                        // do nothing
399                    } else {
400                        return Err(SinkError::Config(anyhow!(
401                            "`{LAT_NAME}` must be set to `float64` or `float32` or `varchar`"
402                        )));
403                    }
404                    if let Some(member_type) = pk_map.get(member_name)
405                        && member_type == &DataType::Varchar
406                    {
407                        // do nothing
408                    } else {
409                        return Err(SinkError::Config(anyhow!(
410                            "`{MEMBER_NAME}` must be set to `varchar` and `primary_key`"
411                        )));
412                    }
413                }
414                Some(REDIS_VALUE_TYPE_PUBSUB) => {
415                    let channel = self.format_desc.options.get(CHANNEL);
416                    let channel_column = self.format_desc.options.get(CHANNEL_COLUMN);
417                    if (channel.is_none() && channel_column.is_none())
418                        || (channel.is_some() && channel_column.is_some())
419                    {
420                        return Err(SinkError::Config(anyhow!(
421                            "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
422                        )));
423                    }
424
425                    if let Some(channel_column) = channel_column
426                        && let Some(channel_column_type) = all_map.get(channel_column)
427                        && (channel_column_type != &DataType::Varchar)
428                    {
429                        return Err(SinkError::Config(anyhow!(
430                            "`{CHANNEL_COLUMN}` must be set to `varchar`"
431                        )));
432                    }
433
434                    let value_format =
435                        self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
436                            SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
437                        })?;
438                    TemplateStringEncoder::check_string_format(value_format, &all_map)?;
439                }
440                Some(REDIS_VALUE_TYPE_STREAM) => {
441                    tracing::error!("test:for bug");
442                    risingwave_common::license::Feature::RedisSinkStream
443                        .check_available()
444                        .map_err(|e| anyhow::anyhow!(e))?;
445                    let stream = self.format_desc.options.get(STREAM);
446                    let stream_column = self.format_desc.options.get(STREAM_COLUMN);
447                    if (stream.is_none() && stream_column.is_none())
448                        || (stream.is_some() && stream_column.is_some())
449                    {
450                        return Err(SinkError::Config(anyhow!(
451                            "Please specific either `{STREAM}` or `{STREAM_COLUMN}`. They are mutually exclusive options."
452                        )));
453                    }
454
455                    if let Some(stream_column) = stream_column
456                        && let Some(stream_column_type) = all_map.get(stream_column)
457                        && (stream_column_type != &DataType::Varchar)
458                    {
459                        return Err(SinkError::Config(anyhow!(
460                            "`{STREAM_COLUMN}` must be set to `varchar`"
461                        )));
462                    }
463
464                    let value_format =
465                        self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
466                            SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
467                        })?;
468                    let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
469                        SinkError::Config(anyhow!(
470                            "Cannot find '{KEY_FORMAT}', please set it or use JSON"
471                        ))
472                    })?;
473                    TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
474                    TemplateStringEncoder::check_string_format(value_format, &all_map)?;
475                }
476                _ => {
477                    return Err(SinkError::Config(anyhow!(
478                        "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}` or `{REDIS_VALUE_TYPE_STREAM}`"
479                    )));
480                }
481            }
482        }
483        self.config.common.build_conn_and_pipe().await?;
484        Ok(())
485    }
486}
487
488pub struct RedisSinkWriter {
489    #[expect(dead_code)]
490    epoch: u64,
491    #[expect(dead_code)]
492    schema: Schema,
493    #[expect(dead_code)]
494    pk_indices: Vec<usize>,
495    formatter: SinkFormatterImpl,
496    payload_writer: RedisSinkPayloadWriter,
497}
498
499struct RedisSinkPayloadWriter {
500    // connection to redis, one per executor
501    conn: Option<RedisConn>,
502    // the command pipeline for write-commit
503    pipe: RedisPipe,
504}
505
506impl RedisSinkPayloadWriter {
507    pub async fn new(config: RedisConfig) -> Result<Self> {
508        let (conn, pipe) = config.common.build_conn_and_pipe().await?;
509        let conn = Some(conn);
510
511        Ok(Self { conn, pipe })
512    }
513
514    #[cfg(test)]
515    pub fn mock() -> Self {
516        let conn = None;
517        let pipe = RedisPipe::Single(redis::pipe());
518        Self { conn, pipe }
519    }
520
521    pub async fn commit(&mut self) -> Result<()> {
522        #[cfg(test)]
523        {
524            if self.conn.is_none() {
525                return Ok(());
526            }
527        }
528        self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
529        self.pipe.clear();
530        Ok(())
531    }
532}
533
534impl FormattedSink for RedisSinkPayloadWriter {
535    type K = RedisSinkPayloadWriterInput;
536    type V = RedisSinkPayloadWriterInput;
537
538    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
539        let k = k.ok_or_else(|| SinkError::Redis("The redis key cannot be null".to_owned()))?;
540        match v {
541            Some(v) => self.pipe.set(k, v)?,
542            None => self.pipe.del(k)?,
543        };
544        Ok(())
545    }
546}
547
548impl RedisSinkWriter {
549    pub async fn new(
550        config: RedisConfig,
551        schema: Schema,
552        pk_indices: Vec<usize>,
553        format_desc: &SinkFormatDesc,
554        db_name: String,
555        sink_from_name: String,
556    ) -> Result<Self> {
557        let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
558        let formatter = SinkFormatterImpl::new(
559            format_desc,
560            schema.clone(),
561            pk_indices.clone(),
562            db_name,
563            sink_from_name,
564            "NO_TOPIC",
565        )
566        .await?;
567
568        Ok(Self {
569            schema,
570            pk_indices,
571            epoch: 0,
572            formatter,
573            payload_writer,
574        })
575    }
576
577    #[cfg(test)]
578    pub async fn mock(
579        schema: Schema,
580        pk_indices: Vec<usize>,
581        format_desc: &SinkFormatDesc,
582    ) -> Result<Self> {
583        let formatter = SinkFormatterImpl::new(
584            format_desc,
585            schema.clone(),
586            pk_indices.clone(),
587            "d1".to_owned(),
588            "t1".to_owned(),
589            "NO_TOPIC",
590        )
591        .await?;
592        Ok(Self {
593            schema,
594            pk_indices,
595            epoch: 0,
596            formatter,
597            payload_writer: RedisSinkPayloadWriter::mock(),
598        })
599    }
600}
601
602impl AsyncTruncateSinkWriter for RedisSinkWriter {
603    async fn write_chunk<'a>(
604        &'a mut self,
605        chunk: StreamChunk,
606        _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
607    ) -> Result<()> {
608        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
609            self.payload_writer.write_chunk(chunk, formatter).await?;
610            self.payload_writer.commit().await
611        })
612    }
613}
614
#[cfg(test)]
mod test {
    use core::panic;

    use rdkafka::message::FromBytes;
    use risingwave_common::array::{Array, I32Array, Op, Utf8Array};
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use super::*;
    use crate::sink::catalog::{SinkEncode, SinkFormat};
    use crate::sink::log_store::DeliveryFutureManager;

    /// Schema `(id INT, name VARCHAR)` used by every test.
    fn id_name_schema() -> Schema {
        Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".to_owned(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".to_owned(),
            },
        ])
    }

    /// Three inserted rows: (1, Alice), (2, Bob), (3, Clare).
    fn three_inserts() -> StreamChunk {
        StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        )
    }

    /// Builds a mock writer keyed on `id` and writes [`three_inserts`] through it.
    async fn write_three_inserts(format_desc: &SinkFormatDesc) -> RedisSinkWriter {
        let mut writer = RedisSinkWriter::mock(id_name_schema(), vec![0], format_desc)
            .await
            .unwrap();
        let mut manager = DeliveryFutureManager::new(0);
        writer
            .write_chunk(three_inserts(), manager.start_write_chunk(0, 0))
            .await
            .expect("failed to write batch");
        writer
    }

    /// Asserts that the writer's single-node pipeline holds exactly the
    /// `expected` packed commands, in order.
    fn assert_queued_commands(writer: &RedisSinkWriter, expected: &[&str]) {
        let RedisPipe::Single(pipe) = &writer.payload_writer.pipe else {
            panic!("pipe type not match")
        };
        pipe.cmd_iter()
            .zip_eq_debug(expected.iter())
            .for_each(|(cmd, exp_cmd)| {
                assert_eq!(
                    *exp_cmd,
                    str::from_bytes(&cmd.get_packed_command()).unwrap()
                )
            });
    }

    #[tokio::test]
    async fn test_write() {
        let format_desc = SinkFormatDesc {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Json,
            options: BTreeMap::default(),
            secret_refs: BTreeMap::default(),
            key_encode: None,
            connection_id: None,
        };

        let writer = write_three_inserts(&format_desc).await;

        assert_queued_commands(
            &writer,
            &[
                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n",
                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n",
                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n",
            ],
        );
    }

    #[tokio::test]
    async fn test_format_write() {
        let options = BTreeMap::from([
            (KEY_FORMAT.to_owned(), "key-{id}".to_owned()),
            (
                VALUE_FORMAT.to_owned(),
                "values:\\{id:{id},name:{name}\\}".to_owned(),
            ),
        ]);
        let format_desc = SinkFormatDesc {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Template,
            options,
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        };

        let writer = write_three_inserts(&format_desc).await;

        assert_queued_commands(
            &writer,
            &[
                "*3\r\n$3\r\nSET\r\n$5\r\nkey-1\r\n$24\r\nvalues:{id:1,name:Alice}\r\n",
                "*3\r\n$3\r\nSET\r\n$5\r\nkey-2\r\n$22\r\nvalues:{id:2,name:Bob}\r\n",
                "*3\r\n$3\r\nSET\r\n$5\r\nkey-3\r\n$24\r\nvalues:{id:3,name:Clare}\r\n",
            ],
        );
    }
}