risingwave_common/row/
owned_row.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem;
16
17use risingwave_common_estimate_size::EstimateSize;
18
19use super::Row;
20use crate::types::{
21    DataType, Date, Datum, DatumRef, Decimal, Interval, ScalarImpl, Time, Timestamp, ToDatumRef,
22};
23use crate::util::iter_util::ZipEqDebug;
24use crate::util::value_encoding;
25use crate::util::value_encoding::deserialize_datum;
26
27/// An owned row type with a `Vec<Datum>`.
28#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
29pub struct OwnedRow(Box<[Datum]>);
30
31/// Do not implement `IndexMut` to make it immutable.
32impl std::ops::Index<usize> for OwnedRow {
33    type Output = Datum;
34
35    fn index(&self, index: usize) -> &Self::Output {
36        &self.0[index]
37    }
38}
39
40impl AsRef<OwnedRow> for OwnedRow {
41    fn as_ref(&self) -> &OwnedRow {
42        self
43    }
44}
45
46impl OwnedRow {
47    /// Returns an empty row.
48    ///
49    /// Note: use [`empty`](super::empty()) if possible.
50    pub fn empty() -> Self {
51        Self(Box::new([]))
52    }
53
54    pub fn new(values: Vec<Datum>) -> Self {
55        Self(values.into())
56    }
57
58    /// Retrieve the underlying [`Box<[Datum]>`].
59    pub fn into_inner(self) -> Box<[Datum]> {
60        self.0
61    }
62
63    pub fn as_inner(&self) -> &[Datum] {
64        &self.0
65    }
66
67    /// Parse an [`OwnedRow`] from a pretty string, only used in tests.
68    pub fn from_pretty_with_tys(tys: &[DataType], s: impl AsRef<str>) -> Self {
69        let datums: Vec<_> = tys
70            .iter()
71            .zip_eq_debug(s.as_ref().split_ascii_whitespace())
72            .map(|(ty, x)| {
73                let scalar: ScalarImpl = match ty {
74                    DataType::Int16 => x.parse::<i16>().unwrap().into(),
75                    DataType::Int32 => x.parse::<i32>().unwrap().into(),
76                    DataType::Int64 => x.parse::<i64>().unwrap().into(),
77                    DataType::Float32 => x.parse::<f32>().unwrap().into(),
78                    DataType::Float64 => x.parse::<f64>().unwrap().into(),
79                    DataType::Varchar => x.to_owned().into(),
80                    DataType::Boolean => x.parse::<bool>().unwrap().into(),
81                    DataType::Date => x.parse::<Date>().unwrap().into(),
82                    DataType::Time => x.parse::<Time>().unwrap().into(),
83                    DataType::Timestamp => x.parse::<Timestamp>().unwrap().into(),
84                    DataType::Interval => x.parse::<Interval>().unwrap().into(),
85                    DataType::Decimal => x.parse::<Decimal>().unwrap().into(),
86                    _ => todo!(),
87                };
88                Some(scalar)
89            })
90            .collect();
91        Self::new(datums)
92    }
93
94    pub fn last(&self) -> DatumRef<'_> {
95        self.0[self.len() - 1].to_datum_ref()
96    }
97}
98
99impl EstimateSize for OwnedRow {
100    fn estimated_heap_size(&self) -> usize {
101        let data_heap_size: usize = self.0.iter().map(|datum| datum.estimated_heap_size()).sum();
102        self.0.len() * mem::size_of::<Datum>() + data_heap_size
103    }
104}
105
106impl Row for OwnedRow {
107    #[inline]
108    fn datum_at(&self, index: usize) -> DatumRef<'_> {
109        self[index].to_datum_ref()
110    }
111
112    #[inline]
113    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
114        unsafe { self.0.get_unchecked(index).to_datum_ref() }
115    }
116
117    #[inline]
118    fn len(&self) -> usize {
119        self.0.len()
120    }
121
122    #[inline]
123    fn iter(&self) -> impl ExactSizeIterator<Item = DatumRef<'_>> {
124        self.0.iter().map(ToDatumRef::to_datum_ref)
125    }
126
127    #[inline]
128    fn to_owned_row(&self) -> OwnedRow {
129        self.clone()
130    }
131
132    #[inline]
133    fn into_owned_row(self) -> OwnedRow {
134        self
135    }
136}
137
138impl IntoIterator for OwnedRow {
139    type IntoIter = std::vec::IntoIter<Datum>;
140    type Item = Datum;
141
142    fn into_iter(self) -> Self::IntoIter {
143        self.0.into_vec().into_iter()
144    }
145}
146
147impl FromIterator<Datum> for OwnedRow {
148    fn from_iter<T: IntoIterator<Item = Datum>>(iter: T) -> Self {
149        Self(iter.into_iter().collect())
150    }
151}
152
153/// Deserializer of the [`OwnedRow`].
154#[derive(Clone, Debug)]
155pub struct RowDeserializer<D: AsRef<[DataType]> = Vec<DataType>> {
156    data_types: D,
157}
158
159impl<D: AsRef<[DataType]>> RowDeserializer<D> {
160    /// Creates a new `RowDeserializer` with row schema.
161    pub fn new(data_types: D) -> Self {
162        RowDeserializer { data_types }
163    }
164
165    /// Deserialize the row from value encoding bytes.
166    pub fn deserialize(&self, mut data: impl bytes::Buf) -> value_encoding::Result<OwnedRow> {
167        let mut values = Vec::with_capacity(self.data_types().len());
168        for typ in self.data_types() {
169            values.push(deserialize_datum(&mut data, typ)?);
170        }
171        Ok(OwnedRow(values.into()))
172    }
173
174    pub fn data_types(&self) -> &[DataType] {
175        self.data_types.as_ref()
176    }
177}
178
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;
    use crate::row::RowExt;
    use crate::types::DataType as Ty;
    use crate::util::hash_util::Crc32FastBuilder;

    /// Builds the nine non-null datums shared by the tests, in the order: varchar, boolean,
    /// int16, int32, int64, float32, float64, decimal, interval.
    fn sample_datums() -> Vec<Datum> {
        vec![
            Some(ScalarImpl::Utf8("string".into())),
            Some(ScalarImpl::Bool(true)),
            Some(ScalarImpl::Int16(1)),
            Some(ScalarImpl::Int32(2)),
            Some(ScalarImpl::Int64(3)),
            Some(ScalarImpl::Float32(4.0.into())),
            Some(ScalarImpl::Float64(5.0.into())),
            Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(7, 8, 9))),
        ]
    }

    /// A row serialized with value encoding must deserialize back into an equal row.
    #[test]
    fn row_value_encode_decode() {
        let original = OwnedRow::new(sample_datums());
        let all_columns = (0..9).collect_vec();
        let encoded = (&original).project(&all_columns).value_serialize();
        // NOTE(review): presumably the per-column payload sizes followed by one tag byte per
        // datum (the trailing `9`) — confirm against the value encoding.
        assert_eq!(encoded.len(), 10 + 1 + 2 + 4 + 8 + 4 + 8 + 16 + 16 + 9);

        let schema = vec![
            Ty::Varchar,
            Ty::Boolean,
            Ty::Int16,
            Ty::Int32,
            Ty::Int64,
            Ty::Float32,
            Ty::Float64,
            Ty::Decimal,
            Ty::Interval,
        ];
        let decoded = RowDeserializer::new(schema)
            .deserialize(encoded.as_ref())
            .unwrap();
        assert_eq!(original, decoded);
    }

    /// Rows holding the same datums in a different order must hash differently, and the
    /// default (empty) row must hash to zero.
    #[test]
    fn test_hash_row() {
        let hash_builder = Crc32FastBuilder;

        let row1 = OwnedRow::new(sample_datums());
        // Same datums as `row1`, with the trailing interval moved to the front.
        let row2 = {
            let mut datums = sample_datums();
            datums.rotate_right(1);
            OwnedRow::new(datums)
        };
        assert_ne!(row1.hash(hash_builder), row2.hash(hash_builder));

        let row_default = OwnedRow::default();
        assert_eq!(row_default.hash(hash_builder).value(), 0);
    }
}