risingwave_connector/parser/
bytes_parser.rs1use risingwave_common::try_match_expand;
16
17use super::unified::AccessImpl;
18use super::unified::bytes::BytesAccess;
19use super::{AccessBuilder, EncodingProperties};
20use crate::error::ConnectorResult;
21
22#[derive(Debug)]
23pub struct BytesAccessBuilder {
24 column_name: Option<String>,
25}
26
27impl AccessBuilder for BytesAccessBuilder {
28 #[allow(clippy::unused_async)]
29 async fn generate_accessor(
30 &mut self,
31 payload: Vec<u8>,
32 _: &crate::source::SourceMeta,
33 ) -> ConnectorResult<AccessImpl<'_>> {
34 Ok(AccessImpl::Bytes(BytesAccess::new(
35 &self.column_name,
36 payload,
37 )))
38 }
39}
40
41impl BytesAccessBuilder {
42 pub fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
43 let config = try_match_expand!(encoding_properties, EncodingProperties::Bytes)?;
44 Ok(Self {
45 column_name: config.column_name,
46 })
47 }
48}
49
50#[cfg(test)]
51mod tests {
52 use risingwave_common::array::Op;
53 use risingwave_common::row::Row;
54 use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};
55
56 use crate::parser::plain_parser::PlainParser;
57 use crate::parser::{
58 BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc,
59 SourceStreamChunkBuilder, SpecificParserConfig,
60 };
61 use crate::source::{SourceContext, SourceCtrlOpts};
62
63 fn get_payload() -> Vec<Vec<u8>> {
64 vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
65 }
66
67 async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
68 let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
69 let props = SpecificParserConfig {
70 encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
71 protocol_config: ProtocolProperties::Plain,
72 };
73 let mut parser = PlainParser::new(props, descs.clone(), SourceContext::dummy().into())
74 .await
75 .unwrap();
76
77 let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
78
79 for payload in get_payload() {
80 let writer = builder.row_writer();
81 parser
82 .parse_inner(None, Some(payload), writer)
83 .await
84 .unwrap();
85 }
86
87 builder.finish_current_chunk();
88 let chunk = builder.consume_ready_chunks().next().unwrap();
89 let mut rows = chunk.rows();
90 {
91 let (op, row) = rows.next().unwrap();
92 assert_eq!(op, Op::Insert);
93 assert_eq!(
94 row.datum_at(0).to_owned_datum(),
95 Some(ScalarImpl::Bytea("t".as_bytes().into()))
96 );
97 }
98
99 {
100 let (op, row) = rows.next().unwrap();
101 assert_eq!(op, Op::Insert);
102 assert_eq!(
103 row.datum_at(0).to_owned_datum(),
104 Some(ScalarImpl::Bytea("random".as_bytes().into()))
105 );
106 }
107 }
108
109 #[tokio::test]
110 async fn test_bytes_parse_object_top_level() {
111 test_bytes_parser(get_payload).await;
112 }
113}