risingwave_connector/sink/
redis.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use redis::aio::MultiplexedConnection;
20use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline};
21use redis::{Client as RedisClient, Pipeline};
22use risingwave_common::array::StreamChunk;
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde_derive::Deserialize;
26use serde_json::Value;
27use serde_with::serde_as;
28use with_options::WithOptions;
29
30use super::catalog::SinkFormatDesc;
31use super::encoder::template::{RedisSinkPayloadWriterInput, TemplateStringEncoder};
32use super::formatter::SinkFormatterImpl;
33use super::writer::FormattedSink;
34use super::{SinkError, SinkParam};
35use crate::dispatch_sink_formatter_str_key_impl;
36use crate::error::ConnectorResult;
37use crate::sink::log_store::DeliveryFutureManagerAddFuture;
38use crate::sink::writer::{
39    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
40};
41use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam};
42
// ---- Connector identity ----

/// Name of the Redis sink connector.
pub const REDIS_SINK: &str = "redis";

// ---- Option keys looked up in the sink's format options ----

/// Option key holding the template for the Redis key.
pub const KEY_FORMAT: &str = "key_format";
/// Option key holding the template for the Redis value.
pub const VALUE_FORMAT: &str = "value_format";
/// Option key selecting how a row is written to Redis; one of the
/// `REDIS_VALUE_TYPE_*` values below.
pub const REDIS_VALUE_TYPE: &str = "redis_value_type";
/// Option key naming the column that provides the longitude (geospatial type).
pub const LON_NAME: &str = "longitude";
/// Option key naming the column that provides the latitude (geospatial type).
pub const LAT_NAME: &str = "latitude";
/// Option key naming the column that provides the set member (geospatial type).
pub const MEMBER_NAME: &str = "member";
/// Option key holding a channel for the pubsub type.
pub const CHANNEL: &str = "channel";
/// Option key naming the column that provides the channel for the pubsub type.
pub const CHANNEL_COLUMN: &str = "channel_column";

// ---- Accepted values of `REDIS_VALUE_TYPE` ----

/// Plain string values; also what validation assumes when the option is absent.
pub const REDIS_VALUE_TYPE_STRING: &str = "string";
/// Geospatial entries.
pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial";
/// Messages published to a channel.
pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub";
55
/// Connection settings of the Redis sink, deserialized from the sink properties.
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct RedisCommon {
    // Raw value of the `redis.url` property. `build_conn_and_pipe` interprets a JSON array of
    // strings as cluster node addresses and any text that is not valid JSON as a single-server
    // URL.
    #[serde(rename = "redis.url")]
    pub url: String,
}
61
62pub enum RedisPipe {
63    Cluster(ClusterPipeline),
64    Single(Pipeline),
65}
66impl RedisPipe {
67    pub async fn query<T: redis::FromRedisValue>(
68        &self,
69        conn: &mut RedisConn,
70    ) -> ConnectorResult<T> {
71        match (self, conn) {
72            (RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?),
73            (RedisPipe::Single(pipe), RedisConn::Single(conn)) => {
74                Ok(pipe.query_async(conn).await?)
75            }
76            _ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_owned()).into()),
77        }
78    }
79
80    pub fn clear(&mut self) {
81        match self {
82            RedisPipe::Cluster(pipe) => pipe.clear(),
83            RedisPipe::Single(pipe) => pipe.clear(),
84        }
85    }
86
87    pub fn set(
88        &mut self,
89        k: RedisSinkPayloadWriterInput,
90        v: RedisSinkPayloadWriterInput,
91    ) -> Result<()> {
92        match self {
93            RedisPipe::Cluster(pipe) => match (k, v) {
94                (
95                    RedisSinkPayloadWriterInput::String(k),
96                    RedisSinkPayloadWriterInput::String(v),
97                ) => {
98                    pipe.set(k, v);
99                }
100                (
101                    RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
102                    RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
103                ) => {
104                    pipe.geo_add(key, (lon, lat, member));
105                }
106                (
107                    RedisSinkPayloadWriterInput::RedisPubSubKey(key),
108                    RedisSinkPayloadWriterInput::String(v),
109                ) => {
110                    pipe.publish(key, v);
111                }
112                _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
113            },
114            RedisPipe::Single(pipe) => match (k, v) {
115                (
116                    RedisSinkPayloadWriterInput::String(k),
117                    RedisSinkPayloadWriterInput::String(v),
118                ) => {
119                    pipe.set(k, v);
120                }
121                (
122                    RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
123                    RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
124                ) => {
125                    pipe.geo_add(key, (lon, lat, member));
126                }
127                (
128                    RedisSinkPayloadWriterInput::RedisPubSubKey(key),
129                    RedisSinkPayloadWriterInput::String(v),
130                ) => {
131                    pipe.publish(key, v);
132                }
133                _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
134            },
135        };
136        Ok(())
137    }
138
139    pub fn del(&mut self, k: RedisSinkPayloadWriterInput) -> Result<()> {
140        match self {
141            RedisPipe::Cluster(pipe) => match k {
142                RedisSinkPayloadWriterInput::String(k) => {
143                    pipe.del(k);
144                }
145                RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
146                    pipe.zrem(key, member);
147                }
148                _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
149            },
150            RedisPipe::Single(pipe) => match k {
151                RedisSinkPayloadWriterInput::String(k) => {
152                    pipe.del(k);
153                }
154                RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
155                    pipe.zrem(key, member);
156                }
157                _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
158            },
159        };
160        Ok(())
161    }
162}
/// Connection to Redis, in one of the two supported deployment modes.
pub enum RedisConn {
    /// Redis deployed as a cluster; clusters with only one node should also use this variant.
    Cluster(ClusterConnection),
    /// Redis that is not deployed as a cluster.
    Single(MultiplexedConnection),
}
169
170impl RedisCommon {
171    pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> {
172        match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
173            Ok(v) => {
174                if let Value::Array(list) = v {
175                    let list = list
176                        .into_iter()
177                        .map(|s| {
178                            if let Value::String(s) = s {
179                                Ok(s)
180                            } else {
181                                Err(SinkError::Redis(
182                                    "redis.url must be array of string".to_owned(),
183                                )
184                                .into())
185                            }
186                        })
187                        .collect::<ConnectorResult<Vec<String>>>()?;
188
189                    let client = ClusterClient::new(list)?;
190                    Ok((
191                        RedisConn::Cluster(client.get_connection()?),
192                        RedisPipe::Cluster(redis::cluster::cluster_pipe()),
193                    ))
194                } else {
195                    Err(SinkError::Redis("redis.url must be array or string".to_owned()).into())
196                }
197            }
198            Err(_) => {
199                let client = RedisClient::open(self.url.clone())?;
200                Ok((
201                    RedisConn::Single(client.get_multiplexed_async_connection().await?),
202                    RedisPipe::Single(redis::pipe()),
203                ))
204            }
205        }
206    }
207}
208
209#[serde_as]
210#[derive(Clone, Debug, Deserialize, WithOptions)]
211pub struct RedisConfig {
212    #[serde(flatten)]
213    pub common: RedisCommon,
214}
215
216impl RedisConfig {
217    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
218        let config =
219            serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
220                .map_err(|e| SinkError::Config(anyhow!(e)))?;
221        Ok(config)
222    }
223}
224
225#[derive(Debug)]
226pub struct RedisSink {
227    config: RedisConfig,
228    schema: Schema,
229    pk_indices: Vec<usize>,
230    format_desc: SinkFormatDesc,
231    db_name: String,
232    sink_from_name: String,
233}
234
235#[async_trait]
236impl TryFrom<SinkParam> for RedisSink {
237    type Error = SinkError;
238
239    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
240        if param.downstream_pk.is_empty() {
241            return Err(SinkError::Config(anyhow!(
242                "Redis Sink Primary Key must be specified."
243            )));
244        }
245        let config = RedisConfig::from_btreemap(param.properties.clone())?;
246        Ok(Self {
247            config,
248            schema: param.schema(),
249            pk_indices: param.downstream_pk,
250            format_desc: param
251                .format_desc
252                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
253            db_name: param.db_name,
254            sink_from_name: param.sink_from_name,
255        })
256    }
257}
258
259impl Sink for RedisSink {
260    type Coordinator = DummySinkCommitCoordinator;
261    type LogSinker = AsyncTruncateLogSinkerOf<RedisSinkWriter>;
262
263    const SINK_NAME: &'static str = "redis";
264
265    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
266        Ok(RedisSinkWriter::new(
267            self.config.clone(),
268            self.schema.clone(),
269            self.pk_indices.clone(),
270            &self.format_desc,
271            self.db_name.clone(),
272            self.sink_from_name.clone(),
273        )
274        .await?
275        .into_log_sinker(usize::MAX))
276    }
277
278    async fn validate(&self) -> Result<()> {
279        self.config.common.build_conn_and_pipe().await?;
280        let all_map: HashMap<String, DataType> = self
281            .schema
282            .fields()
283            .iter()
284            .map(|f| (f.name.clone(), f.data_type.clone()))
285            .collect();
286        let pk_map: HashMap<String, DataType> = self
287            .schema
288            .fields()
289            .iter()
290            .enumerate()
291            .filter(|(k, _)| self.pk_indices.contains(k))
292            .map(|(_, v)| (v.name.clone(), v.data_type.clone()))
293            .collect();
294        if matches!(
295            self.format_desc.encode,
296            super::catalog::SinkEncode::Template
297        ) {
298            match self
299                .format_desc
300                .options
301                .get(REDIS_VALUE_TYPE)
302                .map(|s| s.as_str())
303            {
304                // if not set, default to string
305                Some(REDIS_VALUE_TYPE_STRING) | None => {
306                    let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
307                        SinkError::Config(anyhow!(
308                            "Cannot find '{KEY_FORMAT}', please set it or use JSON"
309                        ))
310                    })?;
311                    TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
312                    let value_format =
313                        self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
314                            SinkError::Config(anyhow!(
315                                "Cannot find `{VALUE_FORMAT}`, please set it or use JSON"
316                            ))
317                        })?;
318                    TemplateStringEncoder::check_string_format(value_format, &all_map)?;
319                }
320                Some(REDIS_VALUE_TYPE_GEO) => {
321                    let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
322                        SinkError::Config(anyhow!(
323                            "Cannot find '{KEY_FORMAT}', please set it or use JSON"
324                        ))
325                    })?;
326                    TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
327
328                    let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| {
329                        SinkError::Config(anyhow!(
330                            "Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
331                        ))
332                    })?;
333                    let lat_name = self.format_desc.options.get(LAT_NAME).ok_or_else(|| {
334                        SinkError::Config(anyhow!(
335                            "Cannot find `{LAT_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
336                        ))
337                    })?;
338                    let member_name = self.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
339                        SinkError::Config(anyhow!(
340                            "Cannot find `{MEMBER_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
341                        ))
342                    })?;
343                    if let Some(lon_type) = all_map.get(lon_name)
344                        && (lon_type == &DataType::Float64
345                            || lon_type == &DataType::Float32
346                            || lon_type == &DataType::Varchar)
347                    {
348                        // do nothing
349                    } else {
350                        return Err(SinkError::Config(anyhow!(
351                            "`{LON_NAME}` must be set to `float64` or `float32` or `varchar`"
352                        )));
353                    }
354                    if let Some(lat_type) = all_map.get(lat_name)
355                        && (lat_type == &DataType::Float64
356                            || lat_type == &DataType::Float32
357                            || lat_type == &DataType::Varchar)
358                    {
359                        // do nothing
360                    } else {
361                        return Err(SinkError::Config(anyhow!(
362                            "`{LAT_NAME}` must be set to `float64` or `float32` or `varchar`"
363                        )));
364                    }
365                    if let Some(member_type) = pk_map.get(member_name)
366                        && member_type == &DataType::Varchar
367                    {
368                        // do nothing
369                    } else {
370                        return Err(SinkError::Config(anyhow!(
371                            "`{MEMBER_NAME}` must be set to `varchar` and `primary_key`"
372                        )));
373                    }
374                }
375                Some(REDIS_VALUE_TYPE_PUBSUB) => {
376                    let channel = self.format_desc.options.get(CHANNEL);
377                    let channel_column = self.format_desc.options.get(CHANNEL_COLUMN);
378                    if (channel.is_none() && channel_column.is_none())
379                        || (channel.is_some() && channel_column.is_some())
380                    {
381                        return Err(SinkError::Config(anyhow!(
382                            "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
383                        )));
384                    }
385
386                    if let Some(channel_column) = channel_column
387                        && let Some(channel_column_type) = all_map.get(channel_column)
388                        && (channel_column_type != &DataType::Varchar)
389                    {
390                        return Err(SinkError::Config(anyhow!(
391                            "`{CHANNEL_COLUMN}` must be set to `varchar`"
392                        )));
393                    }
394
395                    let value_format =
396                        self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
397                            SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
398                        })?;
399                    TemplateStringEncoder::check_string_format(value_format, &all_map)?;
400                }
401                _ => {
402                    return Err(SinkError::Config(anyhow!(
403                        "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}`"
404                    )));
405                }
406            }
407        }
408        Ok(())
409    }
410}
411
/// Sink writer that formats stream chunks and sends them to Redis through its payload writer.
pub struct RedisSinkWriter {
    /// Initialized to `0` by the constructors and not read afterwards.
    #[expect(dead_code)]
    epoch: u64,
    /// Schema the writer was created with; kept but not read.
    #[expect(dead_code)]
    schema: Schema,
    /// Primary key column indices the writer was created with; kept but not read.
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    /// Turns each row of a chunk into the key/value pair handed to the payload writer.
    formatter: SinkFormatterImpl,
    /// Owns the Redis connection and the pipeline the formatted rows are added to.
    payload_writer: RedisSinkPayloadWriter,
}
422
423struct RedisSinkPayloadWriter {
424    // connection to redis, one per executor
425    conn: Option<RedisConn>,
426    // the command pipeline for write-commit
427    pipe: RedisPipe,
428}
429
430impl RedisSinkPayloadWriter {
431    pub async fn new(config: RedisConfig) -> Result<Self> {
432        let (conn, pipe) = config.common.build_conn_and_pipe().await?;
433        let conn = Some(conn);
434
435        Ok(Self { conn, pipe })
436    }
437
438    #[cfg(test)]
439    pub fn mock() -> Self {
440        let conn = None;
441        let pipe = RedisPipe::Single(redis::pipe());
442        Self { conn, pipe }
443    }
444
445    pub async fn commit(&mut self) -> Result<()> {
446        #[cfg(test)]
447        {
448            if self.conn.is_none() {
449                return Ok(());
450            }
451        }
452        self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
453        self.pipe.clear();
454        Ok(())
455    }
456}
457
458impl FormattedSink for RedisSinkPayloadWriter {
459    type K = RedisSinkPayloadWriterInput;
460    type V = RedisSinkPayloadWriterInput;
461
462    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
463        let k = k.ok_or_else(|| SinkError::Redis("The redis key cannot be null".to_owned()))?;
464        match v {
465            Some(v) => self.pipe.set(k, v)?,
466            None => self.pipe.del(k)?,
467        };
468        Ok(())
469    }
470}
471
472impl RedisSinkWriter {
473    pub async fn new(
474        config: RedisConfig,
475        schema: Schema,
476        pk_indices: Vec<usize>,
477        format_desc: &SinkFormatDesc,
478        db_name: String,
479        sink_from_name: String,
480    ) -> Result<Self> {
481        let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
482        let formatter = SinkFormatterImpl::new(
483            format_desc,
484            schema.clone(),
485            pk_indices.clone(),
486            db_name,
487            sink_from_name,
488            "NO_TOPIC",
489        )
490        .await?;
491
492        Ok(Self {
493            schema,
494            pk_indices,
495            epoch: 0,
496            formatter,
497            payload_writer,
498        })
499    }
500
501    #[cfg(test)]
502    pub async fn mock(
503        schema: Schema,
504        pk_indices: Vec<usize>,
505        format_desc: &SinkFormatDesc,
506    ) -> Result<Self> {
507        let formatter = SinkFormatterImpl::new(
508            format_desc,
509            schema.clone(),
510            pk_indices.clone(),
511            "d1".to_owned(),
512            "t1".to_owned(),
513            "NO_TOPIC",
514        )
515        .await?;
516        Ok(Self {
517            schema,
518            pk_indices,
519            epoch: 0,
520            formatter,
521            payload_writer: RedisSinkPayloadWriter::mock(),
522        })
523    }
524}
525
526impl AsyncTruncateSinkWriter for RedisSinkWriter {
527    async fn write_chunk<'a>(
528        &'a mut self,
529        chunk: StreamChunk,
530        _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
531    ) -> Result<()> {
532        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
533            self.payload_writer.write_chunk(chunk, formatter).await?;
534            self.payload_writer.commit().await
535        })
536    }
537}
538
#[cfg(test)]
mod test {
    use core::panic;

    use risingwave_common::array::{Array, I32Array, Op, Utf8Array};
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use super::*;
    use crate::sink::catalog::{SinkEncode, SinkFormat};
    use crate::sink::log_store::DeliveryFutureManager;

    /// Schema used by both tests: `id: int32`, `name: varchar`.
    fn id_name_schema() -> Schema {
        Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".to_owned(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".to_owned(),
            },
        ])
    }

    /// Three inserted rows: (1, Alice), (2, Bob), (3, Clare).
    fn three_inserts() -> StreamChunk {
        StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        )
    }

    /// Asserts that the mock writer's pipeline holds exactly the `expected` packed commands, in
    /// order. The mock never commits, so the commands are still in the pipeline.
    fn assert_queued_commands(writer: &RedisSinkWriter, expected: &[&str]) {
        let RedisPipe::Single(pipe) = &writer.payload_writer.pipe else {
            panic!("pipe type not match")
        };
        for (cmd, exp_cmd) in pipe.cmd_iter().zip_eq_debug(expected.to_vec()) {
            assert_eq!(
                exp_cmd,
                std::str::from_utf8(&cmd.get_packed_command()).unwrap()
            );
        }
    }

    #[tokio::test]
    async fn test_write() {
        let format_desc = SinkFormatDesc {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Json,
            options: BTreeMap::default(),
            secret_refs: BTreeMap::default(),
            key_encode: None,
            connection_id: None,
        };

        let mut redis_sink_writer = RedisSinkWriter::mock(id_name_schema(), vec![0], &format_desc)
            .await
            .unwrap();

        let mut manager = DeliveryFutureManager::new(0);
        redis_sink_writer
            .write_chunk(three_inserts(), manager.start_write_chunk(0, 0))
            .await
            .expect("failed to write batch");

        assert_queued_commands(
            &redis_sink_writer,
            &[
                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n",
                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n",
                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n",
            ],
        );
    }

    #[tokio::test]
    async fn test_format_write() {
        let mut btree_map = BTreeMap::default();
        btree_map.insert(KEY_FORMAT.to_owned(), "key-{id}".to_owned());
        btree_map.insert(
            VALUE_FORMAT.to_owned(),
            "values:{id:{id},name:{name}}".to_owned(),
        );
        let format_desc = SinkFormatDesc {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Template,
            options: btree_map,
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        };

        let mut redis_sink_writer = RedisSinkWriter::mock(id_name_schema(), vec![0], &format_desc)
            .await
            .unwrap();

        let mut future_manager = DeliveryFutureManager::new(0);
        redis_sink_writer
            .write_chunk(three_inserts(), future_manager.start_write_chunk(0, 0))
            .await
            .expect("failed to write batch");

        assert_queued_commands(
            &redis_sink_writer,
            &[
                "*3\r\n$3\r\nSET\r\n$5\r\nkey-1\r\n$24\r\nvalues:{id:1,name:Alice}\r\n",
                "*3\r\n$3\r\nSET\r\n$5\r\nkey-2\r\n$22\r\nvalues:{id:2,name:Bob}\r\n",
                "*3\r\n$3\r\nSET\r\n$5\r\nkey-3\r\n$24\r\nvalues:{id:3,name:Clare}\r\n",
            ],
        );
    }
}