risingwave_java_binding/
hummock_iterator.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use bytes::Bytes;
19use foyer::{Engine, HybridCacheBuilder};
20use futures::{TryFutureExt, TryStreamExt};
21use risingwave_common::catalog::ColumnDesc;
22use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
23use risingwave_common::hash::VirtualNode;
24use risingwave_common::row::OwnedRow;
25use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
26use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
27use risingwave_hummock_sdk::key::{TableKeyRange, prefixed_range_with_vnode};
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_jni_core::HummockJavaBindingIterator;
30use risingwave_object_store::object::build_remote_object_store;
31use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
32use risingwave_pb::java_binding::key_range::Bound;
33use risingwave_pb::java_binding::{KeyRange, ReadPlan};
34use risingwave_storage::error::{StorageError, StorageResult};
35use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
36use risingwave_storage::hummock::store::HummockStorageIterator;
37use risingwave_storage::hummock::store::version::HummockVersionReader;
38use risingwave_storage::hummock::{
39 CachePolicy, HummockError, SstableStore, SstableStoreConfig, get_committed_read_version_tuple,
40};
41use risingwave_storage::monitor::{HummockStateStoreMetrics, global_hummock_state_store_metrics};
42use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
43use risingwave_storage::store::{ReadOptions, StateStoreIterExt};
44use rw_futures_util::select_all;
45use tokio::sync::mpsc::unbounded_channel;
46
47type SingleIterStream = HummockJavaBindingIterator;
48
49fn select_all_vnode_stream(streams: Vec<SingleIterStream>) -> HummockJavaBindingIterator {
50 Box::pin(select_all(streams))
51}
52
53fn to_deserialized_stream(
54 iter: HummockStorageIterator,
55 row_serde: EitherSerde,
56) -> SingleIterStream {
57 Box::pin(
58 iter.into_stream(move |(key, value)| {
59 Ok((
60 Bytes::copy_from_slice(key.user_key.table_key.0),
61 row_serde.deserialize(value).map(OwnedRow::new)?,
62 ))
63 })
64 .map_err(|e| anyhow!(e)),
65 )
66}
67
68pub(crate) async fn new_hummock_java_binding_iter(
69 read_plan: ReadPlan,
70) -> StorageResult<HummockJavaBindingIterator> {
71 {
72 let object_store = Arc::new(
74 build_remote_object_store(
75 &read_plan.object_store_url,
76 Arc::new(ObjectStoreMetrics::unused()),
77 "Hummock",
78 Arc::new(ObjectStoreConfig::default()),
79 )
80 .await,
81 );
82
83 let meta_cache = HybridCacheBuilder::new()
84 .memory(1 << 10)
85 .with_shards(2)
86 .storage(Engine::Large)
87 .build()
88 .map_err(HummockError::foyer_error)
89 .map_err(StorageError::from)
90 .await?;
91 let block_cache = HybridCacheBuilder::new()
92 .memory(1 << 10)
93 .with_shards(2)
94 .storage(Engine::Large)
95 .build()
96 .map_err(HummockError::foyer_error)
97 .map_err(StorageError::from)
98 .await?;
99
100 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
101 store: object_store,
102 path: read_plan.data_dir,
103 prefetch_buffer_capacity: 1 << 10,
104 max_prefetch_block_number: 16,
105 recent_filter: None,
106 state_store_metrics: Arc::new(global_hummock_state_store_metrics(
107 MetricLevel::Disabled,
108 )),
109 use_new_object_prefix_strategy: read_plan.use_new_object_prefix_strategy,
110 meta_cache,
111 block_cache,
112 }));
113 let reader = HummockVersionReader::new(
114 sstable_store,
115 Arc::new(HummockStateStoreMetrics::unused()),
116 0,
117 );
118
119 let table = read_plan.table_catalog.unwrap();
120 let versioned = table.version.is_some();
121 let table_columns = table
122 .columns
123 .into_iter()
124 .map(|c| ColumnDesc::from(c.column_desc.unwrap()));
125
126 let row_serde: EitherSerde = if versioned {
128 ColumnAwareSerde::new(
129 Arc::from_iter(0..table_columns.len()),
130 Arc::from_iter(table_columns),
131 )
132 .into()
133 } else {
134 BasicSerde::new(
135 Arc::from_iter(0..table_columns.len()),
136 Arc::from_iter(table_columns),
137 )
138 .into()
139 };
140
141 let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
142 let key_range = read_plan.key_range.unwrap();
143 let pin_version = PinnedVersion::new(
144 HummockVersion::from_rpc_protobuf(&read_plan.version.unwrap()),
145 unbounded_channel().0,
146 );
147 let table_id = read_plan.table_id.into();
148
149 for vnode in read_plan.vnode_ids {
150 let vnode = VirtualNode::from_index(vnode as usize);
151 let key_range = table_key_range_from_prost(vnode, key_range.clone());
152 let (key_range, read_version_tuple) = get_committed_read_version_tuple(
153 pin_version.clone(),
154 table_id,
155 key_range,
156 read_plan.epoch,
157 );
158 let iter = reader
159 .iter(
160 key_range,
161 read_plan.epoch,
162 ReadOptions {
163 table_id,
164 cache_policy: CachePolicy::NotFill,
165 ..Default::default()
166 },
167 read_version_tuple,
168 )
169 .await?;
170 streams.push(to_deserialized_stream(iter, row_serde.clone()));
171 }
172
173 let stream = select_all_vnode_stream(streams);
174
175 Ok(stream)
176 }
177}
178
179fn table_key_range_from_prost(vnode: VirtualNode, r: KeyRange) -> TableKeyRange {
180 let map_bound = |b, v| match b {
181 Bound::Unbounded => std::ops::Bound::Unbounded,
182 Bound::Included => std::ops::Bound::Included(v),
183 Bound::Excluded => std::ops::Bound::Excluded(v),
184 _ => unreachable!(),
185 };
186 let left_bound = r.left_bound();
187 let right_bound = r.right_bound();
188 let left = map_bound(left_bound, r.left);
189 let right = map_bound(right_bound, r.right);
190
191 prefixed_range_with_vnode((left, right), vnode)
192}