1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use phf::phf_set;
20use redis::aio::MultiplexedConnection;
21use redis::cluster::{ClusterClient, ClusterConnection, ClusterPipeline};
22use redis::{Client as RedisClient, Pipeline};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use risingwave_common::types::DataType;
26use serde_derive::Deserialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use with_options::WithOptions;
30
31use super::catalog::SinkFormatDesc;
32use super::encoder::template::{RedisSinkPayloadWriterInput, TemplateStringEncoder};
33use super::formatter::SinkFormatterImpl;
34use super::writer::FormattedSink;
35use super::{SinkError, SinkParam};
36use crate::dispatch_sink_formatter_str_key_impl;
37use crate::enforce_secret::EnforceSecret;
38use crate::error::ConnectorResult;
39use crate::sink::log_store::DeliveryFutureManagerAddFuture;
40use crate::sink::writer::{
41 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
42};
43use crate::sink::{Result, Sink, SinkWriterParam};
44
/// Name of the Redis sink connector (same literal as `RedisSink::SINK_NAME`).
45pub const REDIS_SINK: &str = "redis";
/// Format option holding the key template; `validate` checks it against the primary-key columns.
46pub const KEY_FORMAT: &str = "key_format";
/// Format option holding the value template; `validate` checks it against all columns.
47pub const VALUE_FORMAT: &str = "value_format";
/// Format option selecting the Redis value type; when absent, `validate` treats it as `string`.
48pub const REDIS_VALUE_TYPE: &str = "redis_value_type";
/// `redis_value_type` value for which `validate` requires both a key and a value template.
49pub const REDIS_VALUE_TYPE_STRING: &str = "string";
/// `redis_value_type` value for which `validate` requires longitude, latitude and member columns.
50pub const REDIS_VALUE_TYPE_GEO: &str = "geospatial";
/// `redis_value_type` value for which `validate` requires a channel (or channel column) and a
/// value template.
51pub const REDIS_VALUE_TYPE_PUBSUB: &str = "pubsub";
/// Geospatial option naming the longitude column (`float64`, `float32` or `varchar`).
52pub const LON_NAME: &str = "longitude";
/// Geospatial option naming the latitude column (`float64`, `float32` or `varchar`).
53pub const LAT_NAME: &str = "latitude";
/// Geospatial option naming the member column (a `varchar` primary-key column).
54pub const MEMBER_NAME: &str = "member";
/// Pub/sub option; exactly one of this and `channel_column` must be set.
55pub const CHANNEL: &str = "channel";
/// Pub/sub option naming a `varchar` column; exactly one of this and `channel` must be set.
56pub const CHANNEL_COLUMN: &str = "channel_column";
57
/// Connection settings for the Redis sink.
58#[derive(Deserialize, Debug, Clone, WithOptions)]
59pub struct RedisCommon {
/// Value of the `redis.url` option. `build_conn_and_pipe` treats a JSON array of strings as
/// cluster node addresses, and text that is not valid JSON as a single-node URL.
60 #[serde(rename = "redis.url")]
61 pub url: String,
62}
63
/// Registers `redis.url` in this type's `ENFORCE_SECRET_PROPERTIES` set.
// NOTE(review): presumably the trait rejects these properties when they are supplied as plain
// text instead of a secret — confirm against `EnforceSecret`, which is defined elsewhere.
64impl EnforceSecret for RedisCommon {
65 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf_set! {
66 "redis.url"
67 };
68}
69
/// Command pipeline that buffers writes until they are sent with `query`.
///
/// Each variant must be paired with the `RedisConn` variant of the same name.
70pub enum RedisPipe {
/// Pipeline for a cluster connection.
71 Cluster(ClusterPipeline),
/// Pipeline for a single-node connection.
72 Single(Pipeline),
73}
74impl RedisPipe {
75 pub async fn query<T: redis::FromRedisValue>(
76 &self,
77 conn: &mut RedisConn,
78 ) -> ConnectorResult<T> {
79 match (self, conn) {
80 (RedisPipe::Cluster(pipe), RedisConn::Cluster(conn)) => Ok(pipe.query(conn)?),
81 (RedisPipe::Single(pipe), RedisConn::Single(conn)) => {
82 Ok(pipe.query_async(conn).await?)
83 }
84 _ => Err(SinkError::Redis("RedisPipe and RedisConn not match".to_owned()).into()),
85 }
86 }
87
88 pub fn clear(&mut self) {
89 match self {
90 RedisPipe::Cluster(pipe) => pipe.clear(),
91 RedisPipe::Single(pipe) => pipe.clear(),
92 }
93 }
94
95 pub fn set(
96 &mut self,
97 k: RedisSinkPayloadWriterInput,
98 v: RedisSinkPayloadWriterInput,
99 ) -> Result<()> {
100 match self {
101 RedisPipe::Cluster(pipe) => match (k, v) {
102 (
103 RedisSinkPayloadWriterInput::String(k),
104 RedisSinkPayloadWriterInput::String(v),
105 ) => {
106 pipe.set(k, v);
107 }
108 (
109 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
110 RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
111 ) => {
112 pipe.geo_add(key, (lon, lat, member));
113 }
114 (
115 RedisSinkPayloadWriterInput::RedisPubSubKey(key),
116 RedisSinkPayloadWriterInput::String(v),
117 ) => {
118 pipe.publish(key, v);
119 }
120 _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
121 },
122 RedisPipe::Single(pipe) => match (k, v) {
123 (
124 RedisSinkPayloadWriterInput::String(k),
125 RedisSinkPayloadWriterInput::String(v),
126 ) => {
127 pipe.set(k, v);
128 }
129 (
130 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)),
131 RedisSinkPayloadWriterInput::RedisGeoValue((lat, lon)),
132 ) => {
133 pipe.geo_add(key, (lon, lat, member));
134 }
135 (
136 RedisSinkPayloadWriterInput::RedisPubSubKey(key),
137 RedisSinkPayloadWriterInput::String(v),
138 ) => {
139 pipe.publish(key, v);
140 }
141 _ => return Err(SinkError::Redis("RedisPipe set not match".to_owned())),
142 },
143 };
144 Ok(())
145 }
146
147 pub fn del(&mut self, k: RedisSinkPayloadWriterInput) -> Result<()> {
148 match self {
149 RedisPipe::Cluster(pipe) => match k {
150 RedisSinkPayloadWriterInput::String(k) => {
151 pipe.del(k);
152 }
153 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
154 pipe.zrem(key, member);
155 }
156 _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
157 },
158 RedisPipe::Single(pipe) => match k {
159 RedisSinkPayloadWriterInput::String(k) => {
160 pipe.del(k);
161 }
162 RedisSinkPayloadWriterInput::RedisGeoKey((key, member)) => {
163 pipe.zrem(key, member);
164 }
165 _ => return Err(SinkError::Redis("RedisPipe del not match".to_owned())),
166 },
167 };
168 Ok(())
169 }
170}
/// Open connection to Redis, created by `RedisCommon::build_conn_and_pipe`.
171pub enum RedisConn {
/// Connection obtained from a `ClusterClient`; used when `redis.url` is a JSON array.
172 Cluster(ClusterConnection),
/// Multiplexed async connection obtained from a single-node client.
174 Single(MultiplexedConnection),
176}
177
178impl RedisCommon {
179 pub async fn build_conn_and_pipe(&self) -> ConnectorResult<(RedisConn, RedisPipe)> {
180 match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
181 Ok(v) => {
182 if let Value::Array(list) = v {
183 let list = list
184 .into_iter()
185 .map(|s| {
186 if let Value::String(s) = s {
187 Ok(s)
188 } else {
189 Err(SinkError::Redis(
190 "redis.url must be array of string".to_owned(),
191 )
192 .into())
193 }
194 })
195 .collect::<ConnectorResult<Vec<String>>>()?;
196
197 let client = ClusterClient::new(list)?;
198 Ok((
199 RedisConn::Cluster(client.get_connection()?),
200 RedisPipe::Cluster(redis::cluster::cluster_pipe()),
201 ))
202 } else {
203 Err(SinkError::Redis("redis.url must be array or string".to_owned()).into())
204 }
205 }
206 Err(_) => {
207 let client = RedisClient::open(self.url.clone())?;
208 Ok((
209 RedisConn::Single(client.get_multiplexed_async_connection().await?),
210 RedisPipe::Single(redis::pipe()),
211 ))
212 }
213 }
214 }
215}
216
/// Options of the Redis sink, deserialized from the sink's property map
/// (see `RedisConfig::from_btreemap`).
217#[serde_as]
218#[derive(Clone, Debug, Deserialize, WithOptions)]
219pub struct RedisConfig {
/// Connection settings; flattened, so `redis.url` is read from the top level of the map.
220 #[serde(flatten)]
221 pub common: RedisCommon,
222}
223
224impl EnforceSecret for RedisConfig {
225 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
226 for prop in prop_iter {
227 RedisCommon::enforce_one(prop)?;
228 }
229 Ok(())
230 }
231}
232
233impl RedisConfig {
234 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
235 let config =
236 serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
237 .map_err(|e| SinkError::Config(anyhow!(e)))?;
238 Ok(config)
239 }
240}
241
/// Redis sink, built from a `SinkParam` by the `TryFrom<SinkParam>` impl.
242#[derive(Debug)]
243pub struct RedisSink {
// Connector options parsed from the sink properties.
244 config: RedisConfig,
// Schema reported by the `SinkParam`.
245 schema: Schema,
// Downstream primary-key column indices; construction fails when this is empty.
246 pk_indices: Vec<usize>,
// `FORMAT ... ENCODE ...` description; construction fails when it is missing.
247 format_desc: SinkFormatDesc,
// Forwarded to the formatter when a writer is created.
248 db_name: String,
// Forwarded to the formatter when a writer is created.
249 sink_from_name: String,
250}
251
252impl EnforceSecret for RedisSink {
253 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
254 for prop in prop_iter {
255 RedisConfig::enforce_one(prop)?;
256 }
257 Ok(())
258 }
259}
260
261#[async_trait]
262impl TryFrom<SinkParam> for RedisSink {
263 type Error = SinkError;
264
265 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
266 if param.downstream_pk.is_empty() {
267 return Err(SinkError::Config(anyhow!(
268 "Redis Sink Primary Key must be specified."
269 )));
270 }
271 let config = RedisConfig::from_btreemap(param.properties.clone())?;
272 Ok(Self {
273 config,
274 schema: param.schema(),
275 pk_indices: param.downstream_pk,
276 format_desc: param
277 .format_desc
278 .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
279 db_name: param.db_name,
280 sink_from_name: param.sink_from_name,
281 })
282 }
283}
284
285impl Sink for RedisSink {
286 type LogSinker = AsyncTruncateLogSinkerOf<RedisSinkWriter>;
287
288 const SINK_NAME: &'static str = "redis";
289
290 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
291 Ok(RedisSinkWriter::new(
292 self.config.clone(),
293 self.schema.clone(),
294 self.pk_indices.clone(),
295 &self.format_desc,
296 self.db_name.clone(),
297 self.sink_from_name.clone(),
298 )
299 .await?
300 .into_log_sinker(usize::MAX))
301 }
302
303 async fn validate(&self) -> Result<()> {
304 self.config.common.build_conn_and_pipe().await?;
305 let all_map: HashMap<String, DataType> = self
306 .schema
307 .fields()
308 .iter()
309 .map(|f| (f.name.clone(), f.data_type.clone()))
310 .collect();
311 let pk_map: HashMap<String, DataType> = self
312 .schema
313 .fields()
314 .iter()
315 .enumerate()
316 .filter(|(k, _)| self.pk_indices.contains(k))
317 .map(|(_, v)| (v.name.clone(), v.data_type.clone()))
318 .collect();
319 if matches!(
320 self.format_desc.encode,
321 super::catalog::SinkEncode::Template
322 ) {
323 match self
324 .format_desc
325 .options
326 .get(REDIS_VALUE_TYPE)
327 .map(|s| s.as_str())
328 {
329 Some(REDIS_VALUE_TYPE_STRING) | None => {
331 let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
332 SinkError::Config(anyhow!(
333 "Cannot find '{KEY_FORMAT}', please set it or use JSON"
334 ))
335 })?;
336 TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
337 let value_format =
338 self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
339 SinkError::Config(anyhow!(
340 "Cannot find `{VALUE_FORMAT}`, please set it or use JSON"
341 ))
342 })?;
343 TemplateStringEncoder::check_string_format(value_format, &all_map)?;
344 }
345 Some(REDIS_VALUE_TYPE_GEO) => {
346 let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
347 SinkError::Config(anyhow!(
348 "Cannot find '{KEY_FORMAT}', please set it or use JSON"
349 ))
350 })?;
351 TemplateStringEncoder::check_string_format(key_format, &pk_map)?;
352
353 let lon_name = self.format_desc.options.get(LON_NAME).ok_or_else(|| {
354 SinkError::Config(anyhow!(
355 "Cannot find `{LON_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
356 ))
357 })?;
358 let lat_name = self.format_desc.options.get(LAT_NAME).ok_or_else(|| {
359 SinkError::Config(anyhow!(
360 "Cannot find `{LAT_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
361 ))
362 })?;
363 let member_name = self.format_desc.options.get(MEMBER_NAME).ok_or_else(|| {
364 SinkError::Config(anyhow!(
365 "Cannot find `{MEMBER_NAME}`, please set it or use JSON or set `{REDIS_VALUE_TYPE}` to `{REDIS_VALUE_TYPE_STRING}`"
366 ))
367 })?;
368 if let Some(lon_type) = all_map.get(lon_name)
369 && (lon_type == &DataType::Float64
370 || lon_type == &DataType::Float32
371 || lon_type == &DataType::Varchar)
372 {
373 } else {
375 return Err(SinkError::Config(anyhow!(
376 "`{LON_NAME}` must be set to `float64` or `float32` or `varchar`"
377 )));
378 }
379 if let Some(lat_type) = all_map.get(lat_name)
380 && (lat_type == &DataType::Float64
381 || lat_type == &DataType::Float32
382 || lat_type == &DataType::Varchar)
383 {
384 } else {
386 return Err(SinkError::Config(anyhow!(
387 "`{LAT_NAME}` must be set to `float64` or `float32` or `varchar`"
388 )));
389 }
390 if let Some(member_type) = pk_map.get(member_name)
391 && member_type == &DataType::Varchar
392 {
393 } else {
395 return Err(SinkError::Config(anyhow!(
396 "`{MEMBER_NAME}` must be set to `varchar` and `primary_key`"
397 )));
398 }
399 }
400 Some(REDIS_VALUE_TYPE_PUBSUB) => {
401 let channel = self.format_desc.options.get(CHANNEL);
402 let channel_column = self.format_desc.options.get(CHANNEL_COLUMN);
403 if (channel.is_none() && channel_column.is_none())
404 || (channel.is_some() && channel_column.is_some())
405 {
406 return Err(SinkError::Config(anyhow!(
407 "`{CHANNEL}` and `{CHANNEL_COLUMN}` only one can be set"
408 )));
409 }
410
411 if let Some(channel_column) = channel_column
412 && let Some(channel_column_type) = all_map.get(channel_column)
413 && (channel_column_type != &DataType::Varchar)
414 {
415 return Err(SinkError::Config(anyhow!(
416 "`{CHANNEL_COLUMN}` must be set to `varchar`"
417 )));
418 }
419
420 let value_format =
421 self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
422 SinkError::Config(anyhow!("Cannot find `{VALUE_FORMAT}`"))
423 })?;
424 TemplateStringEncoder::check_string_format(value_format, &all_map)?;
425 }
426 _ => {
427 return Err(SinkError::Config(anyhow!(
428 "`{REDIS_VALUE_TYPE}` must be set to `{REDIS_VALUE_TYPE_STRING}` or `{REDIS_VALUE_TYPE_GEO}` or `{REDIS_VALUE_TYPE_PUBSUB}`"
429 )));
430 }
431 }
432 }
433 Ok(())
434 }
435}
436
/// Sink writer that formats each stream chunk and sends the resulting commands to Redis.
437pub struct RedisSinkWriter {
// Always initialised to 0 by the constructors and not read anywhere in this file.
438 #[expect(dead_code)]
439 epoch: u64,
// Kept alongside the formatter (which receives its own copy) but not read afterwards.
440 #[expect(dead_code)]
441 schema: Schema,
// Kept alongside the formatter (which receives its own copy) but not read afterwards.
442 #[expect(dead_code)]
443 pk_indices: Vec<usize>,
// Turns rows into key/value payloads according to the sink's format description.
444 formatter: SinkFormatterImpl,
// Queues the payloads as Redis commands and sends them on commit.
445 payload_writer: RedisSinkPayloadWriter,
446}
447
/// Holds the Redis connection and the pipeline that commands are queued into.
448struct RedisSinkPayloadWriter {
// `None` only for the test-only `mock()` writer, which never talks to a server.
449 conn: Option<RedisConn>,
// Commands queued by `write_one`; sent and cleared by `commit`.
451 pipe: RedisPipe,
453}
454
455impl RedisSinkPayloadWriter {
456 pub async fn new(config: RedisConfig) -> Result<Self> {
457 let (conn, pipe) = config.common.build_conn_and_pipe().await?;
458 let conn = Some(conn);
459
460 Ok(Self { conn, pipe })
461 }
462
463 #[cfg(test)]
464 pub fn mock() -> Self {
465 let conn = None;
466 let pipe = RedisPipe::Single(redis::pipe());
467 Self { conn, pipe }
468 }
469
470 pub async fn commit(&mut self) -> Result<()> {
471 #[cfg(test)]
472 {
473 if self.conn.is_none() {
474 return Ok(());
475 }
476 }
477 self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
478 self.pipe.clear();
479 Ok(())
480 }
481}
482
483impl FormattedSink for RedisSinkPayloadWriter {
484 type K = RedisSinkPayloadWriterInput;
485 type V = RedisSinkPayloadWriterInput;
486
487 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
488 let k = k.ok_or_else(|| SinkError::Redis("The redis key cannot be null".to_owned()))?;
489 match v {
490 Some(v) => self.pipe.set(k, v)?,
491 None => self.pipe.del(k)?,
492 };
493 Ok(())
494 }
495}
496
497impl RedisSinkWriter {
498 pub async fn new(
499 config: RedisConfig,
500 schema: Schema,
501 pk_indices: Vec<usize>,
502 format_desc: &SinkFormatDesc,
503 db_name: String,
504 sink_from_name: String,
505 ) -> Result<Self> {
506 let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
507 let formatter = SinkFormatterImpl::new(
508 format_desc,
509 schema.clone(),
510 pk_indices.clone(),
511 db_name,
512 sink_from_name,
513 "NO_TOPIC",
514 )
515 .await?;
516
517 Ok(Self {
518 schema,
519 pk_indices,
520 epoch: 0,
521 formatter,
522 payload_writer,
523 })
524 }
525
526 #[cfg(test)]
527 pub async fn mock(
528 schema: Schema,
529 pk_indices: Vec<usize>,
530 format_desc: &SinkFormatDesc,
531 ) -> Result<Self> {
532 let formatter = SinkFormatterImpl::new(
533 format_desc,
534 schema.clone(),
535 pk_indices.clone(),
536 "d1".to_owned(),
537 "t1".to_owned(),
538 "NO_TOPIC",
539 )
540 .await?;
541 Ok(Self {
542 schema,
543 pk_indices,
544 epoch: 0,
545 formatter,
546 payload_writer: RedisSinkPayloadWriter::mock(),
547 })
548 }
549}
550
/// Writes each chunk to Redis as one pipeline round trip.
551impl AsyncTruncateSinkWriter for RedisSinkWriter {
// Formats every row of `chunk` into the pipeline, then commits the pipeline.
// `_add_future` is unused: the commit is awaited here before returning.
552 async fn write_chunk<'a>(
553 &'a mut self,
554 chunk: StreamChunk,
555 _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
556 ) -> Result<()> {
// The macro binds `formatter` to the concrete formatter held in `self.formatter`.
557 dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
558 self.payload_writer.write_chunk(chunk, formatter).await?;
559 self.payload_writer.commit().await
560 })
561 }
562}
563
#[cfg(test)]
mod test {
    use core::panic;

    use rdkafka::message::FromBytes;
    use risingwave_common::array::{Array, I32Array, Op, Utf8Array};
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DataType;
    use risingwave_common::util::iter_util::ZipEqDebug;

    use super::*;
    use crate::sink::catalog::{SinkEncode, SinkFormat};
    use crate::sink::log_store::DeliveryFutureManager;

    /// Two-column schema shared by the tests: `id: int32`, `name: varchar`.
    fn id_name_schema() -> Schema {
        Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".to_owned(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".to_owned(),
            },
        ])
    }

    /// Three inserted rows: (1, Alice), (2, Bob), (3, Clare).
    fn three_inserts() -> StreamChunk {
        StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        )
    }

    /// Compares the packed commands queued in the mock writer's pipeline with `expected`.
    fn assert_queued_commands(writer: &RedisSinkWriter, expected: Vec<(usize, &str)>) {
        let RedisPipe::Single(pipe) = &writer.payload_writer.pipe else {
            panic!("pipe type not match")
        };
        for ((i, cmd), (exp_i, exp_cmd)) in pipe.cmd_iter().enumerate().zip_eq_debug(expected) {
            if exp_i == i {
                assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
            }
        }
    }

    #[tokio::test]
    async fn test_write() {
        let format_desc = SinkFormatDesc {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Json,
            options: BTreeMap::default(),
            secret_refs: BTreeMap::default(),
            key_encode: None,
            connection_id: None,
        };

        let mut redis_sink_writer = RedisSinkWriter::mock(id_name_schema(), vec![0], &format_desc)
            .await
            .unwrap();

        let mut manager = DeliveryFutureManager::new(0);

        redis_sink_writer
            .write_chunk(three_inserts(), manager.start_write_chunk(0, 0))
            .await
            .expect("failed to write batch");

        assert_queued_commands(
            &redis_sink_writer,
            vec![
                (
                    0,
                    "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n",
                ),
                (
                    1,
                    "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n",
                ),
                (
                    2,
                    "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n",
                ),
            ],
        );
    }

    #[tokio::test]
    async fn test_format_write() {
        let mut template_options = BTreeMap::default();
        template_options.insert(KEY_FORMAT.to_owned(), "key-{id}".to_owned());
        template_options.insert(
            VALUE_FORMAT.to_owned(),
            "values:\\{id:{id},name:{name}\\}".to_owned(),
        );
        let format_desc = SinkFormatDesc {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Template,
            options: template_options,
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        };

        let mut redis_sink_writer = RedisSinkWriter::mock(id_name_schema(), vec![0], &format_desc)
            .await
            .unwrap();

        let mut future_manager = DeliveryFutureManager::new(0);

        redis_sink_writer
            .write_chunk(three_inserts(), future_manager.start_write_chunk(0, 0))
            .await
            .expect("failed to write batch");

        assert_queued_commands(
            &redis_sink_writer,
            vec![
                (
                    0,
                    "*3\r\n$3\r\nSET\r\n$5\r\nkey-1\r\n$24\r\nvalues:{id:1,name:Alice}\r\n",
                ),
                (
                    1,
                    "*3\r\n$3\r\nSET\r\n$5\r\nkey-2\r\n$22\r\nvalues:{id:2,name:Bob}\r\n",
                ),
                (
                    2,
                    "*3\r\n$3\r\nSET\r\n$5\r\nkey-3\r\n$24\r\nvalues:{id:3,name:Clare}\r\n",
                ),
            ],
        );
    }
}