// risingwave_storage/table/mod.rs
pub mod batch_table;
16pub mod merge_sort;
17
18use std::ops::Deref;
19
20use futures::{Stream, StreamExt};
21use risingwave_common::array::DataChunk;
22use risingwave_common::catalog::Schema;
23use risingwave_common::hash::VirtualNode;
24pub use risingwave_common::hash::table_distribution::*;
25use risingwave_common::row::{OwnedRow, Row};
26use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_hummock_sdk::key::TableKey;
29
30use crate::StateStoreIter;
31use crate::error::StorageResult;
32use crate::row_serde::value_serde::ValueRowSerde;
33use crate::store::{ChangeLogValue, StateStoreIterExt, StateStoreReadLogItem};
34
/// Pull-based row iterator over a table scan.
pub trait TableIter: Send {
    /// Fetches the next row; `Ok(None)` signals exhaustion.
    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>>;
}
38
39pub async fn collect_data_chunk<E, S, R>(
40 stream: &mut S,
41 schema: &Schema,
42 chunk_size: Option<usize>,
43) -> Result<Option<DataChunk>, E>
44where
45 S: Stream<Item = Result<R, E>> + Unpin,
46 R: Row,
47{
48 let mut builders = schema.create_array_builders(chunk_size.unwrap_or(0));
49 let mut row_count = 0;
50 for _ in 0..chunk_size.unwrap_or(usize::MAX) {
51 match stream.next().await.transpose()? {
52 Some(row) => {
53 for (datum, builder) in row.iter().zip_eq_debug(builders.iter_mut()) {
54 builder.append(datum);
55 }
56 }
57 None => break,
58 }
59
60 row_count += 1;
61 }
62
63 let chunk = {
64 let columns: Vec<_> = builders
65 .into_iter()
66 .map(|builder| builder.finish().into())
67 .collect();
68 DataChunk::new(columns, row_count)
69 };
70
71 if chunk.cardinality() == 0 {
72 Ok(None)
73 } else {
74 Ok(Some(chunk))
75 }
76}
77
78pub async fn collect_data_chunk_with_builder<E, S, R>(
80 stream: &mut S,
81 builder: &mut DataChunkBuilder,
82) -> Result<Option<DataChunk>, E>
83where
84 R: Row,
85 S: Stream<Item = Result<R, E>> + Unpin,
86{
87 while let Some(row) = stream.next().await.transpose()? {
90 let result = builder.append_one_row(row);
91 if let Some(chunk) = result {
92 return Ok(Some(chunk));
93 }
94 }
95
96 let chunk = builder.consume_all();
97 Ok(chunk)
98}
99
/// Projects a `Result` of a pair onto its second element, passing errors
/// through unchanged.
pub fn get_second<T, U, E>(arg: Result<(T, U), E>) -> Result<U, E> {
    match arg {
        Ok((_first, second)) => Ok(second),
        Err(e) => Err(e),
    }
}
103
/// A row paired with the vnode-prefixed storage key it was read under.
///
/// `T` is the key's byte container (anything `AsRef<[u8]>`); `R` is the row
/// payload and defaults to [`OwnedRow`].
#[derive(Debug)]
pub struct KeyedRow<T: AsRef<[u8]>, R = OwnedRow> {
    // Full table key, including the vnode prefix.
    vnode_prefixed_key: TableKey<T>,
    // The row payload associated with that key.
    row: R,
}
109
impl<T: AsRef<[u8]>, R> KeyedRow<T, R> {
    /// Pairs a vnode-prefixed table key with its row.
    pub fn new(table_key: TableKey<T>, row: R) -> Self {
        Self {
            vnode_prefixed_key: table_key,
            row,
        }
    }

    /// Consumes `self`, returning only the row and discarding the key.
    pub fn into_owned_row(self) -> R {
        self.row
    }

    /// Consumes `self`, returning both the key and the row.
    // NOTE(review): identical to `into_parts` below — presumably kept for
    // backward compatibility; consider consolidating the two.
    pub fn into_owned_row_key(self) -> (TableKey<T>, R) {
        (self.vnode_prefixed_key, self.row)
    }

    /// The virtual node extracted from the key's vnode part.
    pub fn vnode(&self) -> VirtualNode {
        self.vnode_prefixed_key.vnode_part()
    }

    /// The key bytes without the vnode part.
    pub fn key(&self) -> &[u8] {
        self.vnode_prefixed_key.key_part()
    }

    /// Borrows the row payload.
    pub fn row(&self) -> &R {
        &self.row
    }

    /// Consumes `self`, returning `(key, row)`.
    pub fn into_parts(self) -> (TableKey<T>, R) {
        (self.vnode_prefixed_key, self.row)
    }
}
142
/// Lets a `KeyedRow<T>` (with the default `OwnedRow` payload) be used
/// anywhere an `&OwnedRow` is expected.
impl<T: AsRef<[u8]>> Deref for KeyedRow<T> {
    type Target = OwnedRow;

    fn deref(&self) -> &Self::Target {
        &self.row
    }
}
150
/// A [`KeyedRow`] whose payload is a change-log entry instead of a plain row.
pub type KeyedChangeLogRow<T> = KeyedRow<T, ChangeLogRow>;

/// A change-log value whose row payloads are deserialized [`OwnedRow`]s.
pub type ChangeLogRow = ChangeLogValue<OwnedRow>;
154
/// Adapts a raw state-store change-log iterator into a stream of
/// deserialized [`ChangeLogRow`]s.
///
/// Each value slice inside the log entry is decoded with `deserializer`;
/// the key part of every item is ignored.
pub fn deserialize_log_stream<'a>(
    iter: impl StateStoreIter<StateStoreReadLogItem> + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl Stream<Item = StorageResult<ChangeLogRow>> + 'a {
    iter.into_stream(|(_key, log_value)| {
        log_value.try_map(|slice| Ok(OwnedRow::new(deserializer.deserialize(slice)?)))
    })
}