risingwave_storage/hummock/store/table_change_log_manager.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::future::Future;
16use std::iter;
17use std::pin::Pin;
18use std::sync::Arc;
19
20use futures::FutureExt;
21use futures::future::Shared;
22use moka::sync::Cache;
23use risingwave_common::id::TableId;
24use risingwave_hummock_sdk::change_log::TableChangeLogs;
25use risingwave_rpc_client::HummockMetaClient;
26
27use crate::hummock::{HummockError, HummockResult};
28
29type InflightResult = Shared<Pin<Box<dyn Future<Output = HummockResult<TableChangeLogs>> + Send>>>;
30
31#[derive(Eq, Hash, PartialEq)]
32struct CacheKey {
33    table_id: TableId,
34    epoch_range: (u64, u64),
35    include_epoch_only: bool,
36    limit: Option<u32>,
37}
38
39/// A naive cache to reduce number of RPC sent to meta node.
40pub struct TableChangeLogManager {
41    cache: Cache<CacheKey, InflightResult>,
42    hummock_meta_client: Arc<dyn HummockMetaClient>,
43}
44
45impl TableChangeLogManager {
46    pub fn new(capacity: u64, hummock_meta_client: Arc<dyn HummockMetaClient>) -> Self {
47        let cache = Cache::builder().max_capacity(capacity).build();
48        Self {
49            cache,
50            hummock_meta_client,
51        }
52    }
53
54    async fn get_or_insert(
55        &self,
56        table_id: TableId,
57        epoch_range: (u64, u64),
58        include_epoch_only: bool,
59        limit: Option<u32>,
60        fetch: impl Future<Output = HummockResult<TableChangeLogs>> + Send + 'static,
61    ) -> HummockResult<TableChangeLogs> {
62        self.cache
63            .entry(CacheKey {
64                table_id,
65                epoch_range,
66                include_epoch_only,
67                limit,
68            })
69            .or_insert_with_if(
70                || fetch.boxed().shared(),
71                |inflight| {
72                    if let Some(result) = inflight.peek() {
73                        return result.is_err();
74                    }
75                    false
76                },
77            )
78            .value()
79            .clone()
80            .await
81    }
82
83    /// Fetches table change logs for the given `table_id` and `epoch_range`.
84    ///
85    /// - If the end value of `epoch_range` is not `u64::MAX`, attempts to retrieve logs from the cache; if not cached, fetches via an RPC to the meta node and stores the result in the cache.
86    /// - If the end value of `epoch_range` is `u64::MAX`, always fetches table change logs directly from the meta node (bypassing the cache).
87    ///
88    /// Both the start and end values of `epoch_range` are inclusive.
89    ///
90    /// IMPORTANT: The caller must guarantee that the current max committed epoch is at least as large as the end of the provided `epoch_range`, if it's not `u64::MAX`.
91    /// Otherwise, the cache may serve outdated results: as new epochs are committed beyond the current maximum, subsequent RPC calls for the same `epoch_range`
92    /// could retrieve different or additional change logs that were not present in the previously cached result. For example, if you request logs for
93    /// `epoch_range = (1, N)` when the current max committed epoch is `M < N`, committing epochs `M+1` through `N` would make the cache inconsistent with reality;
94    /// future fetches for `(1, N)` could return new or updated information absent from the previous cache entry.
95    pub async fn fetch_table_change_logs(
96        &self,
97        table_id: TableId,
98        epoch_range: (u64, u64),
99        include_epoch_only: bool,
100        limit: Option<u32>,
101    ) -> HummockResult<TableChangeLogs> {
102        let hummock_meta_client = self.hummock_meta_client.clone();
103        let fetch = async move {
104            hummock_meta_client
105                .get_table_change_logs(
106                    include_epoch_only,
107                    Some(epoch_range.0),
108                    Some(epoch_range.1),
109                    Some(iter::once(table_id).collect()),
110                    false,
111                    limit,
112                )
113                .await
114                .map_err(HummockError::meta_error)
115        };
116        if epoch_range.1 == u64::MAX {
117            return fetch.await;
118        }
119        self.get_or_insert(table_id, epoch_range, include_epoch_only, limit, fetch)
120            .await
121    }
122}