risingwave_connector/parser/unified/
kv_event.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{DataType, DatumCow};
16use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
17
18use super::{Access, AccessResult};
19use crate::parser::unified::AccessError;
20use crate::source::SourceColumnDesc;
21
/// A key/value event made of two independently optional accessors.
///
/// `K` reads fields from the key part and `V` reads fields from the value part.
/// Either side may be absent; a freshly defaulted event has neither.
pub struct KvEvent<K, V> {
    /// Accessor over the key part, or `None` when no key has been attached.
    key_accessor: Option<K>,
    /// Accessor over the value part, or `None` when no value has been attached.
    value_accessor: Option<V>,
}
26
27impl<K, V> Default for KvEvent<K, V> {
28    fn default() -> Self {
29        Self {
30            key_accessor: None,
31            value_accessor: None,
32        }
33    }
34}
35
36impl<K, V> KvEvent<K, V> {
37    pub fn with_key(&mut self, key: K)
38    where
39        K: Access,
40    {
41        self.key_accessor = Some(key);
42    }
43
44    pub fn with_value(&mut self, value: V)
45    where
46        V: Access,
47    {
48        self.value_accessor = Some(value);
49    }
50}
51
52impl<K, V> KvEvent<K, V>
53where
54    K: Access,
55    V: Access,
56{
57    fn access_key(&self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'_>> {
58        if let Some(ka) = &self.key_accessor {
59            ka.access(path, type_expected)
60        } else {
61            Err(AccessError::Undefined {
62                name: "key".to_owned(),
63                path: String::new(),
64            })
65        }
66    }
67
68    fn access_value(&self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'_>> {
69        if let Some(va) = &self.value_accessor {
70            va.access(path, type_expected)
71        } else {
72            Err(AccessError::Undefined {
73                name: "value".to_owned(),
74                path: String::new(),
75            })
76        }
77    }
78
79    pub fn access_field<const KEY_ONLY: bool>(
80        &self,
81        desc: &SourceColumnDesc,
82    ) -> AccessResult<DatumCow<'_>> {
83        match (&desc.additional_column.column_type, KEY_ONLY) {
84            (Some(AdditionalColumnType::Key(_)), _) => {
85                self.access_key(&[&desc.name], &desc.data_type)
86            }
87            // hack here: Get the whole payload as a single column
88            // use a special mark empty slice as path to represent the whole payload
89            (Some(AdditionalColumnType::Payload(_)), _) => self.access_value(&[], &desc.data_type),
90            (None, false) => self.access_value(&[&desc.name], &desc.data_type),
91            (_, true) => Ok(DatumCow::Owned(None)),
92            _ => unreachable!(),
93        }
94    }
95}