risingwave_batch/spill/
spill_op.rs1use std::hash::BuildHasher;
16use std::ops::{Deref, DerefMut};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, LazyLock};
19
20use anyhow::anyhow;
21use futures_async_stream::try_stream;
22use futures_util::AsyncReadExt;
23use opendal::Operator;
24use opendal::layers::RetryLayer;
25use opendal::services::{Fs, Memory};
26use risingwave_common::array::DataChunk;
27use risingwave_common::util::batch_spill_config::batch_spill_base_dir;
28use risingwave_pb::Message;
29use risingwave_pb::data::DataChunk as PbDataChunk;
30use thiserror_ext::AsReport;
31use tokio::sync::Mutex;
32use twox_hash::XxHash64;
33
34use crate::error::{BatchError, Result};
35use crate::monitor::BatchSpillMetrics;
36
/// Default number of spill partitions. Not referenced in this file; presumably consumed by
/// spilling executors elsewhere — confirm with callers.
pub const DEFAULT_SPILL_PARTITION_NUM: usize = 20;
/// Sub-directory of the batch spill base dir that this module manages and may wipe entirely.
const RW_MANAGED_SPILL_DIR: &str = "rw_batch_spill/";
/// Chunk size, in bytes, handed to spill readers and writers (256 KiB).
const DEFAULT_IO_BUFFER_SIZE: usize = 1 << 18;
/// Concurrency level handed to spill writers.
const DEFAULT_IO_CONCURRENT_TASK: usize = 8;
41
/// Storage backend that a [`SpillOp`] writes spilled data to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpillBackend {
    /// Local filesystem (opendal `Fs` service) under the batch spill base directory.
    Disk,
    /// In-memory storage (opendal `Memory` service); presumably intended for tests — confirm.
    Memory,
}
48
49pub struct SpillOp {
51 pub op: Operator,
52}
53
54impl SpillOp {
55 fn batch_spill_root() -> PathBuf {
56 batch_spill_base_dir().join(RW_MANAGED_SPILL_DIR)
57 }
58
59 pub fn create(path: impl AsRef<Path>, spill_backend: SpillBackend) -> Result<SpillOp> {
60 let path = path.as_ref();
61 if !path.is_relative() {
62 bail!("Spill path must be relative, but got {:?}", path);
63 }
64
65 let root = Self::batch_spill_root().join(path);
66
67 let op = match spill_backend {
68 SpillBackend::Disk => {
69 let builder = Fs::default().root(&root.to_string_lossy());
70 Operator::new(builder)?
71 .layer(RetryLayer::default())
72 .finish()
73 }
74 SpillBackend::Memory => {
75 let builder = Memory::default().root(&root.to_string_lossy());
76 Operator::new(builder)?
77 .layer(RetryLayer::default())
78 .finish()
79 }
80 };
81 Ok(SpillOp { op })
82 }
83
84 pub async fn clean_spill_directory() -> opendal::Result<()> {
85 static LOCK: LazyLock<Mutex<usize>> = LazyLock::new(|| Mutex::new(0));
86 let _guard = LOCK.lock().await;
87
88 let root = Self::batch_spill_root();
89
90 let builder = Fs::default().root(&root.to_string_lossy());
91
92 let op: Operator = Operator::new(builder)?
93 .layer(RetryLayer::default())
94 .finish();
95
96 op.remove_all("/").await
97 }
98
99 pub async fn writer_with(&self, name: &str) -> Result<opendal::Writer> {
100 Ok(self
101 .op
102 .writer_with(name)
103 .concurrent(DEFAULT_IO_CONCURRENT_TASK)
104 .chunk(DEFAULT_IO_BUFFER_SIZE)
105 .await?)
106 }
107
108 pub async fn reader_with(&self, name: &str) -> Result<opendal::Reader> {
109 Ok(self
110 .op
111 .reader_with(name)
112 .chunk(DEFAULT_IO_BUFFER_SIZE)
113 .await?)
114 }
115
116 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
126 pub async fn read_stream(reader: opendal::Reader, spill_metrics: Arc<BatchSpillMetrics>) {
127 let mut reader = reader.into_futures_async_read(..).await?;
128 let mut buf = [0u8; 4];
129 loop {
130 if let Err(err) = reader.read_exact(&mut buf).await {
131 if err.kind() == std::io::ErrorKind::UnexpectedEof {
132 break;
133 } else {
134 return Err(anyhow!(err).into());
135 }
136 }
137 let len = u32::from_le_bytes(buf) as usize;
138 spill_metrics.batch_spill_read_bytes.inc_by(len as u64 + 4);
139 let mut buf = vec![0u8; len];
140 reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?;
141 let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?;
142 let chunk = DataChunk::from_protobuf(&chunk_pb)?;
143 yield chunk;
144 }
145 }
146}
147
148impl Drop for SpillOp {
149 fn drop(&mut self) {
150 let op = self.op.clone();
151 tokio::task::spawn(async move {
152 let result = op.remove_all("/").await;
153 if let Err(error) = result {
154 error!(
155 error = %error.as_report(),
156 "Failed to remove spill directory"
157 );
158 }
159 });
160 }
161}
162
163impl DerefMut for SpillOp {
164 fn deref_mut(&mut self) -> &mut Self::Target {
165 &mut self.op
166 }
167}
168
169impl Deref for SpillOp {
170 type Target = Operator;
171
172 fn deref(&self) -> &Self::Target {
173 &self.op
174 }
175}
176
177#[derive(Default, Clone, Copy)]
178pub struct SpillBuildHasher(pub u64);
179
180impl BuildHasher for SpillBuildHasher {
181 type Hasher = XxHash64;
182
183 fn build_hasher(&self) -> Self::Hasher {
184 XxHash64::with_seed(self.0)
185 }
186}
187
/// One MiB (`1 << 20` bytes). Not referenced in this file; presumably a minimum memory
/// threshold used by spilling executors — confirm with callers.
pub const SPILL_AT_LEAST_MEMORY: u64 = 1 << 20;