risingwave_connector/parser/unified/
kv_event.rs1use risingwave_common::types::{DataType, DatumCow};
16use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
17
18use super::{Access, AccessResult};
19use crate::parser::unified::AccessError;
20use crate::source::SourceColumnDesc;
21
/// A key/value event that bundles one optional accessor for the key part with
/// one optional accessor for the value part.
///
/// Neither type parameter is constrained on the struct itself; bounds are
/// placed on the `impl` blocks that need them.
pub struct KvEvent<K, V> {
    /// Accessor for the key part, or `None` when no key accessor is present.
    key_accessor: Option<K>,
    /// Accessor for the value part, or `None` when no value accessor is present.
    value_accessor: Option<V>,
}
26
27impl<K, V> Default for KvEvent<K, V> {
28 fn default() -> Self {
29 Self {
30 key_accessor: None,
31 value_accessor: None,
32 }
33 }
34}
35
36impl<K, V> KvEvent<K, V> {
37 pub fn with_key(&mut self, key: K)
38 where
39 K: Access,
40 {
41 self.key_accessor = Some(key);
42 }
43
44 pub fn with_value(&mut self, value: V)
45 where
46 V: Access,
47 {
48 self.value_accessor = Some(value);
49 }
50}
51
52impl<K, V> KvEvent<K, V>
53where
54 K: Access,
55 V: Access,
56{
57 fn access_key(&self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'_>> {
58 if let Some(ka) = &self.key_accessor {
59 ka.access(path, type_expected)
60 } else {
61 Err(AccessError::Undefined {
62 name: "key".to_owned(),
63 path: String::new(),
64 })
65 }
66 }
67
68 fn access_value(&self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'_>> {
69 if let Some(va) = &self.value_accessor {
70 va.access(path, type_expected)
71 } else {
72 Err(AccessError::Undefined {
73 name: "value".to_owned(),
74 path: String::new(),
75 })
76 }
77 }
78
79 pub fn access_field<const KEY_ONLY: bool>(
80 &self,
81 desc: &SourceColumnDesc,
82 ) -> AccessResult<DatumCow<'_>> {
83 match (&desc.additional_column.column_type, KEY_ONLY) {
84 (Some(AdditionalColumnType::Key(_)), _) => {
85 self.access_key(&[&desc.name], &desc.data_type)
86 }
87 (Some(AdditionalColumnType::Payload(_)), _) => self.access_value(&[], &desc.data_type),
90 (None, false) => self.access_value(&[&desc.name], &desc.data_type),
91 (_, true) => Ok(DatumCow::Owned(None)),
92 _ => unreachable!(),
93 }
94 }
95}