risingwave_connector/parser/
bytes_parser.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::try_match_expand;
16
17use super::unified::AccessImpl;
18use super::unified::bytes::BytesAccess;
19use super::{AccessBuilder, EncodingProperties};
20use crate::error::ConnectorResult;
21
/// [`AccessBuilder`] for the `Bytes` encoding.
///
/// Each raw message payload is wrapped, unparsed, in a [`BytesAccess`] and returned as
/// [`AccessImpl::Bytes`].
#[derive(Debug)]
pub struct BytesAccessBuilder {
    /// Column name copied from the `Bytes` encoding properties and handed to
    /// `BytesAccess::new` for every payload.
    // NOTE(review): the meaning of `None` is decided inside `BytesAccess`, not here — confirm there.
    column_name: Option<String>,
}
26
27impl AccessBuilder for BytesAccessBuilder {
28    async fn generate_accessor(
29        &mut self,
30        payload: Vec<u8>,
31        _: &crate::source::SourceMeta,
32    ) -> ConnectorResult<AccessImpl<'_>> {
33        Ok(AccessImpl::Bytes(BytesAccess::new(
34            &self.column_name,
35            payload,
36        )))
37    }
38}
39
40impl BytesAccessBuilder {
41    pub fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
42        let config = try_match_expand!(encoding_properties, EncodingProperties::Bytes)?;
43        Ok(Self {
44            column_name: config.column_name,
45        })
46    }
47}
48
#[cfg(test)]
mod tests {
    use risingwave_common::array::Op;
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};

    use crate::parser::plain_parser::PlainParser;
    use crate::parser::{
        BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc,
        SourceStreamChunkBuilder, SpecificParserConfig,
    };
    use crate::source::{SourceContext, SourceCtrlOpts};

    /// Sample payloads; each one is expected to become a single `Bytea` row with the same bytes.
    fn get_payload() -> Vec<Vec<u8>> {
        vec![b"t".to_vec(), b"random".to_vec()]
    }

    /// Feeds every payload from `get_payload` through a `PLAIN` / `BYTES` parser. It then checks
    /// that the resulting chunk contains exactly one `Insert` row per payload, in order, and that
    /// each row's single `Bytea` column equals that payload.
    ///
    /// The expected rows are derived from the payloads themselves, so any payload generator can
    /// be passed. `get_payload` must return at least one payload, because an empty chunk is never
    /// reported as ready.
    async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
        let payloads = get_payload();

        let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
        let props = SpecificParserConfig {
            encoding_config: EncodingProperties::Bytes(BytesProperties { column_name: None }),
            protocol_config: ProtocolProperties::Plain,
        };
        let mut parser = PlainParser::new(props, descs.clone(), SourceContext::dummy().into())
            .await
            .unwrap();

        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

        // The parser takes ownership of each payload; keep the originals for the assertions.
        for payload in payloads.iter().cloned() {
            let writer = builder.row_writer();
            parser
                .parse_inner(None, Some(payload), writer)
                .await
                .unwrap();
        }

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();
        let mut rows = chunk.rows();

        for expected in &payloads {
            let (op, row) = rows.next().expect("chunk has fewer rows than payloads");
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                Some(ScalarImpl::Bytea(expected.as_slice().into()))
            );
        }
        // Each payload must map to exactly one row; nothing extra may be emitted.
        assert!(rows.next().is_none(), "chunk has more rows than payloads");
    }

    #[tokio::test]
    async fn test_bytes_parse_object_top_level() {
        test_bytes_parser(get_payload).await;
    }
}