risingwave_connector/parser/
bytes_parser.rs1use risingwave_common::try_match_expand;
16
17use super::unified::AccessImpl;
18use super::unified::bytes::BytesAccess;
19use super::{AccessBuilder, EncodingProperties};
20use crate::error::ConnectorResult;
21
/// Access builder for the bytes encoding: every payload it receives is
/// wrapped, unmodified, into an [`AccessImpl::Bytes`] accessor.
#[derive(Debug)]
pub struct BytesAccessBuilder {
    /// Optional column name copied from the bytes encoding config and handed
    /// to `BytesAccess::new` for each payload.
    // NOTE(review): presumably limits which column receives the payload —
    // confirm against `BytesAccess`.
    column_name: Option<String>,
}
26
27impl AccessBuilder for BytesAccessBuilder {
28 async fn generate_accessor(
29 &mut self,
30 payload: Vec<u8>,
31 _: &crate::source::SourceMeta,
32 ) -> ConnectorResult<AccessImpl<'_>> {
33 Ok(AccessImpl::Bytes(BytesAccess::new(
34 &self.column_name,
35 payload,
36 )))
37 }
38}
39
40impl BytesAccessBuilder {
41 pub fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
42 let config = try_match_expand!(encoding_properties, EncodingProperties::Bytes)?;
43 Ok(Self {
44 column_name: config.column_name,
45 })
46 }
47}
48
#[cfg(test)]
mod tests {
    use risingwave_common::array::Op;
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};

    use crate::parser::plain_parser::PlainParser;
    use crate::parser::{
        BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc,
        SourceStreamChunkBuilder, SpecificParserConfig,
    };
    use crate::source::{SourceContext, SourceCtrlOpts};

    /// Raw payloads fed to the parser, in the order they are expected back.
    fn get_payload() -> Vec<Vec<u8>> {
        vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
    }

    /// Runs every payload through a `PlainParser` configured with the bytes
    /// encoding and a single `Bytea` column named `id`, then checks that the
    /// first two rows of the resulting chunk are inserts carrying the bytes
    /// `t` and `random` unchanged.
    async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
        let columns = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
        let config = SpecificParserConfig {
            encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
            protocol_config: ProtocolProperties::Plain,
        };
        let source_ctx = SourceContext::dummy().into();
        let mut parser = PlainParser::new(config, columns.clone(), source_ctx)
            .await
            .unwrap();

        let mut chunk_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        // Each payload goes in as a message value with no key.
        for raw in get_payload() {
            let row_writer = chunk_builder.row_writer();
            parser
                .parse_inner(None, Some(raw), row_writer)
                .await
                .unwrap();
        }

        chunk_builder.finish_current_chunk();
        let chunk = chunk_builder.consume_ready_chunks().next().unwrap();
        let mut rows = chunk.rows();

        // Rows must come back in input order, each as an insert of the raw bytes.
        for expected in ["t", "random"] {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                Some(ScalarImpl::Bytea(expected.as_bytes().into()))
            );
        }
    }

    /// Entry point that drives `test_bytes_parser` with the fixed payloads.
    #[tokio::test]
    async fn test_bytes_parse_object_top_level() {
        test_bytes_parser(get_payload).await;
    }
}