risingwave_java_binding/
hummock_iterator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use bytes::Bytes;
19use foyer::{Engine, HybridCacheBuilder};
20use futures::{TryFutureExt, TryStreamExt};
21use risingwave_common::catalog::ColumnDesc;
22use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
23use risingwave_common::hash::VirtualNode;
24use risingwave_common::row::OwnedRow;
25use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
26use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
27use risingwave_hummock_sdk::key::{TableKeyRange, prefixed_range_with_vnode};
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_jni_core::HummockJavaBindingIterator;
30use risingwave_object_store::object::build_remote_object_store;
31use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
32use risingwave_pb::java_binding::key_range::Bound;
33use risingwave_pb::java_binding::{KeyRange, ReadPlan};
34use risingwave_storage::error::{StorageError, StorageResult};
35use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
36use risingwave_storage::hummock::store::HummockStorageIterator;
37use risingwave_storage::hummock::store::version::HummockVersionReader;
38use risingwave_storage::hummock::{
39    CachePolicy, HummockError, SstableStore, SstableStoreConfig, get_committed_read_version_tuple,
40};
41use risingwave_storage::monitor::{HummockStateStoreMetrics, global_hummock_state_store_metrics};
42use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
43use risingwave_storage::store::{ReadOptions, StateStoreIterExt};
44use rw_futures_util::select_all;
45use tokio::sync::mpsc::unbounded_channel;
46
47type SingleIterStream = HummockJavaBindingIterator;
48
49fn select_all_vnode_stream(streams: Vec<SingleIterStream>) -> HummockJavaBindingIterator {
50    Box::pin(select_all(streams))
51}
52
53fn to_deserialized_stream(
54    iter: HummockStorageIterator,
55    row_serde: EitherSerde,
56) -> SingleIterStream {
57    Box::pin(
58        iter.into_stream(move |(key, value)| {
59            Ok((
60                Bytes::copy_from_slice(key.user_key.table_key.0),
61                row_serde.deserialize(value).map(OwnedRow::new)?,
62            ))
63        })
64        .map_err(|e| anyhow!(e)),
65    )
66}
67
68pub(crate) async fn new_hummock_java_binding_iter(
69    read_plan: ReadPlan,
70) -> StorageResult<HummockJavaBindingIterator> {
71    {
72        // Note(bugen): should we forward the implementation to the `StorageTable`?
73        let object_store = Arc::new(
74            build_remote_object_store(
75                &read_plan.object_store_url,
76                Arc::new(ObjectStoreMetrics::unused()),
77                "Hummock",
78                Arc::new(ObjectStoreConfig::default()),
79            )
80            .await,
81        );
82
83        let meta_cache = HybridCacheBuilder::new()
84            .memory(1 << 10)
85            .with_shards(2)
86            .storage(Engine::Large)
87            .build()
88            .map_err(HummockError::foyer_error)
89            .map_err(StorageError::from)
90            .await?;
91        let block_cache = HybridCacheBuilder::new()
92            .memory(1 << 10)
93            .with_shards(2)
94            .storage(Engine::Large)
95            .build()
96            .map_err(HummockError::foyer_error)
97            .map_err(StorageError::from)
98            .await?;
99
100        let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
101            store: object_store,
102            path: read_plan.data_dir,
103            prefetch_buffer_capacity: 1 << 10,
104            max_prefetch_block_number: 16,
105            recent_filter: None,
106            state_store_metrics: Arc::new(global_hummock_state_store_metrics(
107                MetricLevel::Disabled,
108            )),
109            use_new_object_prefix_strategy: read_plan.use_new_object_prefix_strategy,
110            meta_cache,
111            block_cache,
112        }));
113        let reader = HummockVersionReader::new(
114            sstable_store,
115            Arc::new(HummockStateStoreMetrics::unused()),
116            0,
117        );
118
119        let table = read_plan.table_catalog.unwrap();
120        let versioned = table.version.is_some();
121        let table_columns = table
122            .columns
123            .into_iter()
124            .map(|c| ColumnDesc::from(c.column_desc.unwrap()));
125
126        // Decide which serializer to use based on whether the table is versioned or not.
127        let row_serde: EitherSerde = if versioned {
128            ColumnAwareSerde::new(
129                Arc::from_iter(0..table_columns.len()),
130                Arc::from_iter(table_columns),
131            )
132            .into()
133        } else {
134            BasicSerde::new(
135                Arc::from_iter(0..table_columns.len()),
136                Arc::from_iter(table_columns),
137            )
138            .into()
139        };
140
141        let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
142        let key_range = read_plan.key_range.unwrap();
143        let pin_version = PinnedVersion::new(
144            HummockVersion::from_rpc_protobuf(&read_plan.version.unwrap()),
145            unbounded_channel().0,
146        );
147        let table_id = read_plan.table_id.into();
148
149        for vnode in read_plan.vnode_ids {
150            let vnode = VirtualNode::from_index(vnode as usize);
151            let key_range = table_key_range_from_prost(vnode, key_range.clone());
152            let (key_range, read_version_tuple) = get_committed_read_version_tuple(
153                pin_version.clone(),
154                table_id,
155                key_range,
156                read_plan.epoch,
157            );
158            let iter = reader
159                .iter(
160                    key_range,
161                    read_plan.epoch,
162                    ReadOptions {
163                        table_id,
164                        cache_policy: CachePolicy::NotFill,
165                        ..Default::default()
166                    },
167                    read_version_tuple,
168                )
169                .await?;
170            streams.push(to_deserialized_stream(iter, row_serde.clone()));
171        }
172
173        let stream = select_all_vnode_stream(streams);
174
175        Ok(stream)
176    }
177}
178
179fn table_key_range_from_prost(vnode: VirtualNode, r: KeyRange) -> TableKeyRange {
180    let map_bound = |b, v| match b {
181        Bound::Unbounded => std::ops::Bound::Unbounded,
182        Bound::Included => std::ops::Bound::Included(v),
183        Bound::Excluded => std::ops::Bound::Excluded(v),
184        _ => unreachable!(),
185    };
186    let left_bound = r.left_bound();
187    let right_bound = r.right_bound();
188    let left = map_bound(left_bound, r.left);
189    let right = map_bound(right_bound, r.right);
190
191    prefixed_range_with_vnode((left, right), vnode)
192}