pub mod batch_table;
pub mod merge_sort;

use std::ops::Deref;

use futures::{Stream, StreamExt};
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::hash::VirtualNode;
pub use risingwave_common::hash::table_distribution::*;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_hummock_sdk::key::TableKey;

use crate::StateStoreIter;
use crate::error::StorageResult;
use crate::row_serde::value_serde::ValueRowSerde;
use crate::store::{ChangeLogValue, StateStoreIterExt, StateStoreReadLogItem};
34
/// Pull-based row iterator over a table scan.
pub trait TableIter: Send {
    /// Fetches the next row, or `None` once the iterator is exhausted.
    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>>;
}
38
/// Decides whether a prefix hint should be computed for a point/range lookup.
///
/// A hint of length zero is never calculated. Otherwise, when the scanned key
/// is a prefix (`is_prefix`), any hint no longer than the pk prefix qualifies;
/// for a full key the hint length must match the pk prefix length exactly.
pub fn should_calculate_prefix_hint(
    prefix_hint_len: usize,
    pk_prefix_len: usize,
    is_prefix: bool,
) -> bool {
    match (prefix_hint_len, is_prefix) {
        (0, _) => false,
        (hint_len, true) => hint_len <= pk_prefix_len,
        (hint_len, false) => hint_len == pk_prefix_len,
    }
}
54
55pub async fn collect_data_chunk<E, S, R>(
56 stream: &mut S,
57 schema: &Schema,
58 chunk_size: Option<usize>,
59) -> Result<Option<DataChunk>, E>
60where
61 S: Stream<Item = Result<R, E>> + Unpin,
62 R: Row,
63{
64 let mut builders = schema.create_array_builders(chunk_size.unwrap_or(0));
65 let mut row_count = 0;
66 for _ in 0..chunk_size.unwrap_or(usize::MAX) {
67 match stream.next().await.transpose()? {
68 Some(row) => {
69 for (datum, builder) in row.iter().zip_eq_debug(builders.iter_mut()) {
70 builder.append(datum);
71 }
72 }
73 None => break,
74 }
75
76 row_count += 1;
77 }
78
79 let chunk = {
80 let columns: Vec<_> = builders
81 .into_iter()
82 .map(|builder| builder.finish().into())
83 .collect();
84 DataChunk::new(columns, row_count)
85 };
86
87 if chunk.cardinality() == 0 {
88 Ok(None)
89 } else {
90 Ok(Some(chunk))
91 }
92}
93
94pub async fn collect_data_chunk_with_builder<E, S, R>(
96 stream: &mut S,
97 builder: &mut DataChunkBuilder,
98) -> Result<Option<DataChunk>, E>
99where
100 R: Row,
101 S: Stream<Item = Result<R, E>> + Unpin,
102{
103 while let Some(row) = stream.next().await.transpose()? {
106 let result = builder.append_one_row(row);
107 if let Some(chunk) = result {
108 return Ok(Some(chunk));
109 }
110 }
111
112 let chunk = builder.consume_all();
113 Ok(chunk)
114}
115
/// Projects the second element out of a fallible pair, passing errors through.
pub fn get_second<T, U, E>(arg: Result<(T, U), E>) -> Result<U, E> {
    match arg {
        Ok((_, second)) => Ok(second),
        Err(e) => Err(e),
    }
}
119
/// A row read from storage, paired with the serialized table key it was
/// stored under (the key includes a leading vnode part — see `vnode()`).
#[derive(Debug)]
pub struct KeyedRow<T: AsRef<[u8]>, R = OwnedRow> {
    // Full storage key, vnode prefix included.
    vnode_prefixed_key: TableKey<T>,
    // Row payload; defaults to a fully deserialized `OwnedRow`.
    row: R,
}
125
impl<T: AsRef<[u8]>, R> KeyedRow<T, R> {
    /// Bundles a table key with its row payload.
    pub fn new(table_key: TableKey<T>, row: R) -> Self {
        Self {
            vnode_prefixed_key: table_key,
            row,
        }
    }

    /// Consumes `self`, keeping only the row and dropping the key.
    pub fn into_owned_row(self) -> R {
        self.row
    }

    /// Virtual node encoded in the key's prefix.
    pub fn vnode(&self) -> VirtualNode {
        self.vnode_prefixed_key.vnode_part()
    }

    /// Key bytes with the vnode prefix stripped.
    pub fn key(&self) -> &[u8] {
        self.vnode_prefixed_key.key_part()
    }

    /// Borrows the row payload.
    pub fn row(&self) -> &R {
        &self.row
    }

    /// Consumes `self`, returning the full (vnode-prefixed) key and the row.
    pub fn into_parts(self) -> (TableKey<T>, R) {
        (self.vnode_prefixed_key, self.row)
    }
}
154
// Only the default `R = OwnedRow` payload derefs to the row, letting a
// `KeyedRow<T>` be used wherever an `&OwnedRow` is expected.
impl<T: AsRef<[u8]>> Deref for KeyedRow<T> {
    type Target = OwnedRow;

    fn deref(&self) -> &Self::Target {
        &self.row
    }
}
162
/// A change-log row together with the storage key it belongs to.
pub type KeyedChangeLogRow<T> = KeyedRow<T, ChangeLogRow>;

/// A [`ChangeLogValue`] whose payload(s) are deserialized into [`OwnedRow`]s.
pub type ChangeLogRow = ChangeLogValue<OwnedRow>;
166
/// Adapts a raw change-log iterator into a stream of [`ChangeLogRow`]s by
/// deserializing every log value with `deserializer`; keys are discarded.
pub fn deserialize_log_stream<'a>(
    iter: impl StateStoreIter<StateStoreReadLogItem> + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl Stream<Item = StorageResult<ChangeLogRow>> + 'a {
    iter.into_stream(|(_key, log_value)| {
        // `try_map` deserializes each value slice the log entry carries,
        // short-circuiting on the first serde error.
        log_value.try_map(|slice| Ok(OwnedRow::new(deserializer.deserialize(slice)?)))
    })
}