risingwave_connector/sink/
redis.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use phf::phf_set;
20use redis::aio::MultiplexedConnection;
21use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline};
22use redis::{Client as RedisClient, Pipeline};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use risingwave_common::types::DataType;
26use serde_derive::Deserialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use with_options::WithOptions;
30
31use super::catalog::SinkFormatDesc;
32use super::encoder::template::{RedisSinkPayloadWriterInput, TemplateStringEncoder};
33use super::formatter::SinkFormatterImpl;
34use super::writer::FormattedSink;
35use super::{SinkError, SinkParam};
36use crate::dispatch_sink_formatter_str_key_impl;
37use crate::enforce_secret::EnforceSecret;
38use crate::error::ConnectorResult;
39use crate::sink::log_store::DeliveryFutureManagerAddFuture;
40use crate::sink::writer::{
41    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
42};
43use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam};
44
/// Identifier of the Redis sink connector (same literal as `RedisSink::SINK_NAME`).
45pub const REDIS_SINK: &str = "redis";
/// Format option holding the key template. `validate` checks it against the
/// primary-key columns for the `string` and `geospatial` value types.
46pub const KEY_FORMAT: &str = "key_format";
/// Format option holding the value template. `validate` checks it against all
/// columns for the `string` and `pubsub` value types.
47pub const VALUE_FORMAT: &str = "value_format";
/// Format option selecting the Redis value type; treated as `string` when absent.
48pub const REDIS_VALUE_TYPE: &str = "redis_value_type";
/// `redis_value_type` value: plain string key/value pairs.
49pub const REDIS_VALUE_TYPE_STRING: &str = "string";
/// `redis_value_type` value: geospatial entries.
50pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial";
/// `redis_value_type` value: publish/subscribe messages.
51pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub";
/// Geospatial option naming the longitude column (`float64`, `float32` or `varchar`).
52pub const LON_NAME: &str = "longitude";
/// Geospatial option naming the latitude column (`float64`, `float32` or `varchar`).
53pub const LAT_NAME: &str = "latitude";
/// Geospatial option naming the member column (a `varchar` primary-key column).
54pub const MEMBER_NAME: &str = "member";
/// Pubsub option; exactly one of `channel` and `channel_column` must be set.
55pub const CHANNEL: &str = "channel";
/// Pubsub option naming a `varchar` column; exactly one of `channel` and
/// `channel_column` must be set.
56pub const CHANNEL_COLUMN: &str = "channel_column";
57
/// Connection settings of the Redis sink.
58#[derive(Deserialize, Debug, Clone, WithOptions)]
59pub struct RedisCommon {
    // Either a JSON array of node URLs (cluster connection) or, when the value
    // is not valid JSON, a single URL (single-node connection); see
    // `build_conn_and_pipe`.
60    #[serde(rename = "redis.url")]
61    pub url: String,
62}
63
// Registers `redis.url` in this type's `ENFORCE_SECRET_PROPERTIES` set.
// NOTE(review): presumably `EnforceSecret` rejects these properties unless they
// are supplied through a secret — confirm against the trait definition.
64impl EnforceSecret for RedisCommon {
65    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
66        "redis.url"
67    };
68}
69
70pub enum RedisPipe {
71    Cluster(ClusterPipeline),
72    Single(Pipeline),
73}
74impl RedisPipe {
75    pub async fn query<T: redis::FromRedisValue>(
76        &self,
77        conn: &mut RedisConn,
78    ) -> ConnectorResult<T> {
79        match (self, conn) {
80            (RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?),
81            (RedisPipe::Single(pipe), RedisConn::Single(conn)) => {
82                Ok(pipe.query_async(conn).await?)
83            }
84            _ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_owned()).into()),
85        }
86    }
87
88    pub fn clear(&mut self) {
89        match self {
90            RedisPipe::Cluster(pipe) => pipe.clear(),
91            RedisPipe::Single(pipe) => pipe.clear(),
92        }
93    }
94
95    pub fn set(
96        &mut self,
97        k: RedisSinkPayloadWriterInput,
98        v: RedisSinkPayloadWriterInput,
99    ) -> Result<()> {
100        match self {
101            RedisPipe::Cluster(pipe) => match (k, v) {
102                (
103                    RedisSinkPayloadWriterInput::String(k),
104                    RedisSinkPayloadWriterInput::String(v),
105                ) => {
106                    pipe.set(k, v);
107                }
108                (
109                    RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
110                    RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
111                ) => {
112                    pipe.geo_add(key, (lon, lat, member));
113                }
114                (
115                    RedisSinkPayloadWriterInput::RedisPubSubKey(key),
116                    RedisSinkPayloadWriterInput::String(v),
117                ) => {
118                    pipe.publish(key, v);
119                }
120                _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
121            },
122            RedisPipe::Single(pipe) => match (k, v) {
123                (
124                    RedisSinkPayloadWriterInput::String(k),
125                    RedisSinkPayloadWriterInput::String(v),
126                ) => {
127                    pipe.set(k, v);
128                }
129                (
130                    RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
131                    RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
132                ) => {
133                    pipe.geo_add(key, (lon, lat, member));
134                }
135                (
136                    RedisSinkPayloadWriterInput::RedisPubSubKey(key),
137                    RedisSinkPayloadWriterInput::String(v),
138                ) => {
139                    pipe.publish(key, v);
140                }
141                _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
142            },
143        };
144        Ok(())
145    }
146
147    pub fn del(&mut self, k: RedisSinkPayloadWriterInput) -> Result<()> {
148        match self {
149            RedisPipe::Cluster(pipe) => match k {
150                RedisSinkPayloadWriterInput::String(k) => {
151                    pipe.del(k);
152                }
153                RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
154                    pipe.zrem(key, member);
155                }
156                _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
157            },
158            RedisPipe::Single(pipe) => match k {
159                RedisSinkPayloadWriterInput::String(k) => {
160                    pipe.del(k);
161                }
162                RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
163                    pipe.zrem(key, member);
164                }
165                _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
166            },
167        };
168        Ok(())
169    }
170}
/// A live connection to Redis; the variant must match the `RedisPipe` variant
/// that is executed on it (see `RedisPipe::query`).
171pub enum RedisConn {
172    // Redis deployed as a cluster. A cluster with only one node should also use this variant.
173    Cluster(ClusterConnection),
174    // Redis that is not deployed as a cluster.
175    Single(MultiplexedConnection),
176}
177
178impl RedisCommon {
179    pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> {
180        match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
181            Ok(v) => {
182                if let Value::Array(list) = v {
183                    let list = list
184                        .into_iter()
185                        .map(|s| {
186                            if let Value::String(s) = s {
187                                Ok(s)
188                            } else {
189                                Err(SinkError::Redis(
190                                    "redis.url must be array of string".to_owned(),
191                                )
192                                .into())
193                            }
194                        })
195                        .collect::<ConnectorResult<Vec<String>>>()?;
196
197                    let client = ClusterClient::new(list)?;
198                    Ok((
199                        RedisConn::Cluster(client.get_connection()?),
200                        RedisPipe::Cluster(redis::cluster::cluster_pipe()),
201                    ))
202                } else {
203                    Err(SinkError::Redis("redis.url must be array or string".to_owned()).into())
204                }
205            }
206            Err(_) => {
207                let client = RedisClient::open(self.url.clone())?;
208                Ok((
209                    RedisConn::Single(client.get_multiplexed_async_connection().await?),
210                    RedisPipe::Single(redis::pipe()),
211                ))
212            }
213        }
214    }
215}
216
217#[serde_as]
218#[derive(Clone, Debug, Deserialize, WithOptions)]
219pub struct RedisConfig {
220    #[serde(flatten)]
221    pub common: RedisCommon,
222}
223
224impl EnforceSecret for RedisConfig {
225    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
226        for prop in prop_iter {
227            RedisCommon::enforce_one(prop)?;
228        }
229        Ok(())
230    }
231}
232
233impl RedisConfig {
234    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
235        let config =
236            serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
237                .map_err(|e| SinkError::Config(anyhow!(e)))?;
238        Ok(config)
239    }
240}
241
242#[derive(Debug)]
243pub struct RedisSink {
244    config: RedisConfig,
245    schema: Schema,
246    pk_indices: Vec<usize>,
247    format_desc: SinkFormatDesc,
248    db_name: String,
249    sink_from_name: String,
250}
251
252impl EnforceSecret for RedisSink {
253    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
254        for prop in prop_iter {
255            RedisConfig::enforce_one(prop)?;
256        }
257        Ok(())
258    }
259}
260
261#[async_trait]
262impl TryFrom<SinkParam> for RedisSink {
263    type Error = SinkError;
264
265    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
266        if param.downstream_pk.is_empty() {
267            return Err(SinkError::Config(anyhow!(
268                "Redis Sink Primary Key must be specified."
269            )));
270        }
271        let config = RedisConfig::from_btreemap(param.properties.clone())?;
272        Ok(Self {
273            config,
274            schema: param.schema(),
275            pk_indices: param.downstream_pk,
276            format_desc: param
277                .format_desc
278                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
279            db_name: param.db_name,
280            sink_from_name: param.sink_from_name,
281        })
282    }
283}
284
285impl Sink for RedisSink {
286    type Coordinator = DummySinkCommitCoordinator;
287    type LogSinker = AsyncTruncateLogSinkerOf<RedisSinkWriter>;
288
289    const SINK_NAME: &'static str = "redis";
290
291    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
292        Ok(RedisSinkWriter::new(
293            self.config.clone(),
294            self.schema.clone(),
295            self.pk_indices.clone(),
296            &self.format_desc,
297            self.db_name.clone(),
298            self.sink_from_name.clone(),
299        )
300        .await?
301        .into_log_sinker(usize::MAX))
302    }
303
304    async fn validate(&self) -> Result<()> {
305        self.config.common.build_conn_and_pipe().await?;
306        let all_map: HashMap<String, DataType> = self
307            .schema
308            .fields()
309            .iter()
310            .map(|f| (f.name.clone(), f.data_type.clone()))
311            .collect();
312        let pk_map: HashMap<String, DataType> = self
313            .schema
314            .fields()
315            .iter()
316            .enumerate()
317            .filter(|(k, _)| self.pk_indices.contains(k))
318            .map(|(_, v)| (v.name.clone(), v.data_type.clone()))
319            .collect();
320        if matches!(
321            self.format_desc.encode,
322            super::catalog::SinkEncode::Template
323        ) {
324            match self
325                .format_desc
326                .options
327                .get(REDIS_VALUE_TYPE)
328                .map(|s| s.as_str())
329            {
330                // if not set, default to string
331                Some(REDIS_VALUE_TYPE_STRING) | None => {
332                    let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
333                        SinkError::Config(anyhow!(
334                            "Cannot find '{KEY_FORMAT}', please set it or use JSON"
335                        ))
336                    })?;
337                    TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
338                    let value_format =
339                        self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
340                            SinkError::Config(anyhow!(
341                                "Cannot find `{VALUE_FORMAT}`, please set it or use JSON"
342                            ))
343                        })?;
344                    TemplateStringEncoder::check_string_format(value_format, &all_map)?;
345                }
346                Some(REDIS_VALUE_TYPE_GEO) => {
347                    let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
348                        SinkError::Config(anyhow!(
349                            "Cannot find '{KEY_FORMAT}', please set it or use JSON"
350                        ))
351                    })?;
352                    TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
353
354                    let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| {
355                        SinkError::Config(anyhow!(
356                            "Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
357                        ))
358                    })?;
359                    let lat_name = self.format_desc.options.get(LAT_NAME).ok_or_else(|| {
360                        SinkError::Config(anyhow!(
361                            "Cannot find `{LAT_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
362                        ))
363                    })?;
364                    let member_name = self.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
365                        SinkError::Config(anyhow!(
366                            "Cannot find `{MEMBER_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
367                        ))
368                    })?;
369                    if let Some(lon_type) = all_map.get(lon_name)
370                        && (lon_type == &DataType::Float64
371                            || lon_type == &DataType::Float32
372                            || lon_type == &DataType::Varchar)
373                    {
374                        // do nothing
375                    } else {
376                        return Err(SinkError::Config(anyhow!(
377                            "`{LON_NAME}` must be set to `float64` or `float32` or `varchar`"
378                        )));
379                    }
380                    if let Some(lat_type) = all_map.get(lat_name)
381                        && (lat_type == &DataType::Float64
382                            || lat_type == &DataType::Float32
383                            || lat_type == &DataType::Varchar)
384                    {
385                        // do nothing
386                    } else {
387                        return Err(SinkError::Config(anyhow!(
388                            "`{LAT_NAME}` must be set to `float64` or `float32` or `varchar`"
389                        )));
390                    }
391                    if let Some(member_type) = pk_map.get(member_name)
392                        && member_type == &DataType::Varchar
393                    {
394                        // do nothing
395                    } else {
396                        return Err(SinkError::Config(anyhow!(
397                            "`{MEMBER_NAME}` must be set to `varchar` and `primary_key`"
398                        )));
399                    }
400                }
401                Some(REDIS_VALUE_TYPE_PUBSUB) => {
402                    let channel = self.format_desc.options.get(CHANNEL);
403                    let channel_column = self.format_desc.options.get(CHANNEL_COLUMN);
404                    if (channel.is_none() && channel_column.is_none())
405                        || (channel.is_some() && channel_column.is_some())
406                    {
407                        return Err(SinkError::Config(anyhow!(
408                            "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
409                        )));
410                    }
411
412                    if let Some(channel_column) = channel_column
413                        && let Some(channel_column_type) = all_map.get(channel_column)
414                        && (channel_column_type != &DataType::Varchar)
415                    {
416                        return Err(SinkError::Config(anyhow!(
417                            "`{CHANNEL_COLUMN}` must be set to `varchar`"
418                        )));
419                    }
420
421                    let value_format =
422                        self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
423                            SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
424                        })?;
425                    TemplateStringEncoder::check_string_format(value_format, &all_map)?;
426                }
427                _ => {
428                    return Err(SinkError::Config(anyhow!(
429                        "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}`"
430                    )));
431                }
432            }
433        }
434        Ok(())
435    }
436}
437
/// Sink writer that formats stream chunks and hands the resulting key/value
/// pairs to a `RedisSinkPayloadWriter`.
438pub struct RedisSinkWriter {
    // Initialised to 0 by the constructors and not read anywhere in this file.
439    #[expect(dead_code)]
440    epoch: u64,
    // Stored but not read in this file; the formatter receives its own copy.
441    #[expect(dead_code)]
442    schema: Schema,
    // Stored but not read in this file; the formatter receives its own copy.
443    #[expect(dead_code)]
444    pk_indices: Vec<usize>,
    // Turns each row of a chunk into a key/value pair.
445    formatter: SinkFormatterImpl,
    // Buffers the pairs as Redis commands and executes them on commit.
446    payload_writer: RedisSinkPayloadWriter,
447}
448
449struct RedisSinkPayloadWriter {
450    // connection to redis, one per executor
451    conn: Option<RedisConn>,
452    // the command pipeline for write-commit
453    pipe: RedisPipe,
454}
455
456impl RedisSinkPayloadWriter {
457    pub async fn new(config: RedisConfig) -> Result<Self> {
458        let (conn, pipe) = config.common.build_conn_and_pipe().await?;
459        let conn = Some(conn);
460
461        Ok(Self { conn, pipe })
462    }
463
464    #[cfg(test)]
465    pub fn mock() -> Self {
466        let conn = None;
467        let pipe = RedisPipe::Single(redis::pipe());
468        Self { conn, pipe }
469    }
470
471    pub async fn commit(&mut self) -> Result<()> {
472        #[cfg(test)]
473        {
474            if self.conn.is_none() {
475                return Ok(());
476            }
477        }
478        self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
479        self.pipe.clear();
480        Ok(())
481    }
482}
483
484impl FormattedSink for RedisSinkPayloadWriter {
485    type K = RedisSinkPayloadWriterInput;
486    type V = RedisSinkPayloadWriterInput;
487
488    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
489        let k = k.ok_or_else(|| SinkError::Redis("The redis key cannot be null".to_owned()))?;
490        match v {
491            Some(v) => self.pipe.set(k, v)?,
492            None => self.pipe.del(k)?,
493        };
494        Ok(())
495    }
496}
497
498impl RedisSinkWriter {
499    pub async fn new(
500        config: RedisConfig,
501        schema: Schema,
502        pk_indices: Vec<usize>,
503        format_desc: &SinkFormatDesc,
504        db_name: String,
505        sink_from_name: String,
506    ) -> Result<Self> {
507        let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
508        let formatter = SinkFormatterImpl::new(
509            format_desc,
510            schema.clone(),
511            pk_indices.clone(),
512            db_name,
513            sink_from_name,
514            "NO_TOPIC",
515        )
516        .await?;
517
518        Ok(Self {
519            schema,
520            pk_indices,
521            epoch: 0,
522            formatter,
523            payload_writer,
524        })
525    }
526
527    #[cfg(test)]
528    pub async fn mock(
529        schema: Schema,
530        pk_indices: Vec<usize>,
531        format_desc: &SinkFormatDesc,
532    ) -> Result<Self> {
533        let formatter = SinkFormatterImpl::new(
534            format_desc,
535            schema.clone(),
536            pk_indices.clone(),
537            "d1".to_owned(),
538            "t1".to_owned(),
539            "NO_TOPIC",
540        )
541        .await?;
542        Ok(Self {
543            schema,
544            pk_indices,
545            epoch: 0,
546            formatter,
547            payload_writer: RedisSinkPayloadWriter::mock(),
548        })
549    }
550}
551
// Writes every chunk through the payload writer and commits it right away.
552impl AsyncTruncateSinkWriter for RedisSinkWriter {
    /// Formats `chunk` into the payload writer's pipeline, then commits the
    /// pipeline before returning. `_add_future` is not used.
553    async fn write_chunk<'a>(
554        &'a mut self,
555        chunk: StreamChunk,
556        _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
557    ) -> Result<()> {
        // The macro binds `formatter` to the concrete formatter inside `self.formatter`.
558        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
559            self.payload_writer.write_chunk(chunk, formatter).await?;
            // The result of the commit is the result of the whole write.
560            self.payload_writer.commit().await
561        })
562    }
563}
564
#[cfg(test)]
mod test {
    use risingwave_common::array::{Array, I32Array, Op, Utf8Array};
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use super::*;
    use crate::sink::catalog::{SinkEncode, SinkFormat};
    use crate::sink::log_store::DeliveryFutureManager;

    /// Schema `(id int32, name varchar)` shared by the tests.
    fn test_schema() -> Schema {
        Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".to_owned(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".to_owned(),
            },
        ])
    }

    /// Three inserted rows: `(1, Alice)`, `(2, Bob)`, `(3, Clare)`.
    fn test_chunk() -> StreamChunk {
        StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        )
    }

    /// Writes the test chunk through a mock writer keyed on `id` and returns the
    /// writer so its buffered commands can be inspected.
    async fn write_test_chunk(format_desc: &SinkFormatDesc) -> RedisSinkWriter {
        let mut writer = RedisSinkWriter::mock(test_schema(), vec![0], format_desc)
            .await
            .unwrap();
        let mut manager = DeliveryFutureManager::new(0);
        writer
            .write_chunk(test_chunk(), manager.start_write_chunk(0, 0))
            .await
            .expect("failed to write batch");
        writer
    }

    /// Asserts that the mock writer's pipeline holds exactly the `expected`
    /// packed commands, in order.
    fn assert_buffered_commands(writer: &RedisSinkWriter, expected: Vec<&str>) {
        let RedisPipe::Single(pipe) = &writer.payload_writer.pipe else {
            panic!("pipe type not match")
        };
        pipe.cmd_iter()
            .zip_eq_debug(expected)
            .for_each(|(cmd, exp_cmd)| {
                let packed = cmd.get_packed_command();
                assert_eq!(exp_cmd, std::str::from_utf8(&packed).unwrap());
            });
    }

    #[tokio::test]
    async fn test_write() {
        let format_desc = SinkFormatDesc {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Json,
            options: BTreeMap::default(),
            secret_refs: BTreeMap::default(),
            key_encode: None,
            connection_id: None,
        };

        let writer = write_test_chunk(&format_desc).await;

        assert_buffered_commands(
            &writer,
            vec![
                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n",
                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n",
                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n",
            ],
        );
    }

    #[tokio::test]
    async fn test_format_write() {
        let mut options = BTreeMap::default();
        options.insert(KEY_FORMAT.to_owned(), "key-{id}".to_owned());
        options.insert(
            VALUE_FORMAT.to_owned(),
            "values:\\{id:{id},name:{name}\\}".to_owned(),
        );
        let format_desc = SinkFormatDesc {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Template,
            options,
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        };

        let writer = write_test_chunk(&format_desc).await;

        assert_buffered_commands(
            &writer,
            vec![
                "*3\r\n$3\r\nSET\r\n$5\r\nkey-1\r\n$24\r\nvalues:{id:1,name:Alice}\r\n",
                "*3\r\n$3\r\nSET\r\n$5\r\nkey-2\r\n$22\r\nvalues:{id:2,name:Bob}\r\n",
                "*3\r\n$3\r\nSET\r\n$5\r\nkey-3\r\n$24\r\nvalues:{id:3,name:Clare}\r\n",
            ],
        );
    }
}