risingwave_frontend/handler/
flush.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16
17use super::RwPgResponse;
18use crate::error::Result;
19use crate::handler::HandlerArgs;
20use crate::session::SessionImpl;
21
22pub(super) async fn handle_flush(handler_args: HandlerArgs) -> Result<RwPgResponse> {
23    do_flush(&handler_args.session).await?;
24    Ok(PgResponse::empty_result(StatementType::FLUSH))
25}
26
27pub(crate) async fn do_flush(session: &SessionImpl) -> Result<()> {
28    let client = session.env().meta_client();
29    let database_id = session
30        .env()
31        .catalog_reader()
32        .read_guard()
33        .get_database_by_name(&session.database())?
34        .id();
35    let version_id = client.flush(database_id).await?;
36
37    // Wait for the snapshot to be synchronized, so that future reads in this session can see
38    // previous writes.
39    session
40        .env()
41        .hummock_snapshot_manager()
42        .wait(version_id)
43        .await;
44
45    Ok(())
46}