// risingwave_storage/table/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod batch_table;
16pub mod merge_sort;
17
18use std::ops::Deref;
19
20use futures::{Stream, StreamExt};
21use risingwave_common::array::DataChunk;
22use risingwave_common::catalog::Schema;
23use risingwave_common::hash::VirtualNode;
24pub use risingwave_common::hash::table_distribution::*;
25use risingwave_common::row::{OwnedRow, Row};
26use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_hummock_sdk::key::TableKey;
29
30use crate::StateStoreIter;
31use crate::error::StorageResult;
32use crate::row_serde::value_serde::ValueRowSerde;
33use crate::store::{ChangeLogValue, StateStoreIterExt, StateStoreReadLogItem};
34
35pub trait TableIter: Send {
36    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>>;
37}
38
39pub async fn collect_data_chunk<E, S, R>(
40    stream: &mut S,
41    schema: &Schema,
42    chunk_size: Option<usize>,
43) -> Result<Option<DataChunk>, E>
44where
45    S: Stream<Item = Result<R, E>> + Unpin,
46    R: Row,
47{
48    let mut builders = schema.create_array_builders(chunk_size.unwrap_or(0));
49    let mut row_count = 0;
50    for _ in 0..chunk_size.unwrap_or(usize::MAX) {
51        match stream.next().await.transpose()? {
52            Some(row) => {
53                for (datum, builder) in row.iter().zip_eq_debug(builders.iter_mut()) {
54                    builder.append(datum);
55                }
56            }
57            None => break,
58        }
59
60        row_count += 1;
61    }
62
63    let chunk = {
64        let columns: Vec<_> = builders
65            .into_iter()
66            .map(|builder| builder.finish().into())
67            .collect();
68        DataChunk::new(columns, row_count)
69    };
70
71    if chunk.cardinality() == 0 {
72        Ok(None)
73    } else {
74        Ok(Some(chunk))
75    }
76}
77
78/// Collects data chunks from stream of rows.
79pub async fn collect_data_chunk_with_builder<E, S, R>(
80    stream: &mut S,
81    builder: &mut DataChunkBuilder,
82) -> Result<Option<DataChunk>, E>
83where
84    R: Row,
85    S: Stream<Item = Result<R, E>> + Unpin,
86{
87    // TODO(kwannoel): If necessary, we can optimize it in the future.
88    // This can be done by moving the check if builder is full from `append_one_row` to here,
89    while let Some(row) = stream.next().await.transpose()? {
90        let result = builder.append_one_row(row);
91        if let Some(chunk) = result {
92            return Ok(Some(chunk));
93        }
94    }
95
96    let chunk = builder.consume_all();
97    Ok(chunk)
98}
99
/// Keeps only the second element of a successful pair, passing any error through untouched.
pub fn get_second<T, U, E>(arg: Result<(T, U), E>) -> Result<U, E> {
    arg.map(|(_first, second)| second)
}
103
104#[derive(Debug)]
105pub struct KeyedRow<T: AsRef<[u8]>, R = OwnedRow> {
106    vnode_prefixed_key: TableKey<T>,
107    row: R,
108}
109
110impl<T: AsRef<[u8]>, R> KeyedRow<T, R> {
111    pub fn new(table_key: TableKey<T>, row: R) -> Self {
112        Self {
113            vnode_prefixed_key: table_key,
114            row,
115        }
116    }
117
118    pub fn into_owned_row(self) -> R {
119        self.row
120    }
121
122    pub fn into_owned_row_key(self) -> (TableKey<T>, R) {
123        (self.vnode_prefixed_key, self.row)
124    }
125
126    pub fn vnode(&self) -> VirtualNode {
127        self.vnode_prefixed_key.vnode_part()
128    }
129
130    pub fn key(&self) -> &[u8] {
131        self.vnode_prefixed_key.key_part()
132    }
133
134    pub fn row(&self) -> &R {
135        &self.row
136    }
137
138    pub fn into_parts(self) -> (TableKey<T>, R) {
139        (self.vnode_prefixed_key, self.row)
140    }
141}
142
143impl<T: AsRef<[u8]>> Deref for KeyedRow<T> {
144    type Target = OwnedRow;
145
146    fn deref(&self) -> &Self::Target {
147        &self.row
148    }
149}
150
151pub type KeyedChangeLogRow<T> = KeyedRow<T, ChangeLogRow>;
152
153pub type ChangeLogRow = ChangeLogValue<OwnedRow>;
154
155pub fn deserialize_log_stream<'a>(
156    iter: impl StateStoreIter<StateStoreReadLogItem> + 'a,
157    deserializer: &'a impl ValueRowSerde,
158) -> impl Stream<Item = StorageResult<ChangeLogRow>> + 'a {
159    iter.into_stream(|(_key, log_value)| {
160        log_value.try_map(|slice| Ok(OwnedRow::new(deserializer.deserialize(slice)?)))
161    })
162}