risingwave_connector/sink/
redis.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use phf::phf_set;
20use redis::aio::MultiplexedConnection;
21use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline};
22use redis::{Client as RedisClient, Pipeline};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use risingwave_common::types::DataType;
26use serde_derive::Deserialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use with_options::WithOptions;
30
31use super::catalog::SinkFormatDesc;
32use super::encoder::template::{RedisSinkPayloadWriterInput, TemplateStringEncoder};
33use super::formatter::SinkFormatterImpl;
34use super::writer::FormattedSink;
35use super::{SinkError, SinkParam};
36use crate::dispatch_sink_formatter_str_key_impl;
37use crate::enforce_secret::EnforceSecret;
38use crate::error::ConnectorResult;
39use crate::sink::log_store::DeliveryFutureManagerAddFuture;
40use crate::sink::writer::{
41    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
42};
43use crate::sink::{Result, Sink, SinkWriterParam};
44
45pub const REDIS_SINK: &str = "redis";
46pub const KEY_FORMAT: &str = "key_format";
47pub const VALUE_FORMAT: &str = "value_format";
48pub const REDIS_VALUE_TYPE: &str = "redis_value_type";
49pub const REDIS_VALUE_TYPE_STRING: &str = "string";
50pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial";
51pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub";
52pub const LON_NAME: &str = "longitude";
53pub const LAT_NAME: &str = "latitude";
54pub const MEMBER_NAME: &str = "member";
55pub const CHANNEL: &str = "channel";
56pub const CHANNEL_COLUMN: &str = "channel_column";
57
58#[derive(Deserialize, Debug, Clone, WithOptions)]
59pub struct RedisCommon {
60    #[serde(rename = "redis.url")]
61    pub url: String,
62}
63
64impl EnforceSecret for RedisCommon {
65    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
66        "redis.url"
67    };
68}
69
70pub enum RedisPipe {
71    Cluster(ClusterPipeline),
72    Single(Pipeline),
73}
74impl RedisPipe {
75    pub async fn query<T: redis::FromRedisValue>(
76        &self,
77        conn: &mut RedisConn,
78    ) -> ConnectorResult<T> {
79        match (self, conn) {
80            (RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?),
81            (RedisPipe::Single(pipe), RedisConn::Single(conn)) => {
82                Ok(pipe.query_async(conn).await?)
83            }
84            _ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_owned()).into()),
85        }
86    }
87
88    pub fn clear(&mut self) {
89        match self {
90            RedisPipe::Cluster(pipe) => pipe.clear(),
91            RedisPipe::Single(pipe) => pipe.clear(),
92        }
93    }
94
95    pub fn set(
96        &mut self,
97        k: RedisSinkPayloadWriterInput,
98        v: RedisSinkPayloadWriterInput,
99    ) -> Result<()> {
100        match self {
101            RedisPipe::Cluster(pipe) => match (k, v) {
102                (
103                    RedisSinkPayloadWriterInput::String(k),
104                    RedisSinkPayloadWriterInput::String(v),
105                ) => {
106                    pipe.set(k, v);
107                }
108                (
109                    RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
110                    RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
111                ) => {
112                    pipe.geo_add(key, (lon, lat, member));
113                }
114                (
115                    RedisSinkPayloadWriterInput::RedisPubSubKey(key),
116                    RedisSinkPayloadWriterInput::String(v),
117                ) => {
118                    pipe.publish(key, v);
119                }
120                _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
121            },
122            RedisPipe::Single(pipe) => match (k, v) {
123                (
124                    RedisSinkPayloadWriterInput::String(k),
125                    RedisSinkPayloadWriterInput::String(v),
126                ) => {
127                    pipe.set(k, v);
128                }
129                (
130                    RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
131                    RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
132                ) => {
133                    pipe.geo_add(key, (lon, lat, member));
134                }
135                (
136                    RedisSinkPayloadWriterInput::RedisPubSubKey(key),
137                    RedisSinkPayloadWriterInput::String(v),
138                ) => {
139                    pipe.publish(key, v);
140                }
141                _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
142            },
143        };
144        Ok(())
145    }
146
147    pub fn del(&mut self, k: RedisSinkPayloadWriterInput) -> Result<()> {
148        match self {
149            RedisPipe::Cluster(pipe) => match k {
150                RedisSinkPayloadWriterInput::String(k) => {
151                    pipe.del(k);
152                }
153                RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
154                    pipe.zrem(key, member);
155                }
156                _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
157            },
158            RedisPipe::Single(pipe) => match k {
159                RedisSinkPayloadWriterInput::String(k) => {
160                    pipe.del(k);
161                }
162                RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
163                    pipe.zrem(key, member);
164                }
165                _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
166            },
167        };
168        Ok(())
169    }
170}
171pub enum RedisConn {
172    // Redis deployed as a cluster, clusters with only one node should also use this conn
173    Cluster(ClusterConnection),
174    // Redis is not deployed as a cluster
175    Single(MultiplexedConnection),
176}
177
178impl RedisCommon {
179    pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> {
180        match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
181            Ok(v) => {
182                if let Value::Array(list) = v {
183                    let list = list
184                        .into_iter()
185                        .map(|s| {
186                            if let Value::String(s) = s {
187                                Ok(s)
188                            } else {
189                                Err(SinkError::Redis(
190                                    "redis.url must be array of string".to_owned(),
191                                )
192                                .into())
193                            }
194                        })
195                        .collect::<ConnectorResult<Vec<String>>>()?;
196
197                    let client = ClusterClient::new(list)?;
198                    Ok((
199                        RedisConn::Cluster(client.get_connection()?),
200                        RedisPipe::Cluster(redis::cluster::cluster_pipe()),
201                    ))
202                } else {
203                    Err(SinkError::Redis("redis.url must be array or string".to_owned()).into())
204                }
205            }
206            Err(_) => {
207                let client = RedisClient::open(self.url.clone())?;
208                Ok((
209                    RedisConn::Single(client.get_multiplexed_async_connection().await?),
210                    RedisPipe::Single(redis::pipe()),
211                ))
212            }
213        }
214    }
215}
216
217#[serde_as]
218#[derive(Clone, Debug, Deserialize, WithOptions)]
219pub struct RedisConfig {
220    #[serde(flatten)]
221    pub common: RedisCommon,
222}
223
224impl EnforceSecret for RedisConfig {
225    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
226        for prop in prop_iter {
227            RedisCommon::enforce_one(prop)?;
228        }
229        Ok(())
230    }
231}
232
233impl RedisConfig {
234    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
235        let config =
236            serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
237                .map_err(|e| SinkError::Config(anyhow!(e)))?;
238        Ok(config)
239    }
240}
241
242#[derive(Debug)]
243pub struct RedisSink {
244    config: RedisConfig,
245    schema: Schema,
246    pk_indices: Vec<usize>,
247    format_desc: SinkFormatDesc,
248    db_name: String,
249    sink_from_name: String,
250}
251
252impl EnforceSecret for RedisSink {
253    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
254        for prop in prop_iter {
255            RedisConfig::enforce_one(prop)?;
256        }
257        Ok(())
258    }
259}
260
261#[async_trait]
262impl TryFrom<SinkParam> for RedisSink {
263    type Error = SinkError;
264
265    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
266        if param.downstream_pk.is_empty() {
267            return Err(SinkError::Config(anyhow!(
268                "Redis Sink Primary Key must be specified."
269            )));
270        }
271        let config = RedisConfig::from_btreemap(param.properties.clone())?;
272        Ok(Self {
273            config,
274            schema: param.schema(),
275            pk_indices: param.downstream_pk,
276            format_desc: param
277                .format_desc
278                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
279            db_name: param.db_name,
280            sink_from_name: param.sink_from_name,
281        })
282    }
283}
284
285impl Sink for RedisSink {
286    type LogSinker = AsyncTruncateLogSinkerOf<RedisSinkWriter>;
287
288    const SINK_NAME: &'static str = "redis";
289
290    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
291        Ok(RedisSinkWriter::new(
292            self.config.clone(),
293            self.schema.clone(),
294            self.pk_indices.clone(),
295            &self.format_desc,
296            self.db_name.clone(),
297            self.sink_from_name.clone(),
298        )
299        .await?
300        .into_log_sinker(usize::MAX))
301    }
302
303    async fn validate(&self) -> Result<()> {
304        self.config.common.build_conn_and_pipe().await?;
305        let all_map: HashMap<String, DataType> = self
306            .schema
307            .fields()
308            .iter()
309            .map(|f| (f.name.clone(), f.data_type.clone()))
310            .collect();
311        let pk_map: HashMap<String, DataType> = self
312            .schema
313            .fields()
314            .iter()
315            .enumerate()
316            .filter(|(k, _)| self.pk_indices.contains(k))
317            .map(|(_, v)| (v.name.clone(), v.data_type.clone()))
318            .collect();
319        if matches!(
320            self.format_desc.encode,
321            super::catalog::SinkEncode::Template
322        ) {
323            match self
324                .format_desc
325                .options
326                .get(REDIS_VALUE_TYPE)
327                .map(|s| s.as_str())
328            {
329                // if not set, default to string
330                Some(REDIS_VALUE_TYPE_STRING) | None => {
331                    let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
332                        SinkError::Config(anyhow!(
333                            "Cannot find '{KEY_FORMAT}', please set it or use JSON"
334                        ))
335                    })?;
336                    TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
337                    let value_format =
338                        self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
339                            SinkError::Config(anyhow!(
340                                "Cannot find `{VALUE_FORMAT}`, please set it or use JSON"
341                            ))
342                        })?;
343                    TemplateStringEncoder::check_string_format(value_format, &all_map)?;
344                }
345                Some(REDIS_VALUE_TYPE_GEO) => {
346                    let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
347                        SinkError::Config(anyhow!(
348                            "Cannot find '{KEY_FORMAT}', please set it or use JSON"
349                        ))
350                    })?;
351                    TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
352
353                    let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| {
354                        SinkError::Config(anyhow!(
355                            "Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
356                        ))
357                    })?;
358                    let lat_name = self.format_desc.options.get(LAT_NAME).ok_or_else(|| {
359                        SinkError::Config(anyhow!(
360                            "Cannot find `{LAT_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
361                        ))
362                    })?;
363                    let member_name = self.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
364                        SinkError::Config(anyhow!(
365                            "Cannot find `{MEMBER_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
366                        ))
367                    })?;
368                    if let Some(lon_type) = all_map.get(lon_name)
369                        && (lon_type == &DataType::Float64
370                            || lon_type == &DataType::Float32
371                            || lon_type == &DataType::Varchar)
372                    {
373                        // do nothing
374                    } else {
375                        return Err(SinkError::Config(anyhow!(
376                            "`{LON_NAME}` must be set to `float64` or `float32` or `varchar`"
377                        )));
378                    }
379                    if let Some(lat_type) = all_map.get(lat_name)
380                        && (lat_type == &DataType::Float64
381                            || lat_type == &DataType::Float32
382                            || lat_type == &DataType::Varchar)
383                    {
384                        // do nothing
385                    } else {
386                        return Err(SinkError::Config(anyhow!(
387                            "`{LAT_NAME}` must be set to `float64` or `float32` or `varchar`"
388                        )));
389                    }
390                    if let Some(member_type) = pk_map.get(member_name)
391                        && member_type == &DataType::Varchar
392                    {
393                        // do nothing
394                    } else {
395                        return Err(SinkError::Config(anyhow!(
396                            "`{MEMBER_NAME}` must be set to `varchar` and `primary_key`"
397                        )));
398                    }
399                }
400                Some(REDIS_VALUE_TYPE_PUBSUB) => {
401                    let channel = self.format_desc.options.get(CHANNEL);
402                    let channel_column = self.format_desc.options.get(CHANNEL_COLUMN);
403                    if (channel.is_none() && channel_column.is_none())
404                        || (channel.is_some() && channel_column.is_some())
405                    {
406                        return Err(SinkError::Config(anyhow!(
407                            "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
408                        )));
409                    }
410
411                    if let Some(channel_column) = channel_column
412                        && let Some(channel_column_type) = all_map.get(channel_column)
413                        && (channel_column_type != &DataType::Varchar)
414                    {
415                        return Err(SinkError::Config(anyhow!(
416                            "`{CHANNEL_COLUMN}` must be set to `varchar`"
417                        )));
418                    }
419
420                    let value_format =
421                        self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
422                            SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
423                        })?;
424                    TemplateStringEncoder::check_string_format(value_format, &all_map)?;
425                }
426                _ => {
427                    return Err(SinkError::Config(anyhow!(
428                        "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}`"
429                    )));
430                }
431            }
432        }
433        Ok(())
434    }
435}
436
437pub struct RedisSinkWriter {
438    #[expect(dead_code)]
439    epoch: u64,
440    #[expect(dead_code)]
441    schema: Schema,
442    #[expect(dead_code)]
443    pk_indices: Vec<usize>,
444    formatter: SinkFormatterImpl,
445    payload_writer: RedisSinkPayloadWriter,
446}
447
448struct RedisSinkPayloadWriter {
449    // connection to redis, one per executor
450    conn: Option<RedisConn>,
451    // the command pipeline for write-commit
452    pipe: RedisPipe,
453}
454
455impl RedisSinkPayloadWriter {
456    pub async fn new(config: RedisConfig) -> Result<Self> {
457        let (conn, pipe) = config.common.build_conn_and_pipe().await?;
458        let conn = Some(conn);
459
460        Ok(Self { conn, pipe })
461    }
462
463    #[cfg(test)]
464    pub fn mock() -> Self {
465        let conn = None;
466        let pipe = RedisPipe::Single(redis::pipe());
467        Self { conn, pipe }
468    }
469
470    pub async fn commit(&mut self) -> Result<()> {
471        #[cfg(test)]
472        {
473            if self.conn.is_none() {
474                return Ok(());
475            }
476        }
477        self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
478        self.pipe.clear();
479        Ok(())
480    }
481}
482
483impl FormattedSink for RedisSinkPayloadWriter {
484    type K = RedisSinkPayloadWriterInput;
485    type V = RedisSinkPayloadWriterInput;
486
487    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
488        let k = k.ok_or_else(|| SinkError::Redis("The redis key cannot be null".to_owned()))?;
489        match v {
490            Some(v) => self.pipe.set(k, v)?,
491            None => self.pipe.del(k)?,
492        };
493        Ok(())
494    }
495}
496
497impl RedisSinkWriter {
498    pub async fn new(
499        config: RedisConfig,
500        schema: Schema,
501        pk_indices: Vec<usize>,
502        format_desc: &SinkFormatDesc,
503        db_name: String,
504        sink_from_name: String,
505    ) -> Result<Self> {
506        let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
507        let formatter = SinkFormatterImpl::new(
508            format_desc,
509            schema.clone(),
510            pk_indices.clone(),
511            db_name,
512            sink_from_name,
513            "NO_TOPIC",
514        )
515        .await?;
516
517        Ok(Self {
518            schema,
519            pk_indices,
520            epoch: 0,
521            formatter,
522            payload_writer,
523        })
524    }
525
526    #[cfg(test)]
527    pub async fn mock(
528        schema: Schema,
529        pk_indices: Vec<usize>,
530        format_desc: &SinkFormatDesc,
531    ) -> Result<Self> {
532        let formatter = SinkFormatterImpl::new(
533            format_desc,
534            schema.clone(),
535            pk_indices.clone(),
536            "d1".to_owned(),
537            "t1".to_owned(),
538            "NO_TOPIC",
539        )
540        .await?;
541        Ok(Self {
542            schema,
543            pk_indices,
544            epoch: 0,
545            formatter,
546            payload_writer: RedisSinkPayloadWriter::mock(),
547        })
548    }
549}
550
551impl AsyncTruncateSinkWriter for RedisSinkWriter {
552    async fn write_chunk<'a>(
553        &'a mut self,
554        chunk: StreamChunk,
555        _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
556    ) -> Result<()> {
557        dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
558            self.payload_writer.write_chunk(chunk, formatter).await?;
559            self.payload_writer.commit().await
560        })
561    }
562}
563
564#[cfg(test)]
565mod test {
566    use core::panic;
567
568    use rdkafka::message::FromBytes;
569    use risingwave_common::array::{Array, I32Array, Op, Utf8Array};
570    use risingwave_common::catalog::Field;
571    use risingwave_common::types::DataType;
572    use risingwave_common::util::iter_util::ZipEqDebug;
573
574    use super::*;
575    use crate::sink::catalog::{SinkEncode, SinkFormat};
576    use crate::sink::log_store::DeliveryFutureManager;
577
578    #[tokio::test]
579    async fn test_write() {
580        let schema = Schema::new(vec![
581            Field {
582                data_type: DataType::Int32,
583                name: "id".to_owned(),
584            },
585            Field {
586                data_type: DataType::Varchar,
587                name: "name".to_owned(),
588            },
589        ]);
590
591        let format_desc = SinkFormatDesc {
592            format: SinkFormat::AppendOnly,
593            encode: SinkEncode::Json,
594            options: BTreeMap::default(),
595            secret_refs: BTreeMap::default(),
596            key_encode: None,
597            connection_id: None,
598        };
599
600        let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
601            .await
602            .unwrap();
603
604        let chunk_a = StreamChunk::new(
605            vec![Op::Insert, Op::Insert, Op::Insert],
606            vec![
607                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
608                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
609            ],
610        );
611
612        let mut manager = DeliveryFutureManager::new(0);
613
614        redis_sink_writer
615            .write_chunk(chunk_a, manager.start_write_chunk(0, 0))
616            .await
617            .expect("failed to write batch");
618        let expected_a = vec![
619            (
620                0,
621                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n",
622            ),
623            (
624                1,
625                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n",
626            ),
627            (
628                2,
629                "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n",
630            ),
631        ];
632
633        if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
634            pipe.cmd_iter()
635                .enumerate()
636                .zip_eq_debug(expected_a.clone())
637                .for_each(|((i, cmd), (exp_i, exp_cmd))| {
638                    if exp_i == i {
639                        assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
640                    }
641                });
642        } else {
643            panic!("pipe type not match")
644        }
645    }
646
647    #[tokio::test]
648    async fn test_format_write() {
649        let schema = Schema::new(vec![
650            Field {
651                data_type: DataType::Int32,
652                name: "id".to_owned(),
653            },
654            Field {
655                data_type: DataType::Varchar,
656                name: "name".to_owned(),
657            },
658        ]);
659
660        let mut btree_map = BTreeMap::default();
661        btree_map.insert(KEY_FORMAT.to_owned(), "key-{id}".to_owned());
662        btree_map.insert(
663            VALUE_FORMAT.to_owned(),
664            "values:\\{id:{id},name:{name}\\}".to_owned(),
665        );
666        let format_desc = SinkFormatDesc {
667            format: SinkFormat::AppendOnly,
668            encode: SinkEncode::Template,
669            options: btree_map,
670            secret_refs: Default::default(),
671            key_encode: None,
672            connection_id: None,
673        };
674
675        let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
676            .await
677            .unwrap();
678
679        let mut future_manager = DeliveryFutureManager::new(0);
680
681        let chunk_a = StreamChunk::new(
682            vec![Op::Insert, Op::Insert, Op::Insert],
683            vec![
684                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
685                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
686            ],
687        );
688
689        redis_sink_writer
690            .write_chunk(chunk_a, future_manager.start_write_chunk(0, 0))
691            .await
692            .expect("failed to write batch");
693        let expected_a = vec![
694            (
695                0,
696                "*3\r\n$3\r\nSET\r\n$5\r\nkey-1\r\n$24\r\nvalues:{id:1,name:Alice}\r\n",
697            ),
698            (
699                1,
700                "*3\r\n$3\r\nSET\r\n$5\r\nkey-2\r\n$22\r\nvalues:{id:2,name:Bob}\r\n",
701            ),
702            (
703                2,
704                "*3\r\n$3\r\nSET\r\n$5\r\nkey-3\r\n$24\r\nvalues:{id:3,name:Clare}\r\n",
705            ),
706        ];
707
708        if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
709            pipe.cmd_iter()
710                .enumerate()
711                .zip_eq_debug(expected_a.clone())
712                .for_each(|((i, cmd), (exp_i, exp_cmd))| {
713                    if exp_i == i {
714                        assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
715                    }
716                });
717        } else {
718            panic!("pipe type not match")
719        };
720    }
721}