risingwave_common/row/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::fmt::Display;
17use std::hash::{BuildHasher, Hasher};
18use std::ops::RangeBounds;
19
20use bytes::{BufMut, Bytes, BytesMut};
21use itertools::Itertools;
22
23use self::empty::EMPTY;
24use crate::hash::HashCode;
25use crate::types::{DatumRef, ToDatumRef, ToOwnedDatum, ToText, hash_datum};
26use crate::util::row_serde::OrderedRowSerde;
27use crate::util::value_encoding;
28
29/// The trait for abstracting over a Row-like type.
30pub trait Row: Sized + std::fmt::Debug + PartialEq + Eq + Send + Sync {
31    /// Returns the [`DatumRef`] at the given `index`.
32    fn datum_at(&self, index: usize) -> DatumRef<'_>;
33
34    /// Returns the [`DatumRef`] at the given `index` without bounds checking.
35    ///
36    /// # Safety
37    /// Calling this method with an out-of-bounds index is undefined behavior.
38    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_>;
39
40    /// Returns the number of datum in the row.
41    fn len(&self) -> usize;
42
43    /// Returns `true` if the row contains no datum.
44    #[inline]
45    fn is_empty(&self) -> bool {
46        self.len() == 0
47    }
48
49    /// Returns an iterator over the datums in the row, in [`DatumRef`] form.
50    fn iter(&self) -> impl Iterator<Item = DatumRef<'_>>;
51
52    /// Converts the row into an [`OwnedRow`].
53    ///
54    /// Prefer `into_owned_row` if the row is already owned.
55    #[inline]
56    fn to_owned_row(&self) -> OwnedRow {
57        OwnedRow::new(self.iter().map(|d| d.to_owned_datum()).collect())
58    }
59
60    /// Consumes `self` and converts it into an [`OwnedRow`].
61    #[inline]
62    fn into_owned_row(self) -> OwnedRow {
63        self.to_owned_row()
64    }
65
66    /// Serializes the row with value encoding, into the given `buf`.
67    #[inline]
68    fn value_serialize_into(&self, mut buf: impl BufMut) {
69        for datum in self.iter() {
70            value_encoding::serialize_datum_into(datum, &mut buf);
71        }
72    }
73
74    /// Serializes the row with value encoding and returns the bytes.
75    #[inline]
76    fn value_serialize(&self) -> Vec<u8> {
77        let estimate_size = self
78            .iter()
79            .map(value_encoding::estimate_serialize_datum_size)
80            .sum();
81        let mut buf = Vec::with_capacity(estimate_size);
82        self.value_serialize_into(&mut buf);
83        buf
84    }
85
86    /// Serializes the row with value encoding and returns the bytes.
87    #[inline]
88    fn value_serialize_bytes(&self) -> Bytes {
89        let estimate_size = self
90            .iter()
91            .map(value_encoding::estimate_serialize_datum_size)
92            .sum();
93        let mut buf = BytesMut::with_capacity(estimate_size);
94        self.value_serialize_into(&mut buf);
95        buf.freeze()
96    }
97
98    fn value_estimate_size(&self) -> usize {
99        self.iter()
100            .map(value_encoding::estimate_serialize_datum_size)
101            .sum()
102    }
103
104    /// Serializes the row with memcomparable encoding, into the given `buf`. As each datum may have
105    /// different order type, a `serde` should be provided.
106    #[inline]
107    fn memcmp_serialize_into(&self, serde: &OrderedRowSerde, buf: impl BufMut) {
108        serde.serialize(self, buf);
109    }
110
111    /// Serializes the row with memcomparable encoding and return the bytes. As each datum may have
112    /// different order type, a `serde` should be provided.
113    #[inline]
114    fn memcmp_serialize(&self, serde: &OrderedRowSerde) -> Vec<u8> {
115        let mut buf = Vec::with_capacity(self.len()); // each datum is at least 1 byte
116        self.memcmp_serialize_into(serde, &mut buf);
117        buf
118    }
119
120    /// Hash the datums of this row into the given hasher.
121    ///
122    /// Implementors should delegate [`std::hash::Hash::hash`] to this method.
123    fn hash_datums_into<H: Hasher>(&self, state: &mut H) {
124        for datum in self.iter() {
125            hash_datum(datum, state);
126        }
127    }
128
129    /// Returns the hash code of the row.
130    fn hash<H: BuildHasher>(&self, hash_builder: H) -> HashCode<H> {
131        let mut hasher = hash_builder.build_hasher();
132        self.hash_datums_into(&mut hasher);
133        hasher.finish().into()
134    }
135
136    /// Determines whether the datums of this row are equal to those of another.
137    #[inline]
138    fn eq(this: &Self, other: impl Row) -> bool {
139        if this.len() != other.len() {
140            return false;
141        }
142        for i in (0..this.len()).rev() {
143            // compare from the end to the start, as it's more likely to have same prefix
144            // SAFETY: index is in bounds as we are iterating from 0 to len.
145            if unsafe { this.datum_at_unchecked(i) != other.datum_at_unchecked(i) } {
146                return false;
147            }
148        }
149        true
150    }
151}
152
153const fn assert_row<R: Row>(r: R) -> R {
154    r
155}
156
157/// An extension trait for [`Row`]s that provides a variety of convenient adapters.
158pub trait RowExt: Row {
159    /// Adapter for chaining two rows together.
160    fn chain<R: Row>(self, other: R) -> Chain<Self, R>
161    where
162        Self: Sized,
163    {
164        assert_row(Chain::new(self, other))
165    }
166
167    /// Adapter for projecting a row onto a subset of its columns with the given `indices`.
168    ///
169    /// # Panics
170    /// Panics if `indices` contains an out-of-bounds index.
171    fn project(self, indices: &[usize]) -> Project<'_, Self>
172    where
173        Self: Sized,
174    {
175        assert_row(Project::new(self, indices))
176    }
177
178    /// Adapter for slicing a row with the given `range`.
179    ///
180    /// # Panics
181    /// Panics if range is out of bounds.
182    fn slice(self, range: impl RangeBounds<usize>) -> Slice<Self>
183    where
184        Self: Sized,
185    {
186        assert_row(Slice::new(self, range))
187    }
188
189    /// Returns a displayable wrapper for the row.
190    fn display(&self) -> impl Display + '_ {
191        struct D<'a, T: Row>(&'a T);
192        impl<T: Row> Display for D<'_, T> {
193            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194                write!(
195                    f,
196                    "[{}]",
197                    self.0.iter().format_with(", ", |datum, f| {
198                        match datum {
199                            None => f(&"NULL"),
200                            Some(scalar) => f(&scalar.text_display()),
201                        }
202                    })
203                )
204            }
205        }
206        D(self)
207    }
208
209    fn is_null_at(&self, index: usize) -> bool {
210        self.datum_at(index).is_none()
211    }
212}
213
214impl<R: Row> RowExt for R {}
215
/// Forward the implementation of [`Row`] to the deref target.
///
/// Expands, inside an `impl Row for ...` block, to every [`Row`] method except
/// `into_owned_row`, each one delegating to `(**self)`. Forwarding the provided methods as well
/// (instead of relying on the trait defaults) makes sure that any specialized implementation on
/// the deref target is the one that actually runs.
///
/// `Row` and `OwnedRow` are referenced unqualified, so both must be in scope where the macro is
/// invoked.
macro_rules! deref_forward_row {
    () => {
        fn datum_at(&self, index: usize) -> $crate::types::DatumRef<'_> {
            (**self).datum_at(index)
        }

        unsafe fn datum_at_unchecked(&self, index: usize) -> $crate::types::DatumRef<'_> {
            // SAFETY: the caller upholds the in-bounds contract, which is passed on unchanged
            // to the deref target.
            unsafe { (**self).datum_at_unchecked(index) }
        }

        fn len(&self) -> usize {
            (**self).len()
        }

        fn is_empty(&self) -> bool {
            (**self).is_empty()
        }

        fn iter(&self) -> impl Iterator<Item = $crate::types::DatumRef<'_>> {
            (**self).iter()
        }

        fn to_owned_row(&self) -> OwnedRow {
            (**self).to_owned_row()
        }

        fn value_serialize_into(&self, buf: impl bytes::BufMut) {
            (**self).value_serialize_into(buf)
        }

        fn value_serialize(&self) -> Vec<u8> {
            (**self).value_serialize()
        }

        // Forwarded like `value_serialize`, so a specialized implementation on the deref target
        // is not bypassed by the trait default.
        fn value_serialize_bytes(&self) -> bytes::Bytes {
            (**self).value_serialize_bytes()
        }

        // Forwarded for the same reason as `value_serialize_bytes`.
        fn value_estimate_size(&self) -> usize {
            (**self).value_estimate_size()
        }

        fn memcmp_serialize_into(
            &self,
            serde: &$crate::util::row_serde::OrderedRowSerde,
            buf: impl bytes::BufMut,
        ) {
            (**self).memcmp_serialize_into(serde, buf)
        }

        fn memcmp_serialize(&self, serde: &$crate::util::row_serde::OrderedRowSerde) -> Vec<u8> {
            (**self).memcmp_serialize(serde)
        }

        fn hash<H: std::hash::BuildHasher>(&self, hash_builder: H) -> $crate::hash::HashCode<H> {
            (**self).hash(hash_builder)
        }

        fn hash_datums_into<H: std::hash::Hasher>(&self, state: &mut H) {
            (**self).hash_datums_into(state)
        }

        fn eq(this: &Self, other: impl Row) -> bool {
            Row::eq(&(**this), other)
        }
    };
}
276
277impl<R: Row> Row for &R {
278    deref_forward_row!();
279}
280
281impl<R: Row + Clone> Row for Cow<'_, R> {
282    deref_forward_row!();
283
284    // Manually implemented in case `R` has a more efficient implementation.
285    fn into_owned_row(self) -> OwnedRow {
286        self.into_owned().into_owned_row()
287    }
288}
289
290impl<R: Row> Row for Box<R> {
291    deref_forward_row!();
292
293    // Manually implemented in case the `Cow` is `Owned` and `R` has a more efficient
294    // implementation.
295    fn into_owned_row(self) -> OwnedRow {
296        (*self).into_owned_row()
297    }
298}
299
/// Implements [`Row`] for a slice of datums.
///
/// Expands, inside an `impl Row for ...` block, to the four required [`Row`] methods
/// (`datum_at`, `datum_at_unchecked`, `len`, `iter`). The implementing type must support
/// indexing, `get_unchecked`, and `as_ref()` to something slice-like whose elements implement
/// `ToDatumRef`; `DatumRef` and `ToDatumRef` must be in scope at the invocation site.
macro_rules! impl_slice_row {
    () => {
        #[inline]
        fn datum_at(&self, index: usize) -> DatumRef<'_> {
            // Checked indexing: an out-of-bounds `index` is rejected by the index operator.
            self[index].to_datum_ref()
        }

        #[inline]
        unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
            // SAFETY: the caller guarantees that `index` is in bounds, as required by the
            // contract of `Row::datum_at_unchecked`.
            unsafe { self.get_unchecked(index).to_datum_ref() }
        }

        #[inline]
        fn len(&self) -> usize {
            // NOTE(review): presumably goes through `as_ref()` so that `len` resolves to the
            // underlying slice's length rather than back to `Row::len` — confirm before
            // simplifying.
            self.as_ref().len()
        }

        #[inline]
        fn iter(&self) -> impl Iterator<Item = DatumRef<'_>> {
            // Borrow each element as a `DatumRef` while iterating.
            self.as_ref().iter().map(ToDatumRef::to_datum_ref)
        }
    };
}
324
325impl<D: ToDatumRef> Row for &[D] {
326    impl_slice_row!();
327}
328
329impl<D: ToDatumRef, const N: usize> Row for [D; N] {
330    impl_slice_row!();
331}
332
333impl<D: ToDatumRef + Default, const N: usize> Row for ArrayVec<[D; N]> {
334    impl_slice_row!();
335}
336
337/// Implements [`Row`] for an optional row.
338impl<R: Row> Row for Option<R> {
339    fn datum_at(&self, index: usize) -> DatumRef<'_> {
340        match self {
341            Some(row) => row.datum_at(index),
342            None => EMPTY.datum_at(index),
343        }
344    }
345
346    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
347        unsafe {
348            match self {
349                Some(row) => row.datum_at_unchecked(index),
350                None => EMPTY.datum_at_unchecked(index),
351            }
352        }
353    }
354
355    fn len(&self) -> usize {
356        match self {
357            Some(row) => row.len(),
358            None => 0,
359        }
360    }
361
362    fn iter(&self) -> impl Iterator<Item = DatumRef<'_>> {
363        match self {
364            Some(row) => either::Either::Left(row.iter()),
365            None => either::Either::Right(EMPTY.iter()),
366        }
367    }
368
369    fn to_owned_row(&self) -> OwnedRow {
370        match self {
371            Some(row) => row.to_owned_row(),
372            None => OwnedRow::new(Vec::new()),
373        }
374    }
375
376    fn into_owned_row(self) -> OwnedRow {
377        match self {
378            Some(row) => row.into_owned_row(),
379            None => OwnedRow::new(Vec::new()),
380        }
381    }
382
383    fn value_serialize_into(&self, buf: impl BufMut) {
384        if let Some(row) = self {
385            row.value_serialize_into(buf);
386        }
387    }
388
389    fn memcmp_serialize_into(&self, serde: &OrderedRowSerde, buf: impl BufMut) {
390        if let Some(row) = self {
391            row.memcmp_serialize_into(serde, buf);
392        }
393    }
394}
395
396/// Implements [`Row`] for an [`either::Either`] of two different types of rows.
397impl<R1: Row, R2: Row> Row for either::Either<R1, R2> {
398    fn datum_at(&self, index: usize) -> DatumRef<'_> {
399        either::for_both!(self, row => row.datum_at(index))
400    }
401
402    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
403        unsafe { either::for_both!(self, row => row.datum_at_unchecked(index)) }
404    }
405
406    fn len(&self) -> usize {
407        either::for_both!(self, row => row.len())
408    }
409
410    fn is_empty(&self) -> bool {
411        either::for_both!(self, row => row.is_empty())
412    }
413
414    fn iter(&self) -> impl Iterator<Item = DatumRef<'_>> {
415        self.as_ref().map_either(Row::iter, Row::iter)
416    }
417
418    fn to_owned_row(&self) -> OwnedRow {
419        either::for_both!(self, row => row.to_owned_row())
420    }
421
422    fn into_owned_row(self) -> OwnedRow {
423        either::for_both!(self, row => row.into_owned_row())
424    }
425
426    fn value_serialize_into(&self, buf: impl BufMut) {
427        either::for_both!(self, row => row.value_serialize_into(buf))
428    }
429
430    fn value_serialize(&self) -> Vec<u8> {
431        either::for_both!(self, row => row.value_serialize())
432    }
433
434    fn value_serialize_bytes(&self) -> Bytes {
435        either::for_both!(self, row => row.value_serialize_bytes())
436    }
437
438    fn memcmp_serialize_into(&self, serde: &OrderedRowSerde, buf: impl BufMut) {
439        either::for_both!(self, row => row.memcmp_serialize_into(serde, buf))
440    }
441
442    fn memcmp_serialize(&self, serde: &OrderedRowSerde) -> Vec<u8> {
443        either::for_both!(self, row => row.memcmp_serialize(serde))
444    }
445
446    fn hash_datums_into<H: Hasher>(&self, state: &mut H) {
447        either::for_both!(self, row => row.hash_datums_into(state))
448    }
449
450    fn hash<H: BuildHasher>(&self, hash_builder: H) -> HashCode<H> {
451        either::for_both!(self, row => row.hash(hash_builder))
452    }
453
454    fn eq(this: &Self, other: impl Row) -> bool {
455        either::for_both!(this, row => Row::eq(row, other))
456    }
457}
458
459mod chain;
460mod compacted_row;
461mod empty;
462mod once;
463mod ordered;
464mod owned_row;
465mod project;
466mod repeat_n;
467mod slice;
468pub use ::tinyvec::ArrayVec;
469pub use chain::Chain;
470pub use compacted_row::CompactedRow;
471pub use empty::{Empty, empty};
472pub use once::{Once, once};
473pub use owned_row::{OwnedRow, RowDeserializer};
474pub use project::Project;
475pub use repeat_n::{RepeatN, repeat_n};
476pub use slice::Slice;