1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use phf::phf_set;
20use redis::aio::MultiplexedConnection;
21use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline};
22use redis::{Client as RedisClient, Pipeline};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use risingwave_common::types::DataType;
26use serde_derive::Deserialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use with_options::WithOptions;
30
31use super::catalog::SinkFormatDesc;
32use super::encoder::template::{RedisSinkPayloadWriterInput, TemplateStringEncoder};
33use super::formatter::SinkFormatterImpl;
34use super::writer::FormattedSink;
35use super::{SinkError, SinkParam};
36use crate::dispatch_sink_formatter_str_key_impl;
37use crate::enforce_secret::EnforceSecret;
38use crate::error::ConnectorResult;
39use crate::sink::log_store::DeliveryFutureManagerAddFuture;
40use crate::sink::writer::{
41 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
42};
43use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriterParam};
44
45pub const REDIS_SINK: &str = "redis";
46pub const KEY_FORMAT: &str = "key_format";
47pub const VALUE_FORMAT: &str = "value_format";
48pub const REDIS_VALUE_TYPE: &str = "redis_value_type";
49pub const REDIS_VALUE_TYPE_STRING: &str = "string";
50pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial";
51pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub";
52pub const LON_NAME: &str = "longitude";
53pub const LAT_NAME: &str = "latitude";
54pub const MEMBER_NAME: &str = "member";
55pub const CHANNEL: &str = "channel";
56pub const CHANNEL_COLUMN: &str = "channel_column";
57
58#[derive(Deserialize, Debug, Clone, WithOptions)]
59pub struct RedisCommon {
60 #[serde(rename = "redis.url")]
61 pub url: String,
62}
63
64impl EnforceSecret for RedisCommon {
65 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
66 "redis.url"
67 };
68}
69
70pub enum RedisPipe {
71 Cluster(ClusterPipeline),
72 Single(Pipeline),
73}
74impl RedisPipe {
75 pub async fn query<T: redis::FromRedisValue>(
76 &self,
77 conn: &mut RedisConn,
78 ) -> ConnectorResult<T> {
79 match (self, conn) {
80 (RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?),
81 (RedisPipe::Single(pipe), RedisConn::Single(conn)) => {
82 Ok(pipe.query_async(conn).await?)
83 }
84 _ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_owned()).into()),
85 }
86 }
87
88 pub fn clear(&mut self) {
89 match self {
90 RedisPipe::Cluster(pipe) => pipe.clear(),
91 RedisPipe::Single(pipe) => pipe.clear(),
92 }
93 }
94
95 pub fn set(
96 &mut self,
97 k: RedisSinkPayloadWriterInput,
98 v: RedisSinkPayloadWriterInput,
99 ) -> Result<()> {
100 match self {
101 RedisPipe::Cluster(pipe) => match (k, v) {
102 (
103 RedisSinkPayloadWriterInput::String(k),
104 RedisSinkPayloadWriterInput::String(v),
105 ) => {
106 pipe.set(k, v);
107 }
108 (
109 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
110 RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
111 ) => {
112 pipe.geo_add(key, (lon, lat, member));
113 }
114 (
115 RedisSinkPayloadWriterInput::RedisPubSubKey(key),
116 RedisSinkPayloadWriterInput::String(v),
117 ) => {
118 pipe.publish(key, v);
119 }
120 _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
121 },
122 RedisPipe::Single(pipe) => match (k, v) {
123 (
124 RedisSinkPayloadWriterInput::String(k),
125 RedisSinkPayloadWriterInput::String(v),
126 ) => {
127 pipe.set(k, v);
128 }
129 (
130 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
131 RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
132 ) => {
133 pipe.geo_add(key, (lon, lat, member));
134 }
135 (
136 RedisSinkPayloadWriterInput::RedisPubSubKey(key),
137 RedisSinkPayloadWriterInput::String(v),
138 ) => {
139 pipe.publish(key, v);
140 }
141 _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
142 },
143 };
144 Ok(())
145 }
146
147 pub fn del(&mut self, k: RedisSinkPayloadWriterInput) -> Result<()> {
148 match self {
149 RedisPipe::Cluster(pipe) => match k {
150 RedisSinkPayloadWriterInput::String(k) => {
151 pipe.del(k);
152 }
153 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
154 pipe.zrem(key, member);
155 }
156 _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
157 },
158 RedisPipe::Single(pipe) => match k {
159 RedisSinkPayloadWriterInput::String(k) => {
160 pipe.del(k);
161 }
162 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
163 pipe.zrem(key, member);
164 }
165 _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
166 },
167 };
168 Ok(())
169 }
170}
171pub enum RedisConn {
172 Cluster(ClusterConnection),
174 Single(MultiplexedConnection),
176}
177
178impl RedisCommon {
179 pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> {
180 match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
181 Ok(v) => {
182 if let Value::Array(list) = v {
183 let list = list
184 .into_iter()
185 .map(|s| {
186 if let Value::String(s) = s {
187 Ok(s)
188 } else {
189 Err(SinkError::Redis(
190 "redis.url must be array of string".to_owned(),
191 )
192 .into())
193 }
194 })
195 .collect::<ConnectorResult<Vec<String>>>()?;
196
197 let client = ClusterClient::new(list)?;
198 Ok((
199 RedisConn::Cluster(client.get_connection()?),
200 RedisPipe::Cluster(redis::cluster::cluster_pipe()),
201 ))
202 } else {
203 Err(SinkError::Redis("redis.url must be array or string".to_owned()).into())
204 }
205 }
206 Err(_) => {
207 let client = RedisClient::open(self.url.clone())?;
208 Ok((
209 RedisConn::Single(client.get_multiplexed_async_connection().await?),
210 RedisPipe::Single(redis::pipe()),
211 ))
212 }
213 }
214 }
215}
216
217#[serde_as]
218#[derive(Clone, Debug, Deserialize, WithOptions)]
219pub struct RedisConfig {
220 #[serde(flatten)]
221 pub common: RedisCommon,
222}
223
224impl EnforceSecret for RedisConfig {
225 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
226 for prop in prop_iter {
227 RedisCommon::enforce_one(prop)?;
228 }
229 Ok(())
230 }
231}
232
233impl RedisConfig {
234 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
235 let config =
236 serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
237 .map_err(|e| SinkError::Config(anyhow!(e)))?;
238 Ok(config)
239 }
240}
241
242#[derive(Debug)]
243pub struct RedisSink {
244 config: RedisConfig,
245 schema: Schema,
246 pk_indices: Vec<usize>,
247 format_desc: SinkFormatDesc,
248 db_name: String,
249 sink_from_name: String,
250}
251
252impl EnforceSecret for RedisSink {
253 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
254 for prop in prop_iter {
255 RedisConfig::enforce_one(prop)?;
256 }
257 Ok(())
258 }
259}
260
261#[async_trait]
262impl TryFrom<SinkParam> for RedisSink {
263 type Error = SinkError;
264
265 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
266 if param.downstream_pk.is_empty() {
267 return Err(SinkError::Config(anyhow!(
268 "Redis Sink Primary Key must be specified."
269 )));
270 }
271 let config = RedisConfig::from_btreemap(param.properties.clone())?;
272 Ok(Self {
273 config,
274 schema: param.schema(),
275 pk_indices: param.downstream_pk,
276 format_desc: param
277 .format_desc
278 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
279 db_name: param.db_name,
280 sink_from_name: param.sink_from_name,
281 })
282 }
283}
284
285impl Sink for RedisSink {
286 type Coordinator = DummySinkCommitCoordinator;
287 type LogSinker = AsyncTruncateLogSinkerOf<RedisSinkWriter>;
288
289 const SINK_NAME: &'static str = "redis";
290
291 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
292 Ok(RedisSinkWriter::new(
293 self.config.clone(),
294 self.schema.clone(),
295 self.pk_indices.clone(),
296 &self.format_desc,
297 self.db_name.clone(),
298 self.sink_from_name.clone(),
299 )
300 .await?
301 .into_log_sinker(usize::MAX))
302 }
303
304 async fn validate(&self) -> Result<()> {
305 self.config.common.build_conn_and_pipe().await?;
306 let all_map: HashMap<String, DataType> = self
307 .schema
308 .fields()
309 .iter()
310 .map(|f| (f.name.clone(), f.data_type.clone()))
311 .collect();
312 let pk_map: HashMap<String, DataType> = self
313 .schema
314 .fields()
315 .iter()
316 .enumerate()
317 .filter(|(k, _)| self.pk_indices.contains(k))
318 .map(|(_, v)| (v.name.clone(), v.data_type.clone()))
319 .collect();
320 if matches!(
321 self.format_desc.encode,
322 super::catalog::SinkEncode::Template
323 ) {
324 match self
325 .format_desc
326 .options
327 .get(REDIS_VALUE_TYPE)
328 .map(|s| s.as_str())
329 {
330 Some(REDIS_VALUE_TYPE_STRING) | None => {
332 let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
333 SinkError::Config(anyhow!(
334 "Cannot find '{KEY_FORMAT}', please set it or use JSON"
335 ))
336 })?;
337 TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
338 let value_format =
339 self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
340 SinkError::Config(anyhow!(
341 "Cannot find `{VALUE_FORMAT}`, please set it or use JSON"
342 ))
343 })?;
344 TemplateStringEncoder::check_string_format(value_format, &all_map)?;
345 }
346 Some(REDIS_VALUE_TYPE_GEO) => {
347 let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
348 SinkError::Config(anyhow!(
349 "Cannot find '{KEY_FORMAT}', please set it or use JSON"
350 ))
351 })?;
352 TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
353
354 let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| {
355 SinkError::Config(anyhow!(
356 "Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
357 ))
358 })?;
359 let lat_name = self.format_desc.options.get(LAT_NAME).ok_or_else(|| {
360 SinkError::Config(anyhow!(
361 "Cannot find `{LAT_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
362 ))
363 })?;
364 let member_name = self.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
365 SinkError::Config(anyhow!(
366 "Cannot find `{MEMBER_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
367 ))
368 })?;
369 if let Some(lon_type) = all_map.get(lon_name)
370 && (lon_type == &DataType::Float64
371 || lon_type == &DataType::Float32
372 || lon_type == &DataType::Varchar)
373 {
374 } else {
376 return Err(SinkError::Config(anyhow!(
377 "`{LON_NAME}` must be set to `float64` or `float32` or `varchar`"
378 )));
379 }
380 if let Some(lat_type) = all_map.get(lat_name)
381 && (lat_type == &DataType::Float64
382 || lat_type == &DataType::Float32
383 || lat_type == &DataType::Varchar)
384 {
385 } else {
387 return Err(SinkError::Config(anyhow!(
388 "`{LAT_NAME}` must be set to `float64` or `float32` or `varchar`"
389 )));
390 }
391 if let Some(member_type) = pk_map.get(member_name)
392 && member_type == &DataType::Varchar
393 {
394 } else {
396 return Err(SinkError::Config(anyhow!(
397 "`{MEMBER_NAME}` must be set to `varchar` and `primary_key`"
398 )));
399 }
400 }
401 Some(REDIS_VALUE_TYPE_PUBSUB) => {
402 let channel = self.format_desc.options.get(CHANNEL);
403 let channel_column = self.format_desc.options.get(CHANNEL_COLUMN);
404 if (channel.is_none() && channel_column.is_none())
405 || (channel.is_some() && channel_column.is_some())
406 {
407 return Err(SinkError::Config(anyhow!(
408 "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
409 )));
410 }
411
412 if let Some(channel_column) = channel_column
413 && let Some(channel_column_type) = all_map.get(channel_column)
414 && (channel_column_type != &DataType::Varchar)
415 {
416 return Err(SinkError::Config(anyhow!(
417 "`{CHANNEL_COLUMN}` must be set to `varchar`"
418 )));
419 }
420
421 let value_format =
422 self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
423 SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
424 })?;
425 TemplateStringEncoder::check_string_format(value_format, &all_map)?;
426 }
427 _ => {
428 return Err(SinkError::Config(anyhow!(
429 "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}`"
430 )));
431 }
432 }
433 }
434 Ok(())
435 }
436}
437
438pub struct RedisSinkWriter {
439 #[expect(dead_code)]
440 epoch: u64,
441 #[expect(dead_code)]
442 schema: Schema,
443 #[expect(dead_code)]
444 pk_indices: Vec<usize>,
445 formatter: SinkFormatterImpl,
446 payload_writer: RedisSinkPayloadWriter,
447}
448
449struct RedisSinkPayloadWriter {
450 conn: Option<RedisConn>,
452 pipe: RedisPipe,
454}
455
456impl RedisSinkPayloadWriter {
457 pub async fn new(config: RedisConfig) -> Result<Self> {
458 let (conn, pipe) = config.common.build_conn_and_pipe().await?;
459 let conn = Some(conn);
460
461 Ok(Self { conn, pipe })
462 }
463
464 #[cfg(test)]
465 pub fn mock() -> Self {
466 let conn = None;
467 let pipe = RedisPipe::Single(redis::pipe());
468 Self { conn, pipe }
469 }
470
471 pub async fn commit(&mut self) -> Result<()> {
472 #[cfg(test)]
473 {
474 if self.conn.is_none() {
475 return Ok(());
476 }
477 }
478 self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
479 self.pipe.clear();
480 Ok(())
481 }
482}
483
484impl FormattedSink for RedisSinkPayloadWriter {
485 type K = RedisSinkPayloadWriterInput;
486 type V = RedisSinkPayloadWriterInput;
487
488 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
489 let k = k.ok_or_else(|| SinkError::Redis("The redis key cannot be null".to_owned()))?;
490 match v {
491 Some(v) => self.pipe.set(k, v)?,
492 None => self.pipe.del(k)?,
493 };
494 Ok(())
495 }
496}
497
498impl RedisSinkWriter {
499 pub async fn new(
500 config: RedisConfig,
501 schema: Schema,
502 pk_indices: Vec<usize>,
503 format_desc: &SinkFormatDesc,
504 db_name: String,
505 sink_from_name: String,
506 ) -> Result<Self> {
507 let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
508 let formatter = SinkFormatterImpl::new(
509 format_desc,
510 schema.clone(),
511 pk_indices.clone(),
512 db_name,
513 sink_from_name,
514 "NO_TOPIC",
515 )
516 .await?;
517
518 Ok(Self {
519 schema,
520 pk_indices,
521 epoch: 0,
522 formatter,
523 payload_writer,
524 })
525 }
526
527 #[cfg(test)]
528 pub async fn mock(
529 schema: Schema,
530 pk_indices: Vec<usize>,
531 format_desc: &SinkFormatDesc,
532 ) -> Result<Self> {
533 let formatter = SinkFormatterImpl::new(
534 format_desc,
535 schema.clone(),
536 pk_indices.clone(),
537 "d1".to_owned(),
538 "t1".to_owned(),
539 "NO_TOPIC",
540 )
541 .await?;
542 Ok(Self {
543 schema,
544 pk_indices,
545 epoch: 0,
546 formatter,
547 payload_writer: RedisSinkPayloadWriter::mock(),
548 })
549 }
550}
551
552impl AsyncTruncateSinkWriter for RedisSinkWriter {
553 async fn write_chunk<'a>(
554 &'a mut self,
555 chunk: StreamChunk,
556 _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
557 ) -> Result<()> {
558 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
559 self.payload_writer.write_chunk(chunk, formatter).await?;
560 self.payload_writer.commit().await
561 })
562 }
563}
564
565#[cfg(test)]
566mod test {
567 use core::panic;
568
569 use rdkafka::message::FromBytes;
570 use risingwave_common::array::{Array, I32Array, Op, Utf8Array};
571 use risingwave_common::catalog::Field;
572 use risingwave_common::types::DataType;
573 use risingwave_common::util::iter_util::ZipEqDebug;
574
575 use super::*;
576 use crate::sink::catalog::{SinkEncode, SinkFormat};
577 use crate::sink::log_store::DeliveryFutureManager;
578
579 #[tokio::test]
580 async fn test_write() {
581 let schema = Schema::new(vec![
582 Field {
583 data_type: DataType::Int32,
584 name: "id".to_owned(),
585 },
586 Field {
587 data_type: DataType::Varchar,
588 name: "name".to_owned(),
589 },
590 ]);
591
592 let format_desc = SinkFormatDesc {
593 format: SinkFormat::AppendOnly,
594 encode: SinkEncode::Json,
595 options: BTreeMap::default(),
596 secret_refs: BTreeMap::default(),
597 key_encode: None,
598 connection_id: None,
599 };
600
601 let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
602 .await
603 .unwrap();
604
605 let chunk_a = StreamChunk::new(
606 vec![Op::Insert, Op::Insert, Op::Insert],
607 vec![
608 I32Array::from_iter(vec![1, 2, 3]).into_ref(),
609 Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
610 ],
611 );
612
613 let mut manager = DeliveryFutureManager::new(0);
614
615 redis_sink_writer
616 .write_chunk(chunk_a, manager.start_write_chunk(0, 0))
617 .await
618 .expect("failed to write batch");
619 let expected_a = vec![
620 (
621 0,
622 "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n",
623 ),
624 (
625 1,
626 "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n",
627 ),
628 (
629 2,
630 "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n",
631 ),
632 ];
633
634 if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
635 pipe.cmd_iter()
636 .enumerate()
637 .zip_eq_debug(expected_a.clone())
638 .for_each(|((i, cmd), (exp_i, exp_cmd))| {
639 if exp_i == i {
640 assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
641 }
642 });
643 } else {
644 panic!("pipe type not match")
645 }
646 }
647
648 #[tokio::test]
649 async fn test_format_write() {
650 let schema = Schema::new(vec![
651 Field {
652 data_type: DataType::Int32,
653 name: "id".to_owned(),
654 },
655 Field {
656 data_type: DataType::Varchar,
657 name: "name".to_owned(),
658 },
659 ]);
660
661 let mut btree_map = BTreeMap::default();
662 btree_map.insert(KEY_FORMAT.to_owned(), "key-{id}".to_owned());
663 btree_map.insert(
664 VALUE_FORMAT.to_owned(),
665 "values:\\{id:{id},name:{name}\\}".to_owned(),
666 );
667 let format_desc = SinkFormatDesc {
668 format: SinkFormat::AppendOnly,
669 encode: SinkEncode::Template,
670 options: btree_map,
671 secret_refs: Default::default(),
672 key_encode: None,
673 connection_id: None,
674 };
675
676 let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
677 .await
678 .unwrap();
679
680 let mut future_manager = DeliveryFutureManager::new(0);
681
682 let chunk_a = StreamChunk::new(
683 vec![Op::Insert, Op::Insert, Op::Insert],
684 vec![
685 I32Array::from_iter(vec![1, 2, 3]).into_ref(),
686 Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
687 ],
688 );
689
690 redis_sink_writer
691 .write_chunk(chunk_a, future_manager.start_write_chunk(0, 0))
692 .await
693 .expect("failed to write batch");
694 let expected_a = vec![
695 (
696 0,
697 "*3\r\n$3\r\nSET\r\n$5\r\nkey-1\r\n$24\r\nvalues:{id:1,name:Alice}\r\n",
698 ),
699 (
700 1,
701 "*3\r\n$3\r\nSET\r\n$5\r\nkey-2\r\n$22\r\nvalues:{id:2,name:Bob}\r\n",
702 ),
703 (
704 2,
705 "*3\r\n$3\r\nSET\r\n$5\r\nkey-3\r\n$24\r\nvalues:{id:3,name:Clare}\r\n",
706 ),
707 ];
708
709 if let RedisPipe::Single(pipe) = &redis_sink_writer.payload_writer.pipe {
710 pipe.cmd_iter()
711 .enumerate()
712 .zip_eq_debug(expected_a.clone())
713 .for_each(|((i, cmd), (exp_i, exp_cmd))| {
714 if exp_i == i {
715 assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
716 }
717 });
718 } else {
719 panic!("pipe type not match")
720 };
721 }
722}