1use std::mem;
16
17use risingwave_common_estimate_size::EstimateSize;
18
19use super::Row;
20use crate::types::{
21 DataType, Date, Datum, DatumRef, Decimal, Interval, ScalarImpl, Time, Timestamp, ToDatumRef,
22};
23use crate::util::iter_util::ZipEqDebug;
24use crate::util::value_encoding;
25use crate::util::value_encoding::deserialize_datum;
26
27#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
29pub struct OwnedRow(Box<[Datum]>);
30
31impl std::ops::Index<usize> for OwnedRow {
33 type Output = Datum;
34
35 fn index(&self, index: usize) -> &Self::Output {
36 &self.0[index]
37 }
38}
39
40impl AsRef<OwnedRow> for OwnedRow {
41 fn as_ref(&self) -> &OwnedRow {
42 self
43 }
44}
45
46impl OwnedRow {
47 pub fn empty() -> Self {
51 Self(Box::new([]))
52 }
53
54 pub fn new(values: Vec<Datum>) -> Self {
55 Self(values.into())
56 }
57
58 pub fn into_inner(self) -> Box<[Datum]> {
60 self.0
61 }
62
63 pub fn as_inner(&self) -> &[Datum] {
64 &self.0
65 }
66
67 pub fn from_pretty_with_tys(tys: &[DataType], s: impl AsRef<str>) -> Self {
69 let datums: Vec<_> = tys
70 .iter()
71 .zip_eq_debug(s.as_ref().split_ascii_whitespace())
72 .map(|(ty, x)| {
73 let scalar: ScalarImpl = match ty {
74 DataType::Int16 => x.parse::<i16>().unwrap().into(),
75 DataType::Int32 => x.parse::<i32>().unwrap().into(),
76 DataType::Int64 => x.parse::<i64>().unwrap().into(),
77 DataType::Float32 => x.parse::<f32>().unwrap().into(),
78 DataType::Float64 => x.parse::<f64>().unwrap().into(),
79 DataType::Varchar => x.to_owned().into(),
80 DataType::Boolean => x.parse::<bool>().unwrap().into(),
81 DataType::Date => x.parse::<Date>().unwrap().into(),
82 DataType::Time => x.parse::<Time>().unwrap().into(),
83 DataType::Timestamp => x.parse::<Timestamp>().unwrap().into(),
84 DataType::Interval => x.parse::<Interval>().unwrap().into(),
85 DataType::Decimal => x.parse::<Decimal>().unwrap().into(),
86 _ => todo!(),
87 };
88 Some(scalar)
89 })
90 .collect();
91 Self::new(datums)
92 }
93
94 pub fn last(&self) -> DatumRef<'_> {
95 self.0[self.len() - 1].to_datum_ref()
96 }
97}
98
99impl EstimateSize for OwnedRow {
100 fn estimated_heap_size(&self) -> usize {
101 let data_heap_size: usize = self.0.iter().map(|datum| datum.estimated_heap_size()).sum();
102 self.0.len() * mem::size_of::<Datum>() + data_heap_size
103 }
104}
105
106impl Row for OwnedRow {
107 #[inline]
108 fn datum_at(&self, index: usize) -> DatumRef<'_> {
109 self[index].to_datum_ref()
110 }
111
112 #[inline]
113 unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
114 unsafe { self.0.get_unchecked(index).to_datum_ref() }
115 }
116
117 #[inline]
118 fn len(&self) -> usize {
119 self.0.len()
120 }
121
122 #[inline]
123 fn iter(&self) -> impl ExactSizeIterator<Item = DatumRef<'_>> {
124 self.0.iter().map(ToDatumRef::to_datum_ref)
125 }
126
127 #[inline]
128 fn to_owned_row(&self) -> OwnedRow {
129 self.clone()
130 }
131
132 #[inline]
133 fn into_owned_row(self) -> OwnedRow {
134 self
135 }
136}
137
138impl IntoIterator for OwnedRow {
139 type IntoIter = std::vec::IntoIter<Datum>;
140 type Item = Datum;
141
142 fn into_iter(self) -> Self::IntoIter {
143 self.0.into_vec().into_iter()
144 }
145}
146
147impl FromIterator<Datum> for OwnedRow {
148 fn from_iter<T: IntoIterator<Item = Datum>>(iter: T) -> Self {
149 Self(iter.into_iter().collect())
150 }
151}
152
153#[derive(Clone, Debug)]
155pub struct RowDeserializer<D: AsRef<[DataType]> = Vec<DataType>> {
156 data_types: D,
157}
158
159impl<D: AsRef<[DataType]>> RowDeserializer<D> {
160 pub fn new(data_types: D) -> Self {
162 RowDeserializer { data_types }
163 }
164
165 pub fn deserialize_to(
166 &self,
167 mut data: impl bytes::Buf,
168 values: &mut Vec<Datum>,
169 ) -> value_encoding::Result<()> {
170 for typ in self.data_types() {
171 values.push(deserialize_datum(&mut data, typ)?);
172 }
173 Ok(())
174 }
175
176 pub fn deserialize(&self, data: impl bytes::Buf) -> value_encoding::Result<OwnedRow> {
178 let mut values = Vec::with_capacity(self.data_types().len());
179 self.deserialize_to(data, &mut values)?;
180 Ok(OwnedRow(values.into()))
181 }
182
183 pub fn data_types(&self) -> &[DataType] {
184 self.data_types.as_ref()
185 }
186}
187
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;
    use crate::row::RowExt;
    use crate::types::DataType as Ty;
    use crate::util::hash_util::Crc32FastBuilder;

    /// Nine non-null datums, one per scalar type exercised by the tests below.
    fn sample_datums() -> Vec<Datum> {
        vec![
            Some(ScalarImpl::Utf8("string".into())),
            Some(ScalarImpl::Bool(true)),
            Some(ScalarImpl::Int16(1)),
            Some(ScalarImpl::Int32(2)),
            Some(ScalarImpl::Int64(3)),
            Some(ScalarImpl::Float32(4.0.into())),
            Some(ScalarImpl::Float64(5.0.into())),
            Some(ScalarImpl::Decimal("-233.3".parse().unwrap())),
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(7, 8, 9))),
        ]
    }

    /// Serializing a row and deserializing the bytes yields the original row.
    #[test]
    fn row_value_encode_decode() {
        let row = OwnedRow::new(sample_datums());

        let value_indices = (0..9).collect_vec();
        let bytes = (&row).project(&value_indices).value_serialize();
        // NOTE(review): presumably the per-column payload sizes in column order plus
        // one tag byte for each of the 9 datums — confirm against `value_encoding`.
        assert_eq!(bytes.len(), 10 + 1 + 2 + 4 + 8 + 4 + 8 + 16 + 16 + 9);

        let de = RowDeserializer::new(vec![
            Ty::Varchar,
            Ty::Boolean,
            Ty::Int16,
            Ty::Int32,
            Ty::Int64,
            Ty::Float32,
            Ty::Float64,
            Ty::Decimal,
            Ty::Interval,
        ]);
        let decoded = de.deserialize(bytes.as_ref()).unwrap();
        assert_eq!(row, decoded);
    }

    /// Rows holding the same datums in a different order hash differently, and the
    /// default (empty) row hashes to 0.
    #[test]
    fn test_hash_row() {
        let hash_builder = Crc32FastBuilder;

        let row1 = OwnedRow::new(sample_datums());

        // Same datums as `row1`, with the trailing interval moved to the front.
        let mut rotated = sample_datums();
        rotated.rotate_right(1);
        let row2 = OwnedRow::new(rotated);

        assert_ne!(row1.hash(hash_builder), row2.hash(hash_builder));

        let row_default = OwnedRow::default();
        assert_eq!(row_default.hash(hash_builder).value(), 0);
    }
}