risingwave_batch/spill/
spill_op.rs1use std::hash::BuildHasher;
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use futures_async_stream::try_stream;
21use futures_util::AsyncReadExt;
22use opendal::Operator;
23use opendal::layers::RetryLayer;
24use opendal::services::{Fs, Memory};
25use risingwave_common::array::DataChunk;
26use risingwave_pb::Message;
27use risingwave_pb::data::DataChunk as PbDataChunk;
28use thiserror_ext::AsReport;
29use tokio::sync::Mutex;
30use twox_hash::XxHash64;
31
32use crate::error::{BatchError, Result};
33use crate::monitor::BatchSpillMetrics;
34
/// Name of the environment variable that overrides the base directory for spill files.
const RW_BATCH_SPILL_DIR_ENV: &str = "RW_BATCH_SPILL_DIR";
/// Default number of spill partitions.
// NOTE(review): not referenced in this file; presumably consumed by the spilling
// executors — confirm against callers.
pub const DEFAULT_SPILL_PARTITION_NUM: usize = 20;
/// Base directory used when the `RW_BATCH_SPILL_DIR` environment variable cannot be read.
const DEFAULT_SPILL_DIR: &str = "/tmp/";
/// Sub-directory appended to the base directory; every spill root built in this file
/// lives underneath it, and `SpillOp::clean_spill_directory` removes its contents.
const RW_MANAGED_SPILL_DIR: &str = "/rw_batch_spill/";
/// Chunk size in bytes (256 KiB) passed to the opendal readers and writers built by `SpillOp`.
const DEFAULT_IO_BUFFER_SIZE: usize = 256 * 1024;
/// Concurrency setting passed to the opendal writers built by `SpillOp::writer_with`.
const DEFAULT_IO_CONCURRENT_TASK: usize = 8;
41
/// Storage backend that a [`SpillOp`] writes its spill files to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpillBackend {
    /// Spill to the local file system (opendal `Fs` service).
    Disk,
    /// Spill to an in-process memory store (opendal `Memory` service).
    Memory,
}
48
/// Thin wrapper around an opendal [`Operator`] rooted at a spill directory.
///
/// The wrapper dereferences to the inner [`Operator`]. When it is dropped, a background
/// task is spawned that removes everything under the operator's root.
pub struct SpillOp {
    /// The underlying opendal operator; its root is the directory chosen in `SpillOp::create`.
    pub op: Operator,
}
53
54impl SpillOp {
55 pub fn create(path: String, spill_backend: SpillBackend) -> Result<SpillOp> {
56 assert!(path.ends_with('/'));
57
58 let spill_dir =
59 std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_owned());
60 let root = format!("/{}/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR, path);
61
62 let op = match spill_backend {
63 SpillBackend::Disk => {
64 let builder = Fs::default().root(&root);
65 Operator::new(builder)?
66 .layer(RetryLayer::default())
67 .finish()
68 }
69 SpillBackend::Memory => {
70 let builder = Memory::default().root(&root);
71 Operator::new(builder)?
72 .layer(RetryLayer::default())
73 .finish()
74 }
75 };
76 Ok(SpillOp { op })
77 }
78
79 pub async fn clean_spill_directory() -> opendal::Result<()> {
80 static LOCK: LazyLock<Mutex<usize>> = LazyLock::new(|| Mutex::new(0));
81 let _guard = LOCK.lock().await;
82
83 let spill_dir =
84 std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_owned());
85 let root = format!("/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR);
86
87 let builder = Fs::default().root(&root);
88
89 let op: Operator = Operator::new(builder)?
90 .layer(RetryLayer::default())
91 .finish();
92
93 op.remove_all("/").await
94 }
95
96 pub async fn writer_with(&self, name: &str) -> Result<opendal::Writer> {
97 Ok(self
98 .op
99 .writer_with(name)
100 .concurrent(DEFAULT_IO_CONCURRENT_TASK)
101 .chunk(DEFAULT_IO_BUFFER_SIZE)
102 .await?)
103 }
104
105 pub async fn reader_with(&self, name: &str) -> Result<opendal::Reader> {
106 Ok(self
107 .op
108 .reader_with(name)
109 .chunk(DEFAULT_IO_BUFFER_SIZE)
110 .await?)
111 }
112
113 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
123 pub async fn read_stream(reader: opendal::Reader, spill_metrics: Arc<BatchSpillMetrics>) {
124 let mut reader = reader.into_futures_async_read(..).await?;
125 let mut buf = [0u8; 4];
126 loop {
127 if let Err(err) = reader.read_exact(&mut buf).await {
128 if err.kind() == std::io::ErrorKind::UnexpectedEof {
129 break;
130 } else {
131 return Err(anyhow!(err).into());
132 }
133 }
134 let len = u32::from_le_bytes(buf) as usize;
135 spill_metrics.batch_spill_read_bytes.inc_by(len as u64 + 4);
136 let mut buf = vec![0u8; len];
137 reader.read_exact(&mut buf).await.map_err(|e| anyhow!(e))?;
138 let chunk_pb: PbDataChunk = Message::decode(buf.as_slice()).map_err(|e| anyhow!(e))?;
139 let chunk = DataChunk::from_protobuf(&chunk_pb)?;
140 yield chunk;
141 }
142 }
143}
144
145impl Drop for SpillOp {
146 fn drop(&mut self) {
147 let op = self.op.clone();
148 tokio::task::spawn(async move {
149 let result = op.remove_all("/").await;
150 if let Err(error) = result {
151 error!(
152 error = %error.as_report(),
153 "Failed to remove spill directory"
154 );
155 }
156 });
157 }
158}
159
160impl DerefMut for SpillOp {
161 fn deref_mut(&mut self) -> &mut Self::Target {
162 &mut self.op
163 }
164}
165
166impl Deref for SpillOp {
167 type Target = Operator;
168
169 fn deref(&self) -> &Self::Target {
170 &self.op
171 }
172}
173
/// A [`BuildHasher`] that produces [`XxHash64`] hashers; the wrapped `u64` is the seed
/// handed to `XxHash64::with_seed`.
#[derive(Default, Clone, Copy)]
pub struct SpillBuildHasher(pub u64);
176
177impl BuildHasher for SpillBuildHasher {
178 type Hasher = XxHash64;
179
180 fn build_hasher(&self) -> Self::Hasher {
181 XxHash64::with_seed(self.0)
182 }
183}
184
/// 1 MiB, expressed in bytes.
// NOTE(review): not referenced in this file; the name suggests a minimum memory threshold
// related to spilling — confirm against callers.
pub const SPILL_AT_LEAST_MEMORY: u64 = 1024 * 1024;