risingwave_storage/hummock/store/table_change_log_manager.rs
1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::iter;
17use std::pin::Pin;
18use std::sync::Arc;
19
20use futures::FutureExt;
21use futures::future::Shared;
22use moka::sync::Cache;
23use risingwave_common::id::TableId;
24use risingwave_hummock_sdk::change_log::TableChangeLogs;
25use risingwave_rpc_client::HummockMetaClient;
26
27use crate::hummock::{HummockError, HummockResult};
28
29type InflightResult = Shared<Pin<Box<dyn Future<Output = HummockResult<TableChangeLogs>> + Send>>>;
30
31#[derive(Eq, Hash, PartialEq)]
32struct CacheKey {
33 table_id: TableId,
34 epoch_range: (u64, u64),
35 include_epoch_only: bool,
36 limit: Option<u32>,
37}
38
/// A naive cache to reduce the number of RPCs sent to the meta node.
///
/// Results of `get_table_change_logs` are memoized per request (see `CacheKey`) as shared
/// futures, so identical concurrent requests are also coalesced into one RPC.
pub struct TableChangeLogManager {
    // Request key -> shared (possibly still in-flight) fetch result.
    cache: Cache<CacheKey, InflightResult>,
    // Client used to issue the actual change-log RPCs on a cache miss or bypass.
    hummock_meta_client: Arc<dyn HummockMetaClient>,
}
44
45impl TableChangeLogManager {
46 pub fn new(capacity: u64, hummock_meta_client: Arc<dyn HummockMetaClient>) -> Self {
47 let cache = Cache::builder().max_capacity(capacity).build();
48 Self {
49 cache,
50 hummock_meta_client,
51 }
52 }
53
54 async fn get_or_insert(
55 &self,
56 table_id: TableId,
57 epoch_range: (u64, u64),
58 include_epoch_only: bool,
59 limit: Option<u32>,
60 fetch: impl Future<Output = HummockResult<TableChangeLogs>> + Send + 'static,
61 ) -> HummockResult<TableChangeLogs> {
62 self.cache
63 .entry(CacheKey {
64 table_id,
65 epoch_range,
66 include_epoch_only,
67 limit,
68 })
69 .or_insert_with_if(
70 || fetch.boxed().shared(),
71 |inflight| {
72 if let Some(result) = inflight.peek() {
73 return result.is_err();
74 }
75 false
76 },
77 )
78 .value()
79 .clone()
80 .await
81 }
82
83 /// Fetches table change logs for the given `table_id` and `epoch_range`.
84 ///
85 /// - If the end value of `epoch_range` is not `u64::MAX`, attempts to retrieve logs from the cache; if not cached, fetches via an RPC to the meta node and stores the result in the cache.
86 /// - If the end value of `epoch_range` is `u64::MAX`, always fetches table change logs directly from the meta node (bypassing the cache).
87 ///
88 /// Both the start and end values of `epoch_range` are inclusive.
89 ///
90 /// IMPORTANT: The caller must guarantee that the current max committed epoch is at least as large as the end of the provided `epoch_range`, if it's not `u64::MAX`.
91 /// Otherwise, the cache may serve outdated results: as new epochs are committed beyond the current maximum, subsequent RPC calls for the same `epoch_range`
92 /// could retrieve different or additional change logs that were not present in the previously cached result. For example, if you request logs for
93 /// `epoch_range = (1, N)` when the current max committed epoch is `M < N`, committing epochs `M+1` through `N` would make the cache inconsistent with reality;
94 /// future fetches for `(1, N)` could return new or updated information absent from the previous cache entry.
95 pub async fn fetch_table_change_logs(
96 &self,
97 table_id: TableId,
98 epoch_range: (u64, u64),
99 include_epoch_only: bool,
100 limit: Option<u32>,
101 ) -> HummockResult<TableChangeLogs> {
102 let hummock_meta_client = self.hummock_meta_client.clone();
103 let fetch = async move {
104 hummock_meta_client
105 .get_table_change_logs(
106 include_epoch_only,
107 Some(epoch_range.0),
108 Some(epoch_range.1),
109 Some(iter::once(table_id).collect()),
110 false,
111 limit,
112 )
113 .await
114 .map_err(HummockError::meta_error)
115 };
116 if epoch_range.1 == u64::MAX {
117 return fetch.await;
118 }
119 self.get_or_insert(table_id, epoch_range, include_epoch_only, limit, fetch)
120 .await
121 }
122}