risingwave_connector/parser/unified/
bytes.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl};
16
17use super::{Access, AccessError, AccessResult};
18
19// where do we put data
20
21pub struct BytesAccess<'a> {
22    column_name: &'a Option<String>,
23    bytes: Vec<u8>,
24}
25
26impl<'a> BytesAccess<'a> {
27    pub fn new(column_name: &'a Option<String>, bytes: Vec<u8>) -> Self {
28        Self { column_name, bytes }
29    }
30}
31
32impl Access for BytesAccess<'_> {
33    /// path is empty currently, `type_expected` should be `Bytea`
34    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
35        if let DataType::Bytea = type_expected {
36            if self.column_name.is_none()
37                || (path.len() == 1 && self.column_name.as_ref().unwrap() == path[0])
38            {
39                return Ok(DatumCow::Borrowed(Some(ScalarRefImpl::Bytea(
40                    self.bytes.as_slice(),
41                ))));
42            }
43            return Err(AccessError::Undefined {
44                name: path[0].to_owned(),
45                path: self.column_name.as_ref().unwrap().to_string(),
46            });
47        }
48        Err(AccessError::TypeError {
49            expected: "Bytea".to_owned(),
50            got: format!("{:?}", type_expected),
51            value: "".to_owned(),
52        })
53    }
54}