1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use redis::aio::MultiplexedConnection;
20use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline};
21use redis::{Client as RedisClient, Pipeline};
22use risingwave_common::array::StreamChunk;
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde_derive::Deserialize;
26use serde_json::Value;
27use serde_with::serde_as;
28use with_options::WithOptions;
29
30use super::catalog::SinkFormatDesc;
31use super::encoder::template::{RedisSinkPayloadWriterInput, TemplateStringEncoder};
32use super::formatter::SinkFormatterImpl;
33use super::writer::FormattedSink;
34use super::{SinkError, SinkParam};
35use crate::dispatch_sink_formatter_str_key_impl;
36use crate::error::ConnectorResult;
37use crate::sink::log_store::DeliveryFutureManagerAddFuture;
38use crate::sink::writer::{
39 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
40};
41use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam};
42
43pub const REDIS_SINK: &str = "redis";
44pub const KEY_FORMAT: &str = "key_format";
45pub const VALUE_FORMAT: &str = "value_format";
46pub const REDIS_VALUE_TYPE: &str = "redis_value_type";
47pub const REDIS_VALUE_TYPE_STRING: &str = "string";
48pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial";
49pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub";
50pub const LON_NAME: &str = "longitude";
51pub const LAT_NAME: &str = "latitude";
52pub const MEMBER_NAME: &str = "member";
53pub const CHANNEL: &str = "channel";
54pub const CHANNEL_COLUMN: &str = "channel_column";
55
56#[derive(Deserialize, Debug, Clone, WithOptions)]
57pub struct RedisCommon {
58 #[serde(rename = "redis.url")]
59 pub url: String,
60}
61
62pub enum RedisPipe {
63 Cluster(ClusterPipeline),
64 Single(Pipeline),
65}
66impl RedisPipe {
67 pub async fn query<T: redis::FromRedisValue>(
68 &self,
69 conn: &mut RedisConn,
70 ) -> ConnectorResult<T> {
71 match (self, conn) {
72 (RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?),
73 (RedisPipe::Single(pipe), RedisConn::Single(conn)) => {
74 Ok(pipe.query_async(conn).await?)
75 }
76 _ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_owned()).into()),
77 }
78 }
79
80 pub fn clear(&mut self) {
81 match self {
82 RedisPipe::Cluster(pipe) => pipe.clear(),
83 RedisPipe::Single(pipe) => pipe.clear(),
84 }
85 }
86
87 pub fn set(
88 &mut self,
89 k: RedisSinkPayloadWriterInput,
90 v: RedisSinkPayloadWriterInput,
91 ) -> Result<()> {
92 match self {
93 RedisPipe::Cluster(pipe) => match (k, v) {
94 (
95 RedisSinkPayloadWriterInput::String(k),
96 RedisSinkPayloadWriterInput::String(v),
97 ) => {
98 pipe.set(k, v);
99 }
100 (
101 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
102 RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
103 ) => {
104 pipe.geo_add(key, (lon, lat, member));
105 }
106 (
107 RedisSinkPayloadWriterInput::RedisPubSubKey(key),
108 RedisSinkPayloadWriterInput::String(v),
109 ) => {
110 pipe.publish(key, v);
111 }
112 _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
113 },
114 RedisPipe::Single(pipe) => match (k, v) {
115 (
116 RedisSinkPayloadWriterInput::String(k),
117 RedisSinkPayloadWriterInput::String(v),
118 ) => {
119 pipe.set(k, v);
120 }
121 (
122 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
123 RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
124 ) => {
125 pipe.geo_add(key, (lon, lat, member));
126 }
127 (
128 RedisSinkPayloadWriterInput::RedisPubSubKey(key),
129 RedisSinkPayloadWriterInput::String(v),
130 ) => {
131 pipe.publish(key, v);
132 }
133 _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
134 },
135 };
136 Ok(())
137 }
138
139 pub fn del(&mut self, k: RedisSinkPayloadWriterInput) -> Result<()> {
140 match self {
141 RedisPipe::Cluster(pipe) => match k {
142 RedisSinkPayloadWriterInput::String(k) => {
143 pipe.del(k);
144 }
145 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
146 pipe.zrem(key, member);
147 }
148 _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
149 },
150 RedisPipe::Single(pipe) => match k {
151 RedisSinkPayloadWriterInput::String(k) => {
152 pipe.del(k);
153 }
154 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
155 pipe.zrem(key, member);
156 }
157 _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
158 },
159 };
160 Ok(())
161 }
162}
163pub enum RedisConn {
164 Cluster(ClusterConnection),
166 Single(MultiplexedConnection),
168}
169
170impl RedisCommon {
171 pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> {
172 match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
173 Ok(v) => {
174 if let Value::Array(list) = v {
175 let list = list
176 .into_iter()
177 .map(|s| {
178 if let Value::String(s) = s {
179 Ok(s)
180 } else {
181 Err(SinkError::Redis(
182 "redis.url must be array of string".to_owned(),
183 )
184 .into())
185 }
186 })
187 .collect::<ConnectorResult<Vec<String>>>()?;
188
189 let client = ClusterClient::new(list)?;
190 Ok((
191 RedisConn::Cluster(client.get_connection()?),
192 RedisPipe::Cluster(redis::cluster::cluster_pipe()),
193 ))
194 } else {
195 Err(SinkError::Redis("redis.url must be array or string".to_owned()).into())
196 }
197 }
198 Err(_) => {
199 let client = RedisClient::open(self.url.clone())?;
200 Ok((
201 RedisConn::Single(client.get_multiplexed_async_connection().await?),
202 RedisPipe::Single(redis::pipe()),
203 ))
204 }
205 }
206 }
207}
208
209#[serde_as]
210#[derive(Clone, Debug, Deserialize, WithOptions)]
211pub struct RedisConfig {
212 #[serde(flatten)]
213 pub common: RedisCommon,
214}
215
216impl RedisConfig {
217 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
218 let config =
219 serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
220 .map_err(|e| SinkError::Config(anyhow!(e)))?;
221 Ok(config)
222 }
223}
224
225#[derive(Debug)]
226pub struct RedisSink {
227 config: RedisConfig,
228 schema: Schema,
229 pk_indices: Vec<usize>,
230 format_desc: SinkFormatDesc,
231 db_name: String,
232 sink_from_name: String,
233}
234
235#[async_trait]
236impl TryFrom<SinkParam> for RedisSink {
237 type Error = SinkError;
238
239 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
240 if param.downstream_pk.is_empty() {
241 return Err(SinkError::Config(anyhow!(
242 "Redis Sink Primary Key must be specified."
243 )));
244 }
245 let config = RedisConfig::from_btreemap(param.properties.clone())?;
246 Ok(Self {
247 config,
248 schema: param.schema(),
249 pk_indices: param.downstream_pk,
250 format_desc: param
251 .format_desc
252 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
253 db_name: param.db_name,
254 sink_from_name: param.sink_from_name,
255 })
256 }
257}
258
259impl Sink for RedisSink {
260 type Coordinator = DummySinkCommitCoordinator;
261 type LogSinker = AsyncTruncateLogSinkerOf<RedisSinkWriter>;
262
263 const SINK_NAME: &'static str = "redis";
264
265 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
266 Ok(RedisSinkWriter::new(
267 self.config.clone(),
268 self.schema.clone(),
269 self.pk_indices.clone(),
270 &self.format_desc,
271 self.db_name.clone(),
272 self.sink_from_name.clone(),
273 )
274 .await?
275 .into_log_sinker(usize::MAX))
276 }
277
278 async fn validate(&self) -> Result<()> {
279 self.config.common.build_conn_and_pipe().await?;
280 let all_map: HashMap<String, DataType> = self
281 .schema
282 .fields()
283 .iter()
284 .map(|f| (f.name.clone(), f.data_type.clone()))
285 .collect();
286 let pk_map: HashMap<String, DataType> = self
287 .schema
288 .fields()
289 .iter()
290 .enumerate()
291 .filter(|(k, _)| self.pk_indices.contains(k))
292 .map(|(_, v)| (v.name.clone(), v.data_type.clone()))
293 .collect();
294 if matches!(
295 self.format_desc.encode,
296 super::catalog::SinkEncode::Template
297 ) {
298 match self
299 .format_desc
300 .options
301 .get(REDIS_VALUE_TYPE)
302 .map(|s| s.as_str())
303 {
304 Some(REDIS_VALUE_TYPE_STRING) | None => {
306 let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
307 SinkError::Config(anyhow!(
308 "Cannot find '{KEY_FORMAT}', please set it or use JSON"
309 ))
310 })?;
311 TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
312 let value_format =
313 self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
314 SinkError::Config(anyhow!(
315 "Cannot find `{VALUE_FORMAT}`, please set it or use JSON"
316 ))
317 })?;
318 TemplateStringEncoder::check_string_format(value_format, &all_map)?;
319 }
320 Some(REDIS_VALUE_TYPE_GEO) => {
321 let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
322 SinkError::Config(anyhow!(
323 "Cannot find '{KEY_FORMAT}', please set it or use JSON"
324 ))
325 })?;
326 TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
327
328 let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| {
329 SinkError::Config(anyhow!(
330 "Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
331 ))
332 })?;
333 let lat_name = self.format_desc.options.get(LAT_NAME).ok_or_else(|| {
334 SinkError::Config(anyhow!(
335 "Cannot find `{LAT_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
336 ))
337 })?;
338 let member_name = self.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
339 SinkError::Config(anyhow!(
340 "Cannot find `{MEMBER_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
341 ))
342 })?;
343 if let Some(lon_type) = all_map.get(lon_name)
344 && (lon_type == &DataType::Float64
345 || lon_type == &DataType::Float32
346 || lon_type == &DataType::Varchar)
347 {
348 } else {
350 return Err(SinkError::Config(anyhow!(
351 "`{LON_NAME}` must be set to `float64` or `float32` or `varchar`"
352 )));
353 }
354 if let Some(lat_type) = all_map.get(lat_name)
355 && (lat_type == &DataType::Float64
356 || lat_type == &DataType::Float32
357 || lat_type == &DataType::Varchar)
358 {
359 } else {
361 return Err(SinkError::Config(anyhow!(
362 "`{LAT_NAME}` must be set to `float64` or `float32` or `varchar`"
363 )));
364 }
365 if let Some(member_type) = pk_map.get(member_name)
366 && member_type == &DataType::Varchar
367 {
368 } else {
370 return Err(SinkError::Config(anyhow!(
371 "`{MEMBER_NAME}` must be set to `varchar` and `primary_key`"
372 )));
373 }
374 }
375 Some(REDIS_VALUE_TYPE_PUBSUB) => {
376 let channel = self.format_desc.options.get(CHANNEL);
377 let channel_column = self.format_desc.options.get(CHANNEL_COLUMN);
378 if (channel.is_none() && channel_column.is_none())
379 || (channel.is_some() && channel_column.is_some())
380 {
381 return Err(SinkError::Config(anyhow!(
382 "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
383 )));
384 }
385
386 if let Some(channel_column) = channel_column
387 && let Some(channel_column_type) = all_map.get(channel_column)
388 && (channel_column_type != &DataType::Varchar)
389 {
390 return Err(SinkError::Config(anyhow!(
391 "`{CHANNEL_COLUMN}` must be set to `varchar`"
392 )));
393 }
394
395 let value_format =
396 self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
397 SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
398 })?;
399 TemplateStringEncoder::check_string_format(value_format, &all_map)?;
400 }
401 _ => {
402 return Err(SinkError::Config(anyhow!(
403 "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}`"
404 )));
405 }
406 }
407 }
408 Ok(())
409 }
410}
411
412pub struct RedisSinkWriter {
413 #[expect(dead_code)]
414 epoch: u64,
415 #[expect(dead_code)]
416 schema: Schema,
417 #[expect(dead_code)]
418 pk_indices: Vec<usize>,
419 formatter: SinkFormatterImpl,
420 payload_writer: RedisSinkPayloadWriter,
421}
422
423struct RedisSinkPayloadWriter {
424 conn: Option<RedisConn>,
426 pipe: RedisPipe,
428}
429
430impl RedisSinkPayloadWriter {
431 pub async fn new(config: RedisConfig) -> Result<Self> {
432 let (conn, pipe) = config.common.build_conn_and_pipe().await?;
433 let conn = Some(conn);
434
435 Ok(Self { conn, pipe })
436 }
437
438 #[cfg(test)]
439 pub fn mock() -> Self {
440 let conn = None;
441 let pipe = RedisPipe::Single(redis::pipe());
442 Self { conn, pipe }
443 }
444
445 pub async fn commit(&mut self) -> Result<()> {
446 #[cfg(test)]
447 {
448 if self.conn.is_none() {
449 return Ok(());
450 }
451 }
452 self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
453 self.pipe.clear();
454 Ok(())
455 }
456}
457
458impl FormattedSink for RedisSinkPayloadWriter {
459 type K = RedisSinkPayloadWriterInput;
460 type V = RedisSinkPayloadWriterInput;
461
462 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
463 let k = k.ok_or_else(|| SinkError::Redis("The redis key cannot be null".to_owned()))?;
464 match v {
465 Some(v) => self.pipe.set(k, v)?,
466 None => self.pipe.del(k)?,
467 };
468 Ok(())
469 }
470}
471
472impl RedisSinkWriter {
473 pub async fn new(
474 config: RedisConfig,
475 schema: Schema,
476 pk_indices: Vec<usize>,
477 format_desc: &SinkFormatDesc,
478 db_name: String,
479 sink_from_name: String,
480 ) -> Result<Self> {
481 let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
482 let formatter = SinkFormatterImpl::new(
483 format_desc,
484 schema.clone(),
485 pk_indices.clone(),
486 db_name,
487 sink_from_name,
488 "NO_TOPIC",
489 )
490 .await?;
491
492 Ok(Self {
493 schema,
494 pk_indices,
495 epoch: 0,
496 formatter,
497 payload_writer,
498 })
499 }
500
501 #[cfg(test)]
502 pub async fn mock(
503 schema: Schema,
504 pk_indices: Vec<usize>,
505 format_desc: &SinkFormatDesc,
506 ) -> Result<Self> {
507 let formatter = SinkFormatterImpl::new(
508 format_desc,
509 schema.clone(),
510 pk_indices.clone(),
511 "d1".to_owned(),
512 "t1".to_owned(),
513 "NO_TOPIC",
514 )
515 .await?;
516 Ok(Self {
517 schema,
518 pk_indices,
519 epoch: 0,
520 formatter,
521 payload_writer: RedisSinkPayloadWriter::mock(),
522 })
523 }
524}
525
526impl AsyncTruncateSinkWriter for RedisSinkWriter {
527 async fn write_chunk<'a>(
528 &'a mut self,
529 chunk: StreamChunk,
530 _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
531 ) -> Result<()> {
532 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
533 self.payload_writer.write_chunk(chunk, formatter).await?;
534 self.payload_writer.commit().await
535 })
536 }
537}
538
539#[cfg(test)]
540mod test {
541 use core::panic;
542
543 use rdkafka::message::FromBytes;
544 use risingwave_common::array::{Array, I32Array, Op, Utf8Array};
545 use risingwave_common::catalog::Field;
546 use risingwave_common::types::DataType;
547 use risingwave_common::util::iter_util::ZipEqDebug;
548
549 use super::*;
550 use crate::sink::catalog::{SinkEncode, SinkFormat};
551 use crate::sink::log_store::DeliveryFutureManager;
552
553 #[tokio::test]
554 async fn test_write() {
555 let schema = Schema::new(vec![
556 Field {
557 data_type: DataType::Int32,
558 name: "id".to_owned(),
559 },
560 Field {
561 data_type: DataType::Varchar,
562 name: "name".to_owned(),
563 },
564 ]);
565
566 let format_desc = SinkFormatDesc {
567 format: SinkFormat::AppendOnly,
568 encode: SinkEncode::Json,
569 options: BTreeMap::default(),
570 secret_refs: BTreeMap::default(),
571 key_encode: None,
572 connection_id: None,
573 };
574
575 let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
576 .await
577 .unwrap();
578
579 let chunk_a = StreamChunk::new(
580 vec![Op::Insert, Op::Insert, Op::Insert],
581 vec![
582 I32Array::from_iter(vec![1, 2, 3]).into_ref(),
583 Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
584 ],
585 );
586
587 let mut manager = DeliveryFutureManager::new(0);
588
589 redis_sink_writer
590 .write_chunk(chunk_a, manager.start_write_chunk(0, 0))
591 .await
592 .expect("failed to write batch");
593 let expected_a = vec![
594 (
595 0,
596 "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n",
597 ),
598 (
599 1,
600 "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n",
601 ),
602 (
603 2,
604 "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n",
605 ),
606 ];
607
608 if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
609 pipe.cmd_iter()
610 .enumerate()
611 .zip_eq_debug(expected_a.clone())
612 .for_each(|((i, cmd), (exp_i, exp_cmd))| {
613 if exp_i == i {
614 assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
615 }
616 });
617 } else {
618 panic!("pipe type not match")
619 }
620 }
621
622 #[tokio::test]
623 async fn test_format_write() {
624 let schema = Schema::new(vec![
625 Field {
626 data_type: DataType::Int32,
627 name: "id".to_owned(),
628 },
629 Field {
630 data_type: DataType::Varchar,
631 name: "name".to_owned(),
632 },
633 ]);
634
635 let mut btree_map = BTreeMap::default();
636 btree_map.insert(KEY_FORMAT.to_owned(), "key-{id}".to_owned());
637 btree_map.insert(
638 VALUE_FORMAT.to_owned(),
639 "values:{id:{id},name:{name}}".to_owned(),
640 );
641 let format_desc = SinkFormatDesc {
642 format: SinkFormat::AppendOnly,
643 encode: SinkEncode::Template,
644 options: btree_map,
645 secret_refs: Default::default(),
646 key_encode: None,
647 connection_id: None,
648 };
649
650 let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
651 .await
652 .unwrap();
653
654 let mut future_manager = DeliveryFutureManager::new(0);
655
656 let chunk_a = StreamChunk::new(
657 vec![Op::Insert, Op::Insert, Op::Insert],
658 vec![
659 I32Array::from_iter(vec![1, 2, 3]).into_ref(),
660 Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
661 ],
662 );
663
664 redis_sink_writer
665 .write_chunk(chunk_a, future_manager.start_write_chunk(0, 0))
666 .await
667 .expect("failed to write batch");
668 let expected_a = vec![
669 (
670 0,
671 "*3\r\n$3\r\nSET\r\n$5\r\nkey-1\r\n$24\r\nvalues:{id:1,name:Alice}\r\n",
672 ),
673 (
674 1,
675 "*3\r\n$3\r\nSET\r\n$5\r\nkey-2\r\n$22\r\nvalues:{id:2,name:Bob}\r\n",
676 ),
677 (
678 2,
679 "*3\r\n$3\r\nSET\r\n$5\r\nkey-3\r\n$24\r\nvalues:{id:3,name:Clare}\r\n",
680 ),
681 ];
682
683 if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
684 pipe.cmd_iter()
685 .enumerate()
686 .zip_eq_debug(expected_a.clone())
687 .for_each(|((i, cmd), (exp_i, exp_cmd))| {
688 if exp_i == i {
689 assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
690 }
691 });
692 } else {
693 panic!("pipe type not match")
694 };
695 }
696}