risingwave_batch/spill/
spill_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::BuildHasher;
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use futures_async_stream::try_stream;
21use futures_util::AsyncReadExt;
22use opendal::Operator;
23use opendal::layers::RetryLayer;
24use opendal::services::{Fs, Memory};
25use risingwave_common::array::DataChunk;
26use risingwave_pb::Message;
27use risingwave_pb::data::DataChunk as PbDataChunk;
28use thiserror_ext::AsReport;
29use tokio::sync::Mutex;
30use twox_hash::XxHash64;
31
32use crate::error::{BatchError, Result};
33use crate::monitor::BatchSpillMetrics;
34
// --- Spill directory layout -------------------------------------------------

/// Name of the environment variable that overrides the base spill directory.
const RW_BATCH_SPILL_DIR_ENV: &str = "RW_BATCH_SPILL_DIR";
/// Base directory used when `RW_BATCH_SPILL_DIR` is not set.
const DEFAULT_SPILL_DIR: &str = "/tmp/";
/// Sub-directory, below the base directory, under which every spill root is created and which
/// `SpillOp::clean_spill_directory` wipes.
const RW_MANAGED_SPILL_DIR: &str = "/rw_batch_spill/";

// --- Tuning knobs -----------------------------------------------------------

/// Default number of spill partitions.
///
/// NOTE(review): not referenced in this file; presumably consumed by the spilling executors.
pub const DEFAULT_SPILL_PARTITION_NUM: usize = 20;
/// Chunk size in bytes (256 KiB) configured on spill readers and writers.
const DEFAULT_IO_BUFFER_SIZE: usize = 256 << 10;
/// Concurrency level configured on spill writers.
const DEFAULT_IO_CONCURRENT_TASK: usize = 8;
41
/// Storage backend that spill files are written to.
///
/// `Debug`, `PartialEq` and `Eq` are derived so the backend can be logged and compared in
/// assertions; the variants carry no data, so the derives are trivial.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpillBackend {
    /// Spill to the local file system.
    Disk,
    /// Only for testing purpose
    Memory,
}
48
49/// `SpillOp` is used to manage the spill directory of the spilling executor and it will drop the directory with a RAII style.
50pub struct SpillOp {
51    pub op: Operator,
52}
53
54impl SpillOp {
55    pub fn create(path: String, spill_backend: SpillBackend) -> Result<SpillOp> {
56        assert!(path.ends_with('/'));
57
58        let spill_dir =
59            std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_owned());
60        let root = format!("/{}/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR, path);
61
62        let op = match spill_backend {
63            SpillBackend::Disk => {
64                let builder = Fs::default().root(&root);
65                Operator::new(builder)?
66                    .layer(RetryLayer::default())
67                    .finish()
68            }
69            SpillBackend::Memory => {
70                let builder = Memory::default().root(&root);
71                Operator::new(builder)?
72                    .layer(RetryLayer::default())
73                    .finish()
74            }
75        };
76        Ok(SpillOp { op })
77    }
78
79    pub async fn clean_spill_directory() -> opendal::Result<()> {
80        static LOCK: LazyLock<Mutex<usize>> = LazyLock::new(|| Mutex::new(0));
81        let _guard = LOCK.lock().await;
82
83        let spill_dir =
84            std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_owned());
85        let root = format!("/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR);
86
87        let builder = Fs::default().root(&root);
88
89        let op: Operator = Operator::new(builder)?
90            .layer(RetryLayer::default())
91            .finish();
92
93        op.remove_all("/").await
94    }
95
96    pub async fn writer_with(&self, name: &str) -> Result<opendal::Writer> {
97        Ok(self
98            .op
99            .writer_with(name)
100            .concurrent(DEFAULT_IO_CONCURRENT_TASK)
101            .chunk(DEFAULT_IO_BUFFER_SIZE)
102            .await?)
103    }
104
105    pub async fn reader_with(&self, name: &str) -> Result<opendal::Reader> {
106        Ok(self
107            .op
108            .reader_with(name)
109            .chunk(DEFAULT_IO_BUFFER_SIZE)
110            .await?)
111    }
112
113    /// spill file content will look like the below.
114    ///
115    /// ```text
116    /// [proto_len]
117    /// [proto_bytes]
118    /// ...
119    /// [proto_len]
120    /// [proto_bytes]
121    /// ```
122    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
123    pub async fn read_stream(reader: opendal::Reader, spill_metrics: Arc<BatchSpillMetrics>) {
124        let mut reader = reader.into_futures_async_read(..).await?;
125        let mut buf = [0u8; 4];
126        loop {
127            if let Err(err) = reader.read_exact(&mut buf).await {
128                if err.kind() == std::io::ErrorKind::UnexpectedEof {
129                    break;
130                } else {
131                    return Err(anyhow!(err).into());
132                }
133            }
134            let len = u32::from_le_bytes(buf) as usize;
135            spill_metrics.batch_spill_read_bytes.inc_by(len as u64 + 4);
136            let mut buf = vec![0u8; len];
137            reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?;
138            let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?;
139            let chunk = DataChunk::from_protobuf(&chunk_pb)?;
140            yield chunk;
141        }
142    }
143}
144
145impl Drop for SpillOp {
146    fn drop(&mut self) {
147        let op = self.op.clone();
148        tokio::task::spawn(async move {
149            let result = op.remove_all("/").await;
150            if let Err(error) = result {
151                error!(
152                    error = %error.as_report(),
153                    "Failed to remove spill directory"
154                );
155            }
156        });
157    }
158}
159
160impl DerefMut for SpillOp {
161    fn deref_mut(&mut self) -> &mut Self::Target {
162        &mut self.op
163    }
164}
165
166impl Deref for SpillOp {
167    type Target = Operator;
168
169    fn deref(&self) -> &Self::Target {
170        &self.op
171    }
172}
173
174#[derive(Default, Clone, Copy)]
175pub struct SpillBuildHasher(pub u64);
176
177impl BuildHasher for SpillBuildHasher {
178    type Hasher = XxHash64;
179
180    fn build_hasher(&self) -> Self::Hasher {
181        XxHash64::with_seed(self.0)
182    }
183}
184
/// Memory threshold of 1 MiB, in bytes.
///
/// NOTE(review): the name suggests the minimum memory an executor must hold before it spills;
/// it is not referenced in this file, so confirm against the callers.
pub const SPILL_AT_LEAST_MEMORY: u64 = 1 << 20;