// risingwave_java_binding/hummock_iterator.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use foyer::{CacheBuilder, HybridCacheBuilder};
use futures::{TryFutureExt, TryStreamExt};
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_hummock_sdk::key::{TableKeyRange, prefixed_range_with_vnode};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_jni_core::HummockJavaBindingIterator;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::java_binding::key_range::Bound;
use risingwave_pb::java_binding::{KeyRange, ReadPlan};
use risingwave_storage::error::{StorageError, StorageResult};
use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
use risingwave_storage::hummock::none::NoneRecentFilter;
use risingwave_storage::hummock::store::HummockStorageIterator;
use risingwave_storage::hummock::store::version::HummockVersionReader;
use risingwave_storage::hummock::{
    CachePolicy, HummockError, SstableStore, SstableStoreConfig, get_committed_read_version_tuple,
};
use risingwave_storage::monitor::{HummockStateStoreMetrics, global_hummock_state_store_metrics};
use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
use risingwave_storage::store::{ReadOptions, StateStoreIterExt};
use rw_futures_util::select_all;
use tokio::sync::mpsc::unbounded_channel;

48type SingleIterStream = HummockJavaBindingIterator;
49
50fn select_all_vnode_stream(streams: Vec<SingleIterStream>) -> HummockJavaBindingIterator {
51    Box::pin(select_all(streams))
52}
53
54fn to_deserialized_stream(
55    iter: HummockStorageIterator,
56    row_serde: EitherSerde,
57) -> SingleIterStream {
58    Box::pin(
59        iter.into_stream(move |(key, value)| {
60            Ok((
61                Bytes::copy_from_slice(key.user_key.table_key.0),
62                row_serde.deserialize(value).map(OwnedRow::new)?,
63            ))
64        })
65        .map_err(|e| anyhow!(e)),
66    )
67}
68
69pub(crate) async fn new_hummock_java_binding_iter(
70    read_plan: ReadPlan,
71) -> StorageResult<HummockJavaBindingIterator> {
72    {
73        // Note(bugen): should we forward the implementation to the `StorageTable`?
74        let object_store = Arc::new(
75            build_remote_object_store(
76                &read_plan.object_store_url,
77                Arc::new(ObjectStoreMetrics::unused()),
78                "Hummock",
79                Arc::new(ObjectStoreConfig::default()),
80            )
81            .await,
82        );
83
84        let meta_cache = HybridCacheBuilder::new()
85            .memory(1 << 10)
86            .with_shards(2)
87            .storage()
88            .build()
89            .map_err(HummockError::foyer_error)
90            .map_err(StorageError::from)
91            .await?;
92        let block_cache = HybridCacheBuilder::new()
93            .memory(1 << 10)
94            .with_shards(2)
95            .storage()
96            .build()
97            .map_err(HummockError::foyer_error)
98            .map_err(StorageError::from)
99            .await?;
100
101        let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
102            store: object_store,
103            path: read_plan.data_dir,
104            prefetch_buffer_capacity: 1 << 10,
105            max_prefetch_block_number: 16,
106            recent_filter: Arc::new(NoneRecentFilter::default().into()),
107            state_store_metrics: Arc::new(global_hummock_state_store_metrics(
108                MetricLevel::Disabled,
109            )),
110            use_new_object_prefix_strategy: read_plan.use_new_object_prefix_strategy,
111            skip_bloom_filter_in_serde: false,
112            meta_cache,
113            block_cache,
114            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
115            vector_block_cache: CacheBuilder::new(1 << 10).build(),
116        }));
117        let reader = HummockVersionReader::new(
118            sstable_store,
119            Arc::new(HummockStateStoreMetrics::unused()),
120            0,
121        );
122
123        let table = read_plan.table_catalog.unwrap();
124        let versioned = table.version.is_some();
125        let table_columns = table
126            .columns
127            .into_iter()
128            .map(|c| ColumnDesc::from(c.column_desc.unwrap()));
129
130        // Decide which serializer to use based on whether the table is versioned or not.
131        let row_serde: EitherSerde = if versioned {
132            ColumnAwareSerde::new(
133                Arc::from_iter(0..table_columns.len()),
134                Arc::from_iter(table_columns),
135            )
136            .into()
137        } else {
138            BasicSerde::new(
139                Arc::from_iter(0..table_columns.len()),
140                Arc::from_iter(table_columns),
141            )
142            .into()
143        };
144
145        let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
146        let key_range = read_plan.key_range.unwrap();
147        let pin_version = PinnedVersion::new(
148            HummockVersion::from_rpc_protobuf(&read_plan.version.unwrap()),
149            unbounded_channel().0,
150        );
151        let table_id = read_plan.table_id.into();
152
153        for vnode in read_plan.vnode_ids {
154            let vnode = VirtualNode::from_index(vnode as usize);
155            let key_range = table_key_range_from_prost(vnode, key_range.clone());
156            let (key_range, read_version_tuple) = get_committed_read_version_tuple(
157                pin_version.clone(),
158                table_id,
159                key_range,
160                read_plan.epoch,
161            );
162            let iter = reader
163                .iter(
164                    key_range,
165                    read_plan.epoch,
166                    table_id,
167                    ReadOptions {
168                        cache_policy: CachePolicy::NotFill,
169                        ..Default::default()
170                    },
171                    read_version_tuple,
172                )
173                .await?;
174            streams.push(to_deserialized_stream(iter, row_serde.clone()));
175        }
176
177        let stream = select_all_vnode_stream(streams);
178
179        Ok(stream)
180    }
181}
182
183fn table_key_range_from_prost(vnode: VirtualNode, r: KeyRange) -> TableKeyRange {
184    let map_bound = |b, v| match b {
185        Bound::Unbounded => std::ops::Bound::Unbounded,
186        Bound::Included => std::ops::Bound::Included(v),
187        Bound::Excluded => std::ops::Bound::Excluded(v),
188        _ => unreachable!(),
189    };
190    let left_bound = r.left_bound();
191    let right_bound = r.right_bound();
192    let left = map_bound(left_bound, r.left);
193    let right = map_bound(right_bound, r.right);
194
195    prefixed_range_with_vnode((left, right), vnode)
196}