risingwave_common/row/
owned_row.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem;
16
17use risingwave_common_estimate_size::EstimateSize;
18
19use super::Row;
20use crate::types::{
21    DataType, Date, Datum, DatumRef, Decimal, Interval, ScalarImpl, Time, Timestamp, ToDatumRef,
22};
23use crate::util::iter_util::ZipEqDebug;
24use crate::util::value_encoding;
25use crate::util::value_encoding::deserialize_datum;
26
27/// An owned row type with a `Vec<Datum>`.
28#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
29pub struct OwnedRow(Box<[Datum]>);
30
31/// Do not implement `IndexMut` to make it immutable.
32impl std::ops::Index<usize> for OwnedRow {
33    type Output = Datum;
34
35    fn index(&self, index: usize) -> &Self::Output {
36        &self.0[index]
37    }
38}
39
40impl AsRef<OwnedRow> for OwnedRow {
41    fn as_ref(&self) -> &OwnedRow {
42        self
43    }
44}
45
46impl OwnedRow {
47    /// Returns an empty row.
48    ///
49    /// Note: use [`empty`](super::empty()) if possible.
50    pub fn empty() -> Self {
51        Self(Box::new([]))
52    }
53
54    pub fn new(values: Vec<Datum>) -> Self {
55        Self(values.into())
56    }
57
58    /// Retrieve the underlying [`Box<[Datum]>`].
59    pub fn into_inner(self) -> Box<[Datum]> {
60        self.0
61    }
62
63    pub fn as_inner(&self) -> &[Datum] {
64        &self.0
65    }
66
67    /// Parse an [`OwnedRow`] from a pretty string, only used in tests.
68    pub fn from_pretty_with_tys(tys: &[DataType], s: impl AsRef<str>) -> Self {
69        let datums: Vec<_> = tys
70            .iter()
71            .zip_eq_debug(s.as_ref().split_ascii_whitespace())
72            .map(|(ty, x)| {
73                let scalar: ScalarImpl = match ty {
74                    DataType::Int16 => x.parse::<i16>().unwrap().into(),
75                    DataType::Int32 => x.parse::<i32>().unwrap().into(),
76                    DataType::Int64 => x.parse::<i64>().unwrap().into(),
77                    DataType::Float32 => x.parse::<f32>().unwrap().into(),
78                    DataType::Float64 => x.parse::<f64>().unwrap().into(),
79                    DataType::Varchar => x.to_owned().into(),
80                    DataType::Boolean => x.parse::<bool>().unwrap().into(),
81                    DataType::Date => x.parse::<Date>().unwrap().into(),
82                    DataType::Time => x.parse::<Time>().unwrap().into(),
83                    DataType::Timestamp => x.parse::<Timestamp>().unwrap().into(),
84                    DataType::Interval => x.parse::<Interval>().unwrap().into(),
85                    DataType::Decimal => x.parse::<Decimal>().unwrap().into(),
86                    _ => todo!(),
87                };
88                Some(scalar)
89            })
90            .collect();
91        Self::new(datums)
92    }
93
94    pub fn last(&self) -> DatumRef<'_> {
95        self.0[self.len() - 1].to_datum_ref()
96    }
97}
98
99impl EstimateSize for OwnedRow {
100    fn estimated_heap_size(&self) -> usize {
101        let data_heap_size: usize = self.0.iter().map(|datum| datum.estimated_heap_size()).sum();
102        self.0.len() * mem::size_of::<Datum>() + data_heap_size
103    }
104}
105
106impl Row for OwnedRow {
107    #[inline]
108    fn datum_at(&self, index: usize) -> DatumRef<'_> {
109        self[index].to_datum_ref()
110    }
111
112    #[inline]
113    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
114        unsafe { self.0.get_unchecked(index).to_datum_ref() }
115    }
116
117    #[inline]
118    fn len(&self) -> usize {
119        self.0.len()
120    }
121
122    #[inline]
123    fn iter(&self) -> impl ExactSizeIterator<Item = DatumRef<'_>> {
124        self.0.iter().map(ToDatumRef::to_datum_ref)
125    }
126
127    #[inline]
128    fn to_owned_row(&self) -> OwnedRow {
129        self.clone()
130    }
131
132    #[inline]
133    fn into_owned_row(self) -> OwnedRow {
134        self
135    }
136}
137
138impl IntoIterator for OwnedRow {
139    type IntoIter = std::vec::IntoIter<Datum>;
140    type Item = Datum;
141
142    fn into_iter(self) -> Self::IntoIter {
143        self.0.into_vec().into_iter()
144    }
145}
146
147impl FromIterator<Datum> for OwnedRow {
148    fn from_iter<T: IntoIterator<Item = Datum>>(iter: T) -> Self {
149        Self(iter.into_iter().collect())
150    }
151}
152
153/// Deserializer of the [`OwnedRow`].
154#[derive(Clone, Debug)]
155pub struct RowDeserializer<D: AsRef<[DataType]> = Vec<DataType>> {
156    data_types: D,
157}
158
159impl<D: AsRef<[DataType]>> RowDeserializer<D> {
160    /// Creates a new `RowDeserializer` with row schema.
161    pub fn new(data_types: D) -> Self {
162        RowDeserializer { data_types }
163    }
164
165    pub fn deserialize_to(
166        &self,
167        mut data: impl bytes::Buf,
168        values: &mut Vec<Datum>,
169    ) -> value_encoding::Result<()> {
170        for typ in self.data_types() {
171            values.push(deserialize_datum(&mut data, typ)?);
172        }
173        Ok(())
174    }
175
176    /// Deserialize the row from value encoding bytes.
177    pub fn deserialize(&self, data: impl bytes::Buf) -> value_encoding::Result<OwnedRow> {
178        let mut values = Vec::with_capacity(self.data_types().len());
179        self.deserialize_to(data, &mut values)?;
180        Ok(OwnedRow(values.into()))
181    }
182
183    pub fn data_types(&self) -> &[DataType] {
184        self.data_types.as_ref()
185    }
186}
187
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;
    use crate::row::RowExt;
    use crate::types::DataType as Ty;
    use crate::util::hash_util::Crc32FastBuilder;

    /// Nine non-null datums shared by the tests below: varchar, boolean, int16, int32, int64,
    /// float32, float64, decimal and interval, in that order.
    fn sample_datums() -> Vec<Datum> {
        vec![
            Some(ScalarImpl::Utf8("string".into())),
            Some(ScalarImpl::Bool(true)),
            Some(ScalarImpl::Int16(1)),
            Some(ScalarImpl::Int32(2)),
            Some(ScalarImpl::Int64(3)),
            Some(ScalarImpl::Float32(4.0.into())),
            Some(ScalarImpl::Float64(5.0.into())),
            Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(7, 8, 9))),
        ]
    }

    /// Serializing a row with value encoding and deserializing it with the matching schema
    /// must give back an equal row.
    #[test]
    fn row_value_encode_decode() {
        let row = OwnedRow::new(sample_datums());

        let value_indices = (0..9).collect_vec();
        let bytes = (&row).project(&value_indices).value_serialize();
        // One term per column, plus a final `9` -- presumably one tag byte per datum; confirm
        // against the value encoding.
        assert_eq!(bytes.len(), 10 + 1 + 2 + 4 + 8 + 4 + 8 + 16 + 16 + 9);

        let schema = vec![
            Ty::Varchar,
            Ty::Boolean,
            Ty::Int16,
            Ty::Int32,
            Ty::Int64,
            Ty::Float32,
            Ty::Float64,
            Ty::Decimal,
            Ty::Interval,
        ];
        let de = RowDeserializer::new(schema);
        let row1 = de.deserialize(bytes.as_ref()).unwrap();
        assert_eq!(row, row1);
    }

    /// Rows holding the same datums in a different order must hash differently, and the
    /// default (empty) row must hash to zero.
    #[test]
    fn test_hash_row() {
        let hash_builder = Crc32FastBuilder;

        let row1 = OwnedRow::new(sample_datums());

        // Same datums with the trailing interval moved to the front.
        let mut rotated = sample_datums();
        rotated.rotate_right(1);
        let row2 = OwnedRow::new(rotated);

        assert_ne!(row1.hash(hash_builder), row2.hash(hash_builder));

        let row_default = OwnedRow::default();
        assert_eq!(row_default.hash(hash_builder).value(), 0);
    }
}