risingwave_common/row/
owned_row.rs1use std::mem;
16
17use risingwave_common_estimate_size::EstimateSize;
18
19use super::Row;
20use crate::types::{
21 DataType, Date, Datum, DatumRef, Decimal, Interval, ScalarImpl, Time, Timestamp, ToDatumRef,
22};
23use crate::util::iter_util::ZipEqDebug;
24use crate::util::value_encoding;
25use crate::util::value_encoding::deserialize_datum;
26
27#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
29pub struct OwnedRow(Box<[Datum]>);
30
31impl std::ops::Index<usize> for OwnedRow {
33 type Output = Datum;
34
35 fn index(&self, index: usize) -> &Self::Output {
36 &self.0[index]
37 }
38}
39
40impl AsRef<OwnedRow> for OwnedRow {
41 fn as_ref(&self) -> &OwnedRow {
42 self
43 }
44}
45
46impl OwnedRow {
47 pub fn empty() -> Self {
51 Self(Box::new([]))
52 }
53
54 pub fn new(values: Vec<Datum>) -> Self {
55 Self(values.into())
56 }
57
58 pub fn into_inner(self) -> Box<[Datum]> {
60 self.0
61 }
62
63 pub fn as_inner(&self) -> &[Datum] {
64 &self.0
65 }
66
67 pub fn from_pretty_with_tys(tys: &[DataType], s: impl AsRef<str>) -> Self {
69 let datums: Vec<_> = tys
70 .iter()
71 .zip_eq_debug(s.as_ref().split_ascii_whitespace())
72 .map(|(ty, x)| {
73 let scalar: ScalarImpl = match ty {
74 DataType::Int16 => x.parse::<i16>().unwrap().into(),
75 DataType::Int32 => x.parse::<i32>().unwrap().into(),
76 DataType::Int64 => x.parse::<i64>().unwrap().into(),
77 DataType::Float32 => x.parse::<f32>().unwrap().into(),
78 DataType::Float64 => x.parse::<f64>().unwrap().into(),
79 DataType::Varchar => x.to_owned().into(),
80 DataType::Boolean => x.parse::<bool>().unwrap().into(),
81 DataType::Date => x.parse::<Date>().unwrap().into(),
82 DataType::Time => x.parse::<Time>().unwrap().into(),
83 DataType::Timestamp => x.parse::<Timestamp>().unwrap().into(),
84 DataType::Interval => x.parse::<Interval>().unwrap().into(),
85 DataType::Decimal => x.parse::<Decimal>().unwrap().into(),
86 _ => todo!(),
87 };
88 Some(scalar)
89 })
90 .collect();
91 Self::new(datums)
92 }
93
94 pub fn last(&self) -> DatumRef<'_> {
95 self.0[self.len() - 1].to_datum_ref()
96 }
97}
98
99impl EstimateSize for OwnedRow {
100 fn estimated_heap_size(&self) -> usize {
101 let data_heap_size: usize = self.0.iter().map(|datum| datum.estimated_heap_size()).sum();
102 self.0.len() * mem::size_of::<Datum>() + data_heap_size
103 }
104}
105
106impl Row for OwnedRow {
107 #[inline]
108 fn datum_at(&self, index: usize) -> DatumRef<'_> {
109 self[index].to_datum_ref()
110 }
111
112 #[inline]
113 unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
114 unsafe { self.0.get_unchecked(index).to_datum_ref() }
115 }
116
117 #[inline]
118 fn len(&self) -> usize {
119 self.0.len()
120 }
121
122 #[inline]
123 fn iter(&self) -> impl ExactSizeIterator<Item = DatumRef<'_>> {
124 self.0.iter().map(ToDatumRef::to_datum_ref)
125 }
126
127 #[inline]
128 fn to_owned_row(&self) -> OwnedRow {
129 self.clone()
130 }
131
132 #[inline]
133 fn into_owned_row(self) -> OwnedRow {
134 self
135 }
136}
137
138impl IntoIterator for OwnedRow {
139 type IntoIter = std::vec::IntoIter<Datum>;
140 type Item = Datum;
141
142 fn into_iter(self) -> Self::IntoIter {
143 self.0.into_vec().into_iter()
144 }
145}
146
147impl FromIterator<Datum> for OwnedRow {
148 fn from_iter<T: IntoIterator<Item = Datum>>(iter: T) -> Self {
149 Self(iter.into_iter().collect())
150 }
151}
152
153#[derive(Clone, Debug)]
155pub struct RowDeserializer<D: AsRef<[DataType]> = Vec<DataType>> {
156 data_types: D,
157}
158
159impl<D: AsRef<[DataType]>> RowDeserializer<D> {
160 pub fn new(data_types: D) -> Self {
162 RowDeserializer { data_types }
163 }
164
165 pub fn deserialize(&self, mut data: impl bytes::Buf) -> value_encoding::Result<OwnedRow> {
167 let mut values = Vec::with_capacity(self.data_types().len());
168 for typ in self.data_types() {
169 values.push(deserialize_datum(&mut data, typ)?);
170 }
171 Ok(OwnedRow(values.into()))
172 }
173
174 pub fn data_types(&self) -> &[DataType] {
175 self.data_types.as_ref()
176 }
177}
178
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;
    use crate::row::RowExt;
    use crate::types::DataType as Ty;
    use crate::util::hash_util::Crc32FastBuilder;

    /// Nine non-null datums, one per scalar kind exercised by these tests.
    fn sample_datums() -> Vec<Datum> {
        vec![
            Some(ScalarImpl::Utf8("string".into())),
            Some(ScalarImpl::Bool(true)),
            Some(ScalarImpl::Int16(1)),
            Some(ScalarImpl::Int32(2)),
            Some(ScalarImpl::Int64(3)),
            Some(ScalarImpl::Float32(4.0.into())),
            Some(ScalarImpl::Float64(5.0.into())),
            Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(7, 8, 9))),
        ]
    }

    /// Serializing a row and deserializing it with matching types must give
    /// back an equal row, and the encoding must have the expected length.
    #[test]
    fn row_value_encode_decode() {
        let row = OwnedRow::new(sample_datums());

        let value_indices = (0..9).collect_vec();
        let bytes = (&row).project(&value_indices).value_serialize();
        assert_eq!(bytes.len(), 10 + 1 + 2 + 4 + 8 + 4 + 8 + 16 + 16 + 9);

        let column_types = vec![
            Ty::Varchar,
            Ty::Boolean,
            Ty::Int16,
            Ty::Int32,
            Ty::Int64,
            Ty::Float32,
            Ty::Float64,
            Ty::Decimal,
            Ty::Interval,
        ];
        let de = RowDeserializer::new(column_types);
        let row1 = de.deserialize(bytes.as_ref()).unwrap();
        assert_eq!(row, row1);
    }

    /// Rows with the same datums in a different order must hash differently,
    /// and the default (empty) row must hash to zero.
    #[test]
    fn test_hash_row() {
        let hash_builder = Crc32FastBuilder;

        let row1 = OwnedRow::new(sample_datums());

        // Same datums as `row1`, with the trailing interval moved to the front.
        let row2 = {
            let mut rotated = sample_datums();
            rotated.rotate_right(1);
            OwnedRow::new(rotated)
        };
        assert_ne!(row1.hash(hash_builder), row2.hash(hash_builder));

        let row_default = OwnedRow::default();
        assert_eq!(row_default.hash(hash_builder).value(), 0);
    }
}