risingwave_connector/parser/
bytes_parser.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::try_match_expand;
16
17use super::unified::AccessImpl;
18use super::unified::bytes::BytesAccess;
19use super::{AccessBuilder, EncodingProperties};
20use crate::error::ConnectorResult;
21
/// Access builder that creates a [`BytesAccess`] accessor for each raw payload it is given.
///
/// The only state is the optional column name taken from the bytes encoding config; it is
/// lent to every accessor this builder produces.
#[derive(Debug)]
pub struct BytesAccessBuilder {
    /// Column name copied from the `EncodingProperties::Bytes` config (`None` if unset).
    column_name: Option<String>,
}
26
27impl AccessBuilder for BytesAccessBuilder {
28    #[allow(clippy::unused_async)]
29    async fn generate_accessor(
30        &mut self,
31        payload: Vec<u8>,
32        _: &crate::source::SourceMeta,
33    ) -> ConnectorResult<AccessImpl<'_>> {
34        Ok(AccessImpl::Bytes(BytesAccess::new(
35            &self.column_name,
36            payload,
37        )))
38    }
39}
40
41impl BytesAccessBuilder {
42    pub fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
43        let config = try_match_expand!(encoding_properties, EncodingProperties::Bytes)?;
44        Ok(Self {
45            column_name: config.column_name,
46        })
47    }
48}
49
#[cfg(test)]
mod tests {
    use risingwave_common::array::Op;
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};

    use crate::parser::plain_parser::PlainParser;
    use crate::parser::{
        BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc,
        SourceStreamChunkBuilder, SpecificParserConfig,
    };
    use crate::source::{SourceContext, SourceCtrlOpts};

    /// Sample payloads: a single-byte message and a multi-byte message.
    fn get_payload() -> Vec<Vec<u8>> {
        vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
    }

    /// Parses every payload from `make_payloads` with a plain-protocol, bytes-encoded parser
    /// and checks the following:
    /// - each payload becomes exactly one `Insert` row, in order;
    /// - that row's single `Bytea` column holds the payload bytes unchanged;
    /// - no extra rows are produced.
    ///
    /// Expected values are derived from the payloads themselves, so the helper stays correct
    /// for any payload set. It assumes `make_payloads` returns at least one payload, since
    /// otherwise no chunk may be ready (TODO confirm builder behaviour on an empty chunk).
    async fn test_bytes_parser(make_payloads: fn() -> Vec<Vec<u8>>) {
        let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
        let props = SpecificParserConfig {
            encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
            protocol_config: ProtocolProperties::Plain,
        };
        let mut parser = PlainParser::new(props, descs.clone(), SourceContext::dummy().into())
            .await
            .unwrap();

        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

        let payloads = make_payloads();
        for payload in &payloads {
            let writer = builder.row_writer();
            parser
                .parse_inner(None, Some(payload.clone()), writer)
                .await
                .unwrap();
        }

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();
        let mut rows = chunk.rows();

        for payload in &payloads {
            let (op, row) = rows
                .next()
                .expect("parser produced fewer rows than payloads");
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                Some(ScalarImpl::Bytea(payload.as_slice().into()))
            );
        }

        // Each payload must map to exactly one row.
        assert!(
            rows.next().is_none(),
            "parser produced more rows than payloads"
        );
    }

    #[tokio::test]
    async fn test_bytes_parse_object_top_level() {
        test_bytes_parser(get_payload).await;
    }
}