1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use phf::phf_set;
20use redis::aio::MultiplexedConnection;
21use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline};
22use redis::{Client as RedisClient, Pipeline};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use risingwave_common::types::DataType;
26use serde::Deserialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use with_options::WithOptions;
30
31use super::catalog::SinkFormatDesc;
32use super::encoder::template::{RedisSinkPayloadWriterInput, TemplateStringEncoder};
33use super::formatter::SinkFormatterImpl;
34use super::writer::FormattedSink;
35use super::{SinkError, SinkParam};
36use crate::dispatch_sink_formatter_str_key_impl;
37use crate::enforce_secret::EnforceSecret;
38use crate::error::ConnectorResult;
39use crate::sink::log_store::DeliveryFutureManagerAddFuture;
40use crate::sink::writer::{
41 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
42};
43use crate::sink::{Result, Sink, SinkWriterParam};
44
45pub const REDIS_SINK: &str = "redis";
46pub const KEY_FORMAT: &str = "key_format";
47pub const VALUE_FORMAT: &str = "value_format";
48pub const REDIS_VALUE_TYPE: &str = "redis_value_type";
49pub const REDIS_VALUE_TYPE_STRING: &str = "string";
50pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial";
51pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub";
52pub const REDIS_VALUE_TYPE_STREAM: &str = "stream";
53pub const LON_NAME: &str = "longitude";
54pub const LAT_NAME: &str = "latitude";
55pub const MEMBER_NAME: &str = "member";
56pub const CHANNEL: &str = "channel";
57pub const CHANNEL_COLUMN: &str = "channel_column";
58pub const STREAM: &str = "stream";
59pub const STREAM_COLUMN: &str = "stream_column";
60
61#[derive(Deserialize, Debug, Clone, WithOptions)]
62pub struct RedisCommon {
63 #[serde(rename = "redis.url")]
64 pub url: String,
65}
66
67impl EnforceSecret for RedisCommon {
68 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
69 "redis.url"
70 };
71}
72
73pub enum RedisPipe {
74 Cluster(ClusterPipeline),
75 Single(Pipeline),
76}
77impl RedisPipe {
78 pub async fn query<T: redis::FromRedisValue>(
79 &self,
80 conn: &mut RedisConn,
81 ) -> ConnectorResult<T> {
82 match (self, conn) {
83 (RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?),
84 (RedisPipe::Single(pipe), RedisConn::Single(conn)) => {
85 Ok(pipe.query_async(conn).await?)
86 }
87 _ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_owned()).into()),
88 }
89 }
90
91 pub fn clear(&mut self) {
92 match self {
93 RedisPipe::Cluster(pipe) => pipe.clear(),
94 RedisPipe::Single(pipe) => pipe.clear(),
95 }
96 }
97
98 pub fn set(
99 &mut self,
100 k: RedisSinkPayloadWriterInput,
101 v: RedisSinkPayloadWriterInput,
102 ) -> Result<()> {
103 match self {
104 RedisPipe::Cluster(pipe) => match (k, v) {
105 (
106 RedisSinkPayloadWriterInput::String(k),
107 RedisSinkPayloadWriterInput::String(v),
108 ) => {
109 pipe.set(k, v);
110 }
111 (
112 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
113 RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
114 ) => {
115 pipe.geo_add(key, (lon, lat, member));
116 }
117 (
118 RedisSinkPayloadWriterInput::RedisPubSubStreamKey(key),
119 RedisSinkPayloadWriterInput::String(v),
120 ) => {
121 pipe.publish(key, v);
122 }
123 (
124 RedisSinkPayloadWriterInput::RedisPubSubStreamKey(key),
125 RedisSinkPayloadWriterInput::RedisStreamValue((field, value)),
126 ) => {
127 pipe.xadd(key, "*", &[(&field, &value)]);
128 }
129 _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
130 },
131 RedisPipe::Single(pipe) => match (k, v) {
132 (
133 RedisSinkPayloadWriterInput::String(k),
134 RedisSinkPayloadWriterInput::String(v),
135 ) => {
136 pipe.set(k, v);
137 }
138 (
139 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
140 RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
141 ) => {
142 pipe.geo_add(key, (lon, lat, member));
143 }
144 (
145 RedisSinkPayloadWriterInput::RedisPubSubStreamKey(key),
146 RedisSinkPayloadWriterInput::String(v),
147 ) => {
148 pipe.publish(key, v);
149 }
150 (
151 RedisSinkPayloadWriterInput::RedisPubSubStreamKey(key),
152 RedisSinkPayloadWriterInput::RedisStreamValue((field, value)),
153 ) => {
154 pipe.xadd(key, "*", &[(&field, &value)]);
155 }
156 _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
157 },
158 };
159 Ok(())
160 }
161
162 pub fn del(&mut self, k: RedisSinkPayloadWriterInput) -> Result<()> {
163 match self {
164 RedisPipe::Cluster(pipe) => match k {
165 RedisSinkPayloadWriterInput::String(k) => {
166 pipe.del(k);
167 }
168 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
169 pipe.zrem(key, member);
170 }
171 _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
172 },
173 RedisPipe::Single(pipe) => match k {
174 RedisSinkPayloadWriterInput::String(k) => {
175 pipe.del(k);
176 }
177 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
178 pipe.zrem(key, member);
179 }
180 _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
181 },
182 };
183 Ok(())
184 }
185}
186pub enum RedisConn {
187 Cluster(ClusterConnection),
189 Single(MultiplexedConnection),
191}
192
193impl RedisCommon {
194 pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> {
195 match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
196 Ok(v) => {
197 if let Value::Array(list) = v {
198 let list = list
199 .into_iter()
200 .map(|s| {
201 if let Value::String(s) = s {
202 Ok(s)
203 } else {
204 Err(SinkError::Redis(
205 "redis.url must be array of string".to_owned(),
206 )
207 .into())
208 }
209 })
210 .collect::<ConnectorResult<Vec<String>>>()?;
211
212 let client = ClusterClient::new(list)?;
213 Ok((
214 RedisConn::Cluster(client.get_connection()?),
215 RedisPipe::Cluster(redis::cluster::cluster_pipe()),
216 ))
217 } else {
218 Err(SinkError::Redis("redis.url must be array or string".to_owned()).into())
219 }
220 }
221 Err(_) => {
222 let client = RedisClient::open(self.url.clone())?;
223 Ok((
224 RedisConn::Single(client.get_multiplexed_async_connection().await?),
225 RedisPipe::Single(redis::pipe()),
226 ))
227 }
228 }
229 }
230}
231
232#[serde_as]
233#[derive(Clone, Debug, Deserialize, WithOptions)]
234pub struct RedisConfig {
235 #[serde(flatten)]
236 pub common: RedisCommon,
237}
238
239impl EnforceSecret for RedisConfig {
240 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
241 for prop in prop_iter {
242 RedisCommon::enforce_one(prop)?;
243 }
244 Ok(())
245 }
246}
247
248impl RedisConfig {
249 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
250 let config =
251 serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
252 .map_err(|e| SinkError::Config(anyhow!(e)))?;
253 Ok(config)
254 }
255}
256
257#[derive(Debug)]
258pub struct RedisSink {
259 config: RedisConfig,
260 schema: Schema,
261 pk_indices: Vec<usize>,
262 format_desc: SinkFormatDesc,
263 db_name: String,
264 sink_from_name: String,
265}
266
267impl EnforceSecret for RedisSink {
268 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
269 for prop in prop_iter {
270 RedisConfig::enforce_one(prop)?;
271 }
272 Ok(())
273 }
274}
275
276#[async_trait]
277impl TryFrom<SinkParam> for RedisSink {
278 type Error = SinkError;
279
280 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
281 let Some(pk_indices) = param.downstream_pk.clone() else {
282 return Err(SinkError::Config(anyhow!(
283 "Redis Sink Primary Key must be specified."
284 )));
285 };
286 let config = RedisConfig::from_btreemap(param.properties.clone())?;
287 Ok(Self {
288 config,
289 schema: param.schema(),
290 pk_indices,
291 format_desc: param
292 .format_desc
293 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
294 db_name: param.db_name,
295 sink_from_name: param.sink_from_name,
296 })
297 }
298}
299
300impl Sink for RedisSink {
301 type LogSinker = AsyncTruncateLogSinkerOf<RedisSinkWriter>;
302
303 const SINK_NAME: &'static str = "redis";
304
305 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
306 Ok(RedisSinkWriter::new(
307 self.config.clone(),
308 self.schema.clone(),
309 self.pk_indices.clone(),
310 &self.format_desc,
311 self.db_name.clone(),
312 self.sink_from_name.clone(),
313 )
314 .await?
315 .into_log_sinker(usize::MAX))
316 }
317
318 async fn validate(&self) -> Result<()> {
319 let all_map: HashMap<String, DataType> = self
320 .schema
321 .fields()
322 .iter()
323 .map(|f| (f.name.clone(), f.data_type.clone()))
324 .collect();
325 let pk_map: HashMap<String, DataType> = self
326 .schema
327 .fields()
328 .iter()
329 .enumerate()
330 .filter(|(k, _)| self.pk_indices.contains(k))
331 .map(|(_, v)| (v.name.clone(), v.data_type.clone()))
332 .collect();
333 if matches!(
334 self.format_desc.encode,
335 super::catalog::SinkEncode::Template
336 ) {
337 match self
338 .format_desc
339 .options
340 .get(REDIS_VALUE_TYPE)
341 .map(|s| s.as_str())
342 {
343 Some(REDIS_VALUE_TYPE_STRING) | None => {
345 let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
346 SinkError::Config(anyhow!(
347 "Cannot find '{KEY_FORMAT}', please set it or use JSON"
348 ))
349 })?;
350 TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
351 let value_format =
352 self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
353 SinkError::Config(anyhow!(
354 "Cannot find `{VALUE_FORMAT}`, please set it or use JSON"
355 ))
356 })?;
357 TemplateStringEncoder::check_string_format(value_format, &all_map)?;
358 }
359 Some(REDIS_VALUE_TYPE_GEO) => {
360 let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
361 SinkError::Config(anyhow!(
362 "Cannot find '{KEY_FORMAT}', please set it or use JSON"
363 ))
364 })?;
365 TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
366
367 let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| {
368 SinkError::Config(anyhow!(
369 "Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
370 ))
371 })?;
372 let lat_name = self.format_desc.options.get(LAT_NAME).ok_or_else(|| {
373 SinkError::Config(anyhow!(
374 "Cannot find `{LAT_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
375 ))
376 })?;
377 let member_name = self.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
378 SinkError::Config(anyhow!(
379 "Cannot find `{MEMBER_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
380 ))
381 })?;
382 if let Some(lon_type) = all_map.get(lon_name)
383 && (lon_type == &DataType::Float64
384 || lon_type == &DataType::Float32
385 || lon_type == &DataType::Varchar)
386 {
387 } else {
389 return Err(SinkError::Config(anyhow!(
390 "`{LON_NAME}` must be set to `float64` or `float32` or `varchar`"
391 )));
392 }
393 if let Some(lat_type) = all_map.get(lat_name)
394 && (lat_type == &DataType::Float64
395 || lat_type == &DataType::Float32
396 || lat_type == &DataType::Varchar)
397 {
398 } else {
400 return Err(SinkError::Config(anyhow!(
401 "`{LAT_NAME}` must be set to `float64` or `float32` or `varchar`"
402 )));
403 }
404 if let Some(member_type) = pk_map.get(member_name)
405 && member_type == &DataType::Varchar
406 {
407 } else {
409 return Err(SinkError::Config(anyhow!(
410 "`{MEMBER_NAME}` must be set to `varchar` and `primary_key`"
411 )));
412 }
413 }
414 Some(REDIS_VALUE_TYPE_PUBSUB) => {
415 let channel = self.format_desc.options.get(CHANNEL);
416 let channel_column = self.format_desc.options.get(CHANNEL_COLUMN);
417 if (channel.is_none() && channel_column.is_none())
418 || (channel.is_some() && channel_column.is_some())
419 {
420 return Err(SinkError::Config(anyhow!(
421 "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
422 )));
423 }
424
425 if let Some(channel_column) = channel_column
426 && let Some(channel_column_type) = all_map.get(channel_column)
427 && (channel_column_type != &DataType::Varchar)
428 {
429 return Err(SinkError::Config(anyhow!(
430 "`{CHANNEL_COLUMN}` must be set to `varchar`"
431 )));
432 }
433
434 let value_format =
435 self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
436 SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
437 })?;
438 TemplateStringEncoder::check_string_format(value_format, &all_map)?;
439 }
440 Some(REDIS_VALUE_TYPE_STREAM) => {
441 tracing::error!("test:for bug");
442 risingwave_common::license::Feature::RedisSinkStream
443 .check_available()
444 .map_err(|e| anyhow::anyhow!(e))?;
445 let stream = self.format_desc.options.get(STREAM);
446 let stream_column = self.format_desc.options.get(STREAM_COLUMN);
447 if (stream.is_none() && stream_column.is_none())
448 || (stream.is_some() && stream_column.is_some())
449 {
450 return Err(SinkError::Config(anyhow!(
451 "Please specific either `{STREAM}` or `{STREAM_COLUMN}`. They are mutually exclusive options."
452 )));
453 }
454
455 if let Some(stream_column) = stream_column
456 && let Some(stream_column_type) = all_map.get(stream_column)
457 && (stream_column_type != &DataType::Varchar)
458 {
459 return Err(SinkError::Config(anyhow!(
460 "`{STREAM_COLUMN}` must be set to `varchar`"
461 )));
462 }
463
464 let value_format =
465 self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
466 SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
467 })?;
468 let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
469 SinkError::Config(anyhow!(
470 "Cannot find '{KEY_FORMAT}', please set it or use JSON"
471 ))
472 })?;
473 TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
474 TemplateStringEncoder::check_string_format(value_format, &all_map)?;
475 }
476 _ => {
477 return Err(SinkError::Config(anyhow!(
478 "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}` or `{REDIS_VALUE_TYPE_STREAM}`"
479 )));
480 }
481 }
482 }
483 self.config.common.build_conn_and_pipe().await?;
484 Ok(())
485 }
486}
487
488pub struct RedisSinkWriter {
489 #[expect(dead_code)]
490 epoch: u64,
491 #[expect(dead_code)]
492 schema: Schema,
493 #[expect(dead_code)]
494 pk_indices: Vec<usize>,
495 formatter: SinkFormatterImpl,
496 payload_writer: RedisSinkPayloadWriter,
497}
498
499struct RedisSinkPayloadWriter {
500 conn: Option<RedisConn>,
502 pipe: RedisPipe,
504}
505
506impl RedisSinkPayloadWriter {
507 pub async fn new(config: RedisConfig) -> Result<Self> {
508 let (conn, pipe) = config.common.build_conn_and_pipe().await?;
509 let conn = Some(conn);
510
511 Ok(Self { conn, pipe })
512 }
513
514 #[cfg(test)]
515 pub fn mock() -> Self {
516 let conn = None;
517 let pipe = RedisPipe::Single(redis::pipe());
518 Self { conn, pipe }
519 }
520
521 pub async fn commit(&mut self) -> Result<()> {
522 #[cfg(test)]
523 {
524 if self.conn.is_none() {
525 return Ok(());
526 }
527 }
528 self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
529 self.pipe.clear();
530 Ok(())
531 }
532}
533
534impl FormattedSink for RedisSinkPayloadWriter {
535 type K = RedisSinkPayloadWriterInput;
536 type V = RedisSinkPayloadWriterInput;
537
538 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
539 let k = k.ok_or_else(|| SinkError::Redis("The redis key cannot be null".to_owned()))?;
540 match v {
541 Some(v) => self.pipe.set(k, v)?,
542 None => self.pipe.del(k)?,
543 };
544 Ok(())
545 }
546}
547
548impl RedisSinkWriter {
549 pub async fn new(
550 config: RedisConfig,
551 schema: Schema,
552 pk_indices: Vec<usize>,
553 format_desc: &SinkFormatDesc,
554 db_name: String,
555 sink_from_name: String,
556 ) -> Result<Self> {
557 let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
558 let formatter = SinkFormatterImpl::new(
559 format_desc,
560 schema.clone(),
561 pk_indices.clone(),
562 db_name,
563 sink_from_name,
564 "NO_TOPIC",
565 )
566 .await?;
567
568 Ok(Self {
569 schema,
570 pk_indices,
571 epoch: 0,
572 formatter,
573 payload_writer,
574 })
575 }
576
577 #[cfg(test)]
578 pub async fn mock(
579 schema: Schema,
580 pk_indices: Vec<usize>,
581 format_desc: &SinkFormatDesc,
582 ) -> Result<Self> {
583 let formatter = SinkFormatterImpl::new(
584 format_desc,
585 schema.clone(),
586 pk_indices.clone(),
587 "d1".to_owned(),
588 "t1".to_owned(),
589 "NO_TOPIC",
590 )
591 .await?;
592 Ok(Self {
593 schema,
594 pk_indices,
595 epoch: 0,
596 formatter,
597 payload_writer: RedisSinkPayloadWriter::mock(),
598 })
599 }
600}
601
602impl AsyncTruncateSinkWriter for RedisSinkWriter {
603 async fn write_chunk<'a>(
604 &'a mut self,
605 chunk: StreamChunk,
606 _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
607 ) -> Result<()> {
608 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
609 self.payload_writer.write_chunk(chunk, formatter).await?;
610 self.payload_writer.commit().await
611 })
612 }
613}
614
615#[cfg(test)]
616mod test {
617 use core::panic;
618
619 use rdkafka::message::FromBytes;
620 use risingwave_common::array::{Array, I32Array, Op, Utf8Array};
621 use risingwave_common::catalog::Field;
622 use risingwave_common::types::DataType;
623 use risingwave_common::util::iter_util::ZipEqDebug;
624
625 use super::*;
626 use crate::sink::catalog::{SinkEncode, SinkFormat};
627 use crate::sink::log_store::DeliveryFutureManager;
628
629 #[tokio::test]
630 async fn test_write() {
631 let schema = Schema::new(vec![
632 Field {
633 data_type: DataType::Int32,
634 name: "id".to_owned(),
635 },
636 Field {
637 data_type: DataType::Varchar,
638 name: "name".to_owned(),
639 },
640 ]);
641
642 let format_desc = SinkFormatDesc {
643 format: SinkFormat::AppendOnly,
644 encode: SinkEncode::Json,
645 options: BTreeMap::default(),
646 secret_refs: BTreeMap::default(),
647 key_encode: None,
648 connection_id: None,
649 };
650
651 let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
652 .await
653 .unwrap();
654
655 let chunk_a = StreamChunk::new(
656 vec![Op::Insert, Op::Insert, Op::Insert],
657 vec![
658 I32Array::from_iter(vec![1, 2, 3]).into_ref(),
659 Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
660 ],
661 );
662
663 let mut manager = DeliveryFutureManager::new(0);
664
665 redis_sink_writer
666 .write_chunk(chunk_a, manager.start_write_chunk(0, 0))
667 .await
668 .expect("failed to write batch");
669 let expected_a = vec![
670 (
671 0,
672 "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n",
673 ),
674 (
675 1,
676 "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n",
677 ),
678 (
679 2,
680 "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n",
681 ),
682 ];
683
684 if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
685 pipe.cmd_iter()
686 .enumerate()
687 .zip_eq_debug(expected_a.clone())
688 .for_each(|((i, cmd), (exp_i, exp_cmd))| {
689 if exp_i == i {
690 assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
691 }
692 });
693 } else {
694 panic!("pipe type not match")
695 }
696 }
697
698 #[tokio::test]
699 async fn test_format_write() {
700 let schema = Schema::new(vec![
701 Field {
702 data_type: DataType::Int32,
703 name: "id".to_owned(),
704 },
705 Field {
706 data_type: DataType::Varchar,
707 name: "name".to_owned(),
708 },
709 ]);
710
711 let mut btree_map = BTreeMap::default();
712 btree_map.insert(KEY_FORMAT.to_owned(), "key-{id}".to_owned());
713 btree_map.insert(
714 VALUE_FORMAT.to_owned(),
715 "values:\\{id:{id},name:{name}\\}".to_owned(),
716 );
717 let format_desc = SinkFormatDesc {
718 format: SinkFormat::AppendOnly,
719 encode: SinkEncode::Template,
720 options: btree_map,
721 secret_refs: Default::default(),
722 key_encode: None,
723 connection_id: None,
724 };
725
726 let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
727 .await
728 .unwrap();
729
730 let mut future_manager = DeliveryFutureManager::new(0);
731
732 let chunk_a = StreamChunk::new(
733 vec![Op::Insert, Op::Insert, Op::Insert],
734 vec![
735 I32Array::from_iter(vec![1, 2, 3]).into_ref(),
736 Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
737 ],
738 );
739
740 redis_sink_writer
741 .write_chunk(chunk_a, future_manager.start_write_chunk(0, 0))
742 .await
743 .expect("failed to write batch");
744 let expected_a = vec![
745 (
746 0,
747 "*3\r\n$3\r\nSET\r\n$5\r\nkey-1\r\n$24\r\nvalues:{id:1,name:Alice}\r\n",
748 ),
749 (
750 1,
751 "*3\r\n$3\r\nSET\r\n$5\r\nkey-2\r\n$22\r\nvalues:{id:2,name:Bob}\r\n",
752 ),
753 (
754 2,
755 "*3\r\n$3\r\nSET\r\n$5\r\nkey-3\r\n$24\r\nvalues:{id:3,name:Clare}\r\n",
756 ),
757 ];
758
759 if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
760 pipe.cmd_iter()
761 .enumerate()
762 .zip_eq_debug(expected_a.clone())
763 .for_each(|((i, cmd), (exp_i, exp_cmd))| {
764 if exp_i == i {
765 assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
766 }
767 });
768 } else {
769 panic!("pipe type not match")
770 };
771 }
772}