risingwave_storage/table/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod batch_table;
16pub mod merge_sort;
17
18use std::ops::Deref;
19
20use futures::{Stream, StreamExt};
21use risingwave_common::array::DataChunk;
22use risingwave_common::catalog::Schema;
23use risingwave_common::hash::VirtualNode;
24pub use risingwave_common::hash::table_distribution::*;
25use risingwave_common::row::{OwnedRow, Row};
26use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_hummock_sdk::key::TableKey;
29
30use crate::StateStoreIter;
31use crate::error::StorageResult;
32use crate::row_serde::value_serde::ValueRowSerde;
33use crate::store::{ChangeLogValue, StateStoreIterExt, StateStoreReadLogItem};
34
/// Async, pull-based row iterator over a table scan.
pub trait TableIter: Send {
    /// Fetches the next row; `Ok(None)` signals the iterator is exhausted.
    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>>;
}
38
/// Decides whether a bloom-filter-style prefix hint should be computed for a scan.
///
/// Returns `false` when hinting is disabled (`prefix_hint_len == 0`). Otherwise:
/// - for a prefix scan (`is_prefix == true`), a hint is usable as long as the
///   hint length does not exceed the pk prefix length;
/// - for an exact-length scan, the hint must cover exactly the pk prefix.
pub fn should_calculate_prefix_hint(
    prefix_hint_len: usize,
    pk_prefix_len: usize,
    is_prefix: bool,
) -> bool {
    match (prefix_hint_len, is_prefix) {
        // A zero-length hint means hinting is disabled entirely.
        (0, _) => false,
        (hint_len, true) => hint_len <= pk_prefix_len,
        (hint_len, false) => hint_len == pk_prefix_len,
    }
}
54
55pub async fn collect_data_chunk<E, S, R>(
56    stream: &mut S,
57    schema: &Schema,
58    chunk_size: Option<usize>,
59) -> Result<Option<DataChunk>, E>
60where
61    S: Stream<Item = Result<R, E>> + Unpin,
62    R: Row,
63{
64    let mut builders = schema.create_array_builders(chunk_size.unwrap_or(0));
65    let mut row_count = 0;
66    for _ in 0..chunk_size.unwrap_or(usize::MAX) {
67        match stream.next().await.transpose()? {
68            Some(row) => {
69                for (datum, builder) in row.iter().zip_eq_debug(builders.iter_mut()) {
70                    builder.append(datum);
71                }
72            }
73            None => break,
74        }
75
76        row_count += 1;
77    }
78
79    let chunk = {
80        let columns: Vec<_> = builders
81            .into_iter()
82            .map(|builder| builder.finish().into())
83            .collect();
84        DataChunk::new(columns, row_count)
85    };
86
87    if chunk.cardinality() == 0 {
88        Ok(None)
89    } else {
90        Ok(Some(chunk))
91    }
92}
93
94/// Collects data chunks from stream of rows.
95pub async fn collect_data_chunk_with_builder<E, S, R>(
96    stream: &mut S,
97    builder: &mut DataChunkBuilder,
98) -> Result<Option<DataChunk>, E>
99where
100    R: Row,
101    S: Stream<Item = Result<R, E>> + Unpin,
102{
103    // TODO(kwannoel): If necessary, we can optimize it in the future.
104    // This can be done by moving the check if builder is full from `append_one_row` to here,
105    while let Some(row) = stream.next().await.transpose()? {
106        let result = builder.append_one_row(row);
107        if let Some(chunk) = result {
108            return Ok(Some(chunk));
109        }
110    }
111
112    let chunk = builder.consume_all();
113    Ok(chunk)
114}
115
/// Projects a successful pair onto its second element, passing errors through.
pub fn get_second<T, U, E>(arg: Result<(T, U), E>) -> Result<U, E> {
    match arg {
        Ok((_first, second)) => Ok(second),
        Err(e) => Err(e),
    }
}
119
/// A payload `R` (an [`OwnedRow`] by default) paired with the storage key it was
/// read from, where the key carries a leading `VirtualNode` prefix.
#[derive(Debug)]
pub struct KeyedRow<T: AsRef<[u8]>, R = OwnedRow> {
    // Full table key; the vnode prefix can be split off via `vnode()` / `key()`.
    vnode_prefixed_key: TableKey<T>,
    // The row payload associated with the key.
    row: R,
}
125
126impl<T: AsRef<[u8]>, R> KeyedRow<T, R> {
127    pub fn new(table_key: TableKey<T>, row: R) -> Self {
128        Self {
129            vnode_prefixed_key: table_key,
130            row,
131        }
132    }
133
134    pub fn into_owned_row(self) -> R {
135        self.row
136    }
137
138    pub fn vnode(&self) -> VirtualNode {
139        self.vnode_prefixed_key.vnode_part()
140    }
141
142    pub fn key(&self) -> &[u8] {
143        self.vnode_prefixed_key.key_part()
144    }
145
146    pub fn row(&self) -> &R {
147        &self.row
148    }
149
150    pub fn into_parts(self) -> (TableKey<T>, R) {
151        (self.vnode_prefixed_key, self.row)
152    }
153}
154
// Lets a `KeyedRow<T>` (with the default `OwnedRow` payload) be used directly
// wherever an `&OwnedRow` is expected.
impl<T: AsRef<[u8]>> Deref for KeyedRow<T> {
    type Target = OwnedRow;

    fn deref(&self) -> &Self::Target {
        &self.row
    }
}
162
/// A change-log row paired with the storage key it was read from.
pub type KeyedChangeLogRow<T> = KeyedRow<T, ChangeLogRow>;

/// A change-log entry whose payloads are deserialized into [`OwnedRow`]s.
pub type ChangeLogRow = ChangeLogValue<OwnedRow>;
166
/// Adapts a state-store change-log iterator into a stream of [`ChangeLogRow`]s,
/// decoding each serialized row payload with `deserializer`.
///
/// The key of each log item is discarded; only the change-log value survives.
pub fn deserialize_log_stream<'a>(
    iter: impl StateStoreIter<StateStoreReadLogItem> + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl Stream<Item = StorageResult<ChangeLogRow>> + 'a {
    iter.into_stream(|(_key, log_value)| {
        // `try_map` runs the fallible decode over each row payload held by the
        // change-log value — presumably its insert/update/delete variants; see
        // `ChangeLogValue` for the exact variant set.
        log_value.try_map(|slice| Ok(OwnedRow::new(deserializer.deserialize(slice)?)))
    })
}