// risingwave_java_binding/hummock_iterator.rs
use std::sync::Arc;
16
17use anyhow::anyhow;
18use bytes::Bytes;
19use foyer::{CacheBuilder, HybridCacheBuilder};
20use futures::{TryFutureExt, TryStreamExt};
21use risingwave_common::catalog::ColumnDesc;
22use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
23use risingwave_common::hash::VirtualNode;
24use risingwave_common::row::OwnedRow;
25use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
26use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
27use risingwave_hummock_sdk::key::{TableKeyRange, prefixed_range_with_vnode};
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_jni_core::HummockJavaBindingIterator;
30use risingwave_object_store::object::build_remote_object_store;
31use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
32use risingwave_pb::java_binding::key_range::Bound;
33use risingwave_pb::java_binding::{KeyRange, ReadPlan};
34use risingwave_storage::error::{StorageError, StorageResult};
35use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
36use risingwave_storage::hummock::none::NoneRecentFilter;
37use risingwave_storage::hummock::store::HummockStorageIterator;
38use risingwave_storage::hummock::store::version::HummockVersionReader;
39use risingwave_storage::hummock::{
40 CachePolicy, HummockError, SstableStore, SstableStoreConfig, get_committed_read_version_tuple,
41};
42use risingwave_storage::monitor::{HummockStateStoreMetrics, global_hummock_state_store_metrics};
43use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
44use risingwave_storage::store::{ReadOptions, StateStoreIterExt};
45use rw_futures_util::select_all;
46use tokio::sync::mpsc::unbounded_channel;
47
48type SingleIterStream = HummockJavaBindingIterator;
49
50fn select_all_vnode_stream(streams: Vec<SingleIterStream>) -> HummockJavaBindingIterator {
51 Box::pin(select_all(streams))
52}
53
54fn to_deserialized_stream(
55 iter: HummockStorageIterator,
56 row_serde: EitherSerde,
57) -> SingleIterStream {
58 Box::pin(
59 iter.into_stream(move |(key, value)| {
60 Ok((
61 Bytes::copy_from_slice(key.user_key.table_key.0),
62 row_serde.deserialize(value).map(OwnedRow::new)?,
63 ))
64 })
65 .map_err(|e| anyhow!(e)),
66 )
67}
68
69pub(crate) async fn new_hummock_java_binding_iter(
70 read_plan: ReadPlan,
71) -> StorageResult<HummockJavaBindingIterator> {
72 {
73 let object_store = Arc::new(
75 build_remote_object_store(
76 &read_plan.object_store_url,
77 Arc::new(ObjectStoreMetrics::unused()),
78 "Hummock",
79 Arc::new(ObjectStoreConfig::default()),
80 )
81 .await,
82 );
83
84 let meta_cache = HybridCacheBuilder::new()
85 .memory(1 << 10)
86 .with_shards(2)
87 .storage()
88 .build()
89 .map_err(HummockError::foyer_error)
90 .map_err(StorageError::from)
91 .await?;
92 let block_cache = HybridCacheBuilder::new()
93 .memory(1 << 10)
94 .with_shards(2)
95 .storage()
96 .build()
97 .map_err(HummockError::foyer_error)
98 .map_err(StorageError::from)
99 .await?;
100
101 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
102 store: object_store,
103 path: read_plan.data_dir,
104 prefetch_buffer_capacity: 1 << 10,
105 max_prefetch_block_number: 16,
106 recent_filter: Arc::new(NoneRecentFilter::default().into()),
107 state_store_metrics: Arc::new(global_hummock_state_store_metrics(
108 MetricLevel::Disabled,
109 )),
110 use_new_object_prefix_strategy: read_plan.use_new_object_prefix_strategy,
111 skip_bloom_filter_in_serde: false,
112 meta_cache,
113 block_cache,
114 vector_meta_cache: CacheBuilder::new(1 << 10).build(),
115 vector_block_cache: CacheBuilder::new(1 << 10).build(),
116 }));
117 let reader = HummockVersionReader::new(
118 sstable_store,
119 Arc::new(HummockStateStoreMetrics::unused()),
120 0,
121 );
122
123 let table = read_plan.table_catalog.unwrap();
124 let versioned = table.version.is_some();
125 let table_columns = table
126 .columns
127 .into_iter()
128 .map(|c| ColumnDesc::from(c.column_desc.unwrap()));
129
130 let row_serde: EitherSerde = if versioned {
132 ColumnAwareSerde::new(
133 Arc::from_iter(0..table_columns.len()),
134 Arc::from_iter(table_columns),
135 )
136 .into()
137 } else {
138 BasicSerde::new(
139 Arc::from_iter(0..table_columns.len()),
140 Arc::from_iter(table_columns),
141 )
142 .into()
143 };
144
145 let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
146 let key_range = read_plan.key_range.unwrap();
147 let pin_version = PinnedVersion::new(
148 HummockVersion::from_rpc_protobuf(&read_plan.version.unwrap()),
149 unbounded_channel().0,
150 );
151 let table_id = read_plan.table_id.into();
152
153 for vnode in read_plan.vnode_ids {
154 let vnode = VirtualNode::from_index(vnode as usize);
155 let key_range = table_key_range_from_prost(vnode, key_range.clone());
156 let (key_range, read_version_tuple) = get_committed_read_version_tuple(
157 pin_version.clone(),
158 table_id,
159 key_range,
160 read_plan.epoch,
161 );
162 let iter = reader
163 .iter(
164 key_range,
165 read_plan.epoch,
166 table_id,
167 ReadOptions {
168 cache_policy: CachePolicy::NotFill,
169 ..Default::default()
170 },
171 read_version_tuple,
172 )
173 .await?;
174 streams.push(to_deserialized_stream(iter, row_serde.clone()));
175 }
176
177 let stream = select_all_vnode_stream(streams);
178
179 Ok(stream)
180 }
181}
182
183fn table_key_range_from_prost(vnode: VirtualNode, r: KeyRange) -> TableKeyRange {
184 let map_bound = |b, v| match b {
185 Bound::Unbounded => std::ops::Bound::Unbounded,
186 Bound::Included => std::ops::Bound::Included(v),
187 Bound::Excluded => std::ops::Bound::Excluded(v),
188 _ => unreachable!(),
189 };
190 let left_bound = r.left_bound();
191 let right_bound = r.right_bound();
192 let left = map_bound(left_bound, r.left);
193 let right = map_bound(right_bound, r.right);
194
195 prefixed_range_with_vnode((left, right), vnode)
196}