risingwave_batch/spill/
spill_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::BuildHasher;
16use std::ops::{Deref, DerefMut};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, LazyLock};
19
20use anyhow::anyhow;
21use futures_async_stream::try_stream;
22use futures_util::AsyncReadExt;
23use opendal::Operator;
24use opendal::layers::RetryLayer;
25use opendal::services::{Fs, Memory};
26use risingwave_common::array::DataChunk;
27use risingwave_common::util::batch_spill_config::batch_spill_base_dir;
28use risingwave_pb::Message;
29use risingwave_pb::data::DataChunk as PbDataChunk;
30use thiserror_ext::AsReport;
31use tokio::sync::Mutex;
32use twox_hash::XxHash64;
33
34use crate::error::{BatchError, Result};
35use crate::monitor::BatchSpillMetrics;
36
/// Presumably the default number of partitions a spilling executor splits its input into.
/// NOTE(review): not referenced in this file; meaning inferred from the name — confirm at call sites.
pub const DEFAULT_SPILL_PARTITION_NUM: usize = 20;
/// Sub-directory of the batch spill base dir that this module manages; everything under it
/// is wiped by `SpillOp::clean_spill_directory`.
const RW_MANAGED_SPILL_DIR: &str = "rw_batch_spill/";
/// Chunk size (256 KiB) passed to both spill writers and spill readers.
const DEFAULT_IO_BUFFER_SIZE: usize = 1 << 18;
/// Concurrency passed to opendal's `writer_with(..).concurrent(..)` for spill writers.
const DEFAULT_IO_CONCURRENT_TASK: usize = 8;
41
/// Storage backend that spill files are written to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpillBackend {
    /// Local filesystem, rooted under the batch spill root directory.
    Disk,
    /// In-memory storage. Only for testing purpose.
    Memory,
}
48
49/// `SpillOp` is used to manage the spill directory of the spilling executor and it will drop the directory with a RAII style.
50pub struct SpillOp {
51    pub op: Operator,
52}
53
54impl SpillOp {
55    fn batch_spill_root() -> PathBuf {
56        batch_spill_base_dir().join(RW_MANAGED_SPILL_DIR)
57    }
58
59    pub fn create(path: impl AsRef<Path>, spill_backend: SpillBackend) -> Result<SpillOp> {
60        let path = path.as_ref();
61        if !path.is_relative() {
62            bail!("Spill path must be relative, but got {:?}", path);
63        }
64
65        let root = Self::batch_spill_root().join(path);
66
67        let op = match spill_backend {
68            SpillBackend::Disk => {
69                let builder = Fs::default().root(&root.to_string_lossy());
70                Operator::new(builder)?
71                    .layer(RetryLayer::default())
72                    .finish()
73            }
74            SpillBackend::Memory => {
75                let builder = Memory::default().root(&root.to_string_lossy());
76                Operator::new(builder)?
77                    .layer(RetryLayer::default())
78                    .finish()
79            }
80        };
81        Ok(SpillOp { op })
82    }
83
84    pub async fn clean_spill_directory() -> opendal::Result<()> {
85        static LOCK: LazyLock<Mutex<usize>> = LazyLock::new(|| Mutex::new(0));
86        let _guard = LOCK.lock().await;
87
88        let root = Self::batch_spill_root();
89
90        let builder = Fs::default().root(&root.to_string_lossy());
91
92        let op: Operator = Operator::new(builder)?
93            .layer(RetryLayer::default())
94            .finish();
95
96        op.remove_all("/").await
97    }
98
99    pub async fn writer_with(&self, name: &str) -> Result<opendal::Writer> {
100        Ok(self
101            .op
102            .writer_with(name)
103            .concurrent(DEFAULT_IO_CONCURRENT_TASK)
104            .chunk(DEFAULT_IO_BUFFER_SIZE)
105            .await?)
106    }
107
108    pub async fn reader_with(&self, name: &str) -> Result<opendal::Reader> {
109        Ok(self
110            .op
111            .reader_with(name)
112            .chunk(DEFAULT_IO_BUFFER_SIZE)
113            .await?)
114    }
115
116    /// spill file content will look like the below.
117    ///
118    /// ```text
119    /// [proto_len]
120    /// [proto_bytes]
121    /// ...
122    /// [proto_len]
123    /// [proto_bytes]
124    /// ```
125    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
126    pub async fn read_stream(reader: opendal::Reader, spill_metrics: Arc<BatchSpillMetrics>) {
127        let mut reader = reader.into_futures_async_read(..).await?;
128        let mut buf = [0u8; 4];
129        loop {
130            if let Err(err) = reader.read_exact(&mut buf).await {
131                if err.kind() == std::io::ErrorKind::UnexpectedEof {
132                    break;
133                } else {
134                    return Err(anyhow!(err).into());
135                }
136            }
137            let len = u32::from_le_bytes(buf) as usize;
138            spill_metrics.batch_spill_read_bytes.inc_by(len as u64 + 4);
139            let mut buf = vec![0u8; len];
140            reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?;
141            let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?;
142            let chunk = DataChunk::from_protobuf(&chunk_pb)?;
143            yield chunk;
144        }
145    }
146}
147
148impl Drop for SpillOp {
149    fn drop(&mut self) {
150        let op = self.op.clone();
151        tokio::task::spawn(async move {
152            let result = op.remove_all("/").await;
153            if let Err(error) = result {
154                error!(
155                    error = %error.as_report(),
156                    "Failed to remove spill directory"
157                );
158            }
159        });
160    }
161}
162
163impl DerefMut for SpillOp {
164    fn deref_mut(&mut self) -> &mut Self::Target {
165        &mut self.op
166    }
167}
168
169impl Deref for SpillOp {
170    type Target = Operator;
171
172    fn deref(&self) -> &Self::Target {
173        &self.op
174    }
175}
176
/// `BuildHasher` whose hashers are `XxHash64` seeded with the wrapped `u64`.
///
/// `Default` yields seed `0`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct SpillBuildHasher(pub u64);
179
180impl BuildHasher for SpillBuildHasher {
181    type Hasher = XxHash64;
182
183    fn build_hasher(&self) -> Self::Hasher {
184        XxHash64::with_seed(self.0)
185    }
186}
187
/// 1 MiB, in bytes. Presumably the minimum memory a spilling executor must hold before
/// spilling is worthwhile. NOTE(review): not referenced in this file — confirm at call sites.
pub const SPILL_AT_LEAST_MEMORY: u64 = 1 << 20;