risingwave_common/row/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::fmt::Display;
17use std::hash::{BuildHasher, Hasher};
18use std::ops::RangeBounds;
19
20use bytes::{BufMut, Bytes, BytesMut};
21use itertools::Itertools;
22
23use self::empty::EMPTY;
24use crate::hash::HashCode;
25use crate::types::{DatumRef, ToDatumRef, ToOwnedDatum, ToText, hash_datum};
26use crate::util::row_serde::OrderedRowSerde;
27use crate::util::value_encoding;
28
29/// The trait for abstracting over a Row-like type.
30pub trait Row: Sized + std::fmt::Debug + PartialEq + Eq {
31    /// Returns the [`DatumRef`] at the given `index`.
32    fn datum_at(&self, index: usize) -> DatumRef<'_>;
33
34    /// Returns the [`DatumRef`] at the given `index` without bounds checking.
35    ///
36    /// # Safety
37    /// Calling this method with an out-of-bounds index is undefined behavior.
38    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_>;
39
40    /// Returns the number of datum in the row.
41    fn len(&self) -> usize;
42
43    /// Returns `true` if the row contains no datum.
44    #[inline]
45    fn is_empty(&self) -> bool {
46        self.len() == 0
47    }
48
49    /// Returns an iterator over the datums in the row, in [`DatumRef`] form.
50    fn iter(&self) -> impl Iterator<Item = DatumRef<'_>>;
51
52    /// Converts the row into an [`OwnedRow`].
53    ///
54    /// Prefer `into_owned_row` if the row is already owned.
55    #[inline]
56    fn to_owned_row(&self) -> OwnedRow {
57        OwnedRow::new(self.iter().map(|d| d.to_owned_datum()).collect())
58    }
59
60    /// Consumes `self` and converts it into an [`OwnedRow`].
61    #[inline]
62    fn into_owned_row(self) -> OwnedRow {
63        self.to_owned_row()
64    }
65
66    /// Serializes the row with value encoding, into the given `buf`.
67    #[inline]
68    fn value_serialize_into(&self, mut buf: impl BufMut) {
69        for datum in self.iter() {
70            value_encoding::serialize_datum_into(datum, &mut buf);
71        }
72    }
73
74    /// Serializes the row with value encoding and returns the bytes.
75    #[inline]
76    fn value_serialize(&self) -> Vec<u8> {
77        let estimate_size = self
78            .iter()
79            .map(value_encoding::estimate_serialize_datum_size)
80            .sum();
81        let mut buf = Vec::with_capacity(estimate_size);
82        self.value_serialize_into(&mut buf);
83        buf
84    }
85
86    /// Serializes the row with value encoding and returns the bytes.
87    #[inline]
88    fn value_serialize_bytes(&self) -> Bytes {
89        let estimate_size = self
90            .iter()
91            .map(value_encoding::estimate_serialize_datum_size)
92            .sum();
93        let mut buf = BytesMut::with_capacity(estimate_size);
94        self.value_serialize_into(&mut buf);
95        buf.freeze()
96    }
97
98    fn value_estimate_size(&self) -> usize {
99        self.iter()
100            .map(value_encoding::estimate_serialize_datum_size)
101            .sum()
102    }
103
104    /// Serializes the row with memcomparable encoding, into the given `buf`. As each datum may have
105    /// different order type, a `serde` should be provided.
106    #[inline]
107    fn memcmp_serialize_into(&self, serde: &OrderedRowSerde, buf: impl BufMut) {
108        serde.serialize(self, buf);
109    }
110
111    /// Serializes the row with memcomparable encoding and return the bytes. As each datum may have
112    /// different order type, a `serde` should be provided.
113    #[inline]
114    fn memcmp_serialize(&self, serde: &OrderedRowSerde) -> Vec<u8> {
115        let mut buf = Vec::with_capacity(self.len()); // each datum is at least 1 byte
116        self.memcmp_serialize_into(serde, &mut buf);
117        buf
118    }
119
120    /// Hash the datums of this row into the given hasher.
121    ///
122    /// Implementors should delegate [`std::hash::Hash::hash`] to this method.
123    fn hash_datums_into<H: Hasher>(&self, state: &mut H) {
124        for datum in self.iter() {
125            hash_datum(datum, state);
126        }
127    }
128
129    /// Returns the hash code of the row.
130    fn hash<H: BuildHasher>(&self, hash_builder: H) -> HashCode<H> {
131        let mut hasher = hash_builder.build_hasher();
132        self.hash_datums_into(&mut hasher);
133        hasher.finish().into()
134    }
135
136    /// Determines whether the datums of this row are equal to those of another.
137    #[inline]
138    fn eq(this: &Self, other: impl Row) -> bool {
139        if this.len() != other.len() {
140            return false;
141        }
142        for i in (0..this.len()).rev() {
143            // compare from the end to the start, as it's more likely to have same prefix
144            // SAFETY: index is in bounds as we are iterating from 0 to len.
145            if unsafe { this.datum_at_unchecked(i) != other.datum_at_unchecked(i) } {
146                return false;
147            }
148        }
149        true
150    }
151}
152
153const fn assert_row<R: Row>(r: R) -> R {
154    r
155}
156
157/// An extension trait for [`Row`]s that provides a variety of convenient adapters.
158pub trait RowExt: Row {
159    /// Adapter for chaining two rows together.
160    fn chain<R: Row>(self, other: R) -> Chain<Self, R>
161    where
162        Self: Sized,
163    {
164        assert_row(Chain::new(self, other))
165    }
166
167    /// Adapter for projecting a row onto a subset of its columns with the given `indices`.
168    ///
169    /// # Panics
170    /// Panics if `indices` contains an out-of-bounds index.
171    fn project(self, indices: &[usize]) -> Project<'_, Self>
172    where
173        Self: Sized,
174    {
175        assert_row(Project::new(self, indices))
176    }
177
178    /// Adapter for slicing a row with the given `range`.
179    ///
180    /// # Panics
181    /// Panics if range is out of bounds.
182    fn slice(self, range: impl RangeBounds<usize>) -> Slice<Self>
183    where
184        Self: Sized,
185    {
186        assert_row(Slice::new(self, range))
187    }
188
189    fn display(&self) -> impl Display + '_ {
190        struct D<'a, T: Row>(&'a T);
191        impl<T: Row> Display for D<'_, T> {
192            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193                write!(
194                    f,
195                    "{}",
196                    self.0.iter().format_with(" | ", |datum, f| {
197                        match datum {
198                            None => f(&"NULL"),
199                            Some(scalar) => f(&format_args!("{}", scalar.to_text())),
200                        }
201                    })
202                )
203            }
204        }
205        D(self)
206    }
207
208    fn is_null_at(&self, index: usize) -> bool {
209        self.datum_at(index).is_none()
210    }
211}
212
213impl<R: Row> RowExt for R {}
214
/// Forward the implementation of [`Row`] to the deref target.
///
/// Expands to the bodies of the `Row` methods listed below, each of which dereferences `self`
/// twice (`&Self` -> `Self` -> deref target) and calls the same method on the target.
macro_rules! deref_forward_row {
    () => {
        // --- element access ---

        fn datum_at(&self, index: usize) -> $crate::types::DatumRef<'_> {
            Row::datum_at(&**self, index)
        }

        unsafe fn datum_at_unchecked(&self, index: usize) -> $crate::types::DatumRef<'_> {
            // SAFETY: the caller's contract is passed on unchanged to the deref target.
            unsafe { Row::datum_at_unchecked(&**self, index) }
        }

        fn len(&self) -> usize {
            Row::len(&**self)
        }

        fn is_empty(&self) -> bool {
            Row::is_empty(&**self)
        }

        fn iter(&self) -> impl Iterator<Item = $crate::types::DatumRef<'_>> {
            Row::iter(&**self)
        }

        // --- conversion and serialization ---

        fn to_owned_row(&self) -> OwnedRow {
            Row::to_owned_row(&**self)
        }

        fn value_serialize_into(&self, buf: impl bytes::BufMut) {
            Row::value_serialize_into(&**self, buf)
        }

        fn value_serialize(&self) -> Vec<u8> {
            Row::value_serialize(&**self)
        }

        fn memcmp_serialize_into(
            &self,
            serde: &$crate::util::row_serde::OrderedRowSerde,
            buf: impl bytes::BufMut,
        ) {
            Row::memcmp_serialize_into(&**self, serde, buf)
        }

        fn memcmp_serialize(&self, serde: &$crate::util::row_serde::OrderedRowSerde) -> Vec<u8> {
            Row::memcmp_serialize(&**self, serde)
        }

        // --- hashing and equality ---

        fn hash<H: std::hash::BuildHasher>(&self, hash_builder: H) -> $crate::hash::HashCode<H> {
            Row::hash(&**self, hash_builder)
        }

        fn hash_datums_into<H: std::hash::Hasher>(&self, state: &mut H) {
            Row::hash_datums_into(&**self, state)
        }

        fn eq(this: &Self, other: impl Row) -> bool {
            Row::eq(&**this, other)
        }
    };
}
275
276impl<R: Row> Row for &R {
277    deref_forward_row!();
278}
279
280impl<R: Row + Clone> Row for Cow<'_, R> {
281    deref_forward_row!();
282
283    // Manually implemented in case `R` has a more efficient implementation.
284    fn into_owned_row(self) -> OwnedRow {
285        self.into_owned().into_owned_row()
286    }
287}
288
289impl<R: Row> Row for Box<R> {
290    deref_forward_row!();
291
292    // Manually implemented in case the `Cow` is `Owned` and `R` has a more efficient
293    // implementation.
294    fn into_owned_row(self) -> OwnedRow {
295        (*self).into_owned_row()
296    }
297}
298
/// Implements [`Row`] for a slice of datums.
///
/// Expands to the four required [`Row`] methods for a type that can be indexed and viewed as a
/// slice of [`ToDatumRef`] items.
macro_rules! impl_slice_row {
    () => {
        #[inline]
        fn datum_at(&self, index: usize) -> DatumRef<'_> {
            // Regular indexing: panics on an out-of-bounds `index`.
            self[index].to_datum_ref()
        }

        #[inline]
        unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
            // SAFETY: the caller guarantees that `index` is in bounds.
            let datum = unsafe { self.get_unchecked(index) };
            datum.to_datum_ref()
        }

        #[inline]
        fn len(&self) -> usize {
            self.as_ref().len()
        }

        #[inline]
        fn iter(&self) -> impl Iterator<Item = DatumRef<'_>> {
            self.as_ref().iter().map(|datum| datum.to_datum_ref())
        }
    };
}
323
324impl<D: ToDatumRef> Row for &[D] {
325    impl_slice_row!();
326}
327
328impl<D: ToDatumRef, const N: usize> Row for [D; N] {
329    impl_slice_row!();
330}
331
332impl<D: ToDatumRef + Default, const N: usize> Row for ArrayVec<[D; N]> {
333    impl_slice_row!();
334}
335
336/// Implements [`Row`] for an optional row.
337impl<R: Row> Row for Option<R> {
338    fn datum_at(&self, index: usize) -> DatumRef<'_> {
339        match self {
340            Some(row) => row.datum_at(index),
341            None => EMPTY.datum_at(index),
342        }
343    }
344
345    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
346        unsafe {
347            match self {
348                Some(row) => row.datum_at_unchecked(index),
349                None => EMPTY.datum_at_unchecked(index),
350            }
351        }
352    }
353
354    fn len(&self) -> usize {
355        match self {
356            Some(row) => row.len(),
357            None => 0,
358        }
359    }
360
361    fn iter(&self) -> impl Iterator<Item = DatumRef<'_>> {
362        match self {
363            Some(row) => either::Either::Left(row.iter()),
364            None => either::Either::Right(EMPTY.iter()),
365        }
366    }
367
368    fn to_owned_row(&self) -> OwnedRow {
369        match self {
370            Some(row) => row.to_owned_row(),
371            None => OwnedRow::new(Vec::new()),
372        }
373    }
374
375    fn into_owned_row(self) -> OwnedRow {
376        match self {
377            Some(row) => row.into_owned_row(),
378            None => OwnedRow::new(Vec::new()),
379        }
380    }
381
382    fn value_serialize_into(&self, buf: impl BufMut) {
383        if let Some(row) = self {
384            row.value_serialize_into(buf);
385        }
386    }
387
388    fn memcmp_serialize_into(&self, serde: &OrderedRowSerde, buf: impl BufMut) {
389        if let Some(row) = self {
390            row.memcmp_serialize_into(serde, buf);
391        }
392    }
393}
394
395/// Implements [`Row`] for an [`either::Either`] of two different types of rows.
396impl<R1: Row, R2: Row> Row for either::Either<R1, R2> {
397    fn datum_at(&self, index: usize) -> DatumRef<'_> {
398        either::for_both!(self, row => row.datum_at(index))
399    }
400
401    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
402        unsafe { either::for_both!(self, row => row.datum_at_unchecked(index)) }
403    }
404
405    fn len(&self) -> usize {
406        either::for_both!(self, row => row.len())
407    }
408
409    fn is_empty(&self) -> bool {
410        either::for_both!(self, row => row.is_empty())
411    }
412
413    fn iter(&self) -> impl Iterator<Item = DatumRef<'_>> {
414        self.as_ref().map_either(Row::iter, Row::iter)
415    }
416
417    fn to_owned_row(&self) -> OwnedRow {
418        either::for_both!(self, row => row.to_owned_row())
419    }
420
421    fn into_owned_row(self) -> OwnedRow {
422        either::for_both!(self, row => row.into_owned_row())
423    }
424
425    fn value_serialize_into(&self, buf: impl BufMut) {
426        either::for_both!(self, row => row.value_serialize_into(buf))
427    }
428
429    fn value_serialize(&self) -> Vec<u8> {
430        either::for_both!(self, row => row.value_serialize())
431    }
432
433    fn value_serialize_bytes(&self) -> Bytes {
434        either::for_both!(self, row => row.value_serialize_bytes())
435    }
436
437    fn memcmp_serialize_into(&self, serde: &OrderedRowSerde, buf: impl BufMut) {
438        either::for_both!(self, row => row.memcmp_serialize_into(serde, buf))
439    }
440
441    fn memcmp_serialize(&self, serde: &OrderedRowSerde) -> Vec<u8> {
442        either::for_both!(self, row => row.memcmp_serialize(serde))
443    }
444
445    fn hash_datums_into<H: Hasher>(&self, state: &mut H) {
446        either::for_both!(self, row => row.hash_datums_into(state))
447    }
448
449    fn hash<H: BuildHasher>(&self, hash_builder: H) -> HashCode<H> {
450        either::for_both!(self, row => row.hash(hash_builder))
451    }
452
453    fn eq(this: &Self, other: impl Row) -> bool {
454        either::for_both!(this, row => Row::eq(row, other))
455    }
456}
457
458mod chain;
459mod compacted_row;
460mod empty;
461mod once;
462mod ordered;
463mod owned_row;
464mod project;
465mod repeat_n;
466mod slice;
467pub use ::tinyvec::ArrayVec;
468pub use chain::Chain;
469pub use compacted_row::CompactedRow;
470pub use empty::{Empty, empty};
471pub use once::{Once, once};
472pub use owned_row::{OwnedRow, RowDeserializer};
473pub use project::Project;
474pub use repeat_n::{RepeatN, repeat_n};
475pub use slice::Slice;