// risingwave_object_store/object/opendal_engine/opendal_object_store.rs

use std::ops::Range;
16use std::sync::Arc;
17use std::time::Duration;
18
19use bytes::Bytes;
20use fail::fail_point;
21use futures::{StreamExt, stream};
22use opendal::layers::{RetryLayer, TimeoutLayer};
23use opendal::raw::BoxedStaticFuture;
24use opendal::services::Memory;
25use opendal::{Execute, Executor, Metakey, Operator, Writer};
26use risingwave_common::config::ObjectStoreConfig;
27use risingwave_common::range::RangeBoundsExt;
28use thiserror_ext::AsReport;
29
30use crate::object::object_metrics::ObjectStoreMetrics;
31use crate::object::{
32 ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
33 ObjectResult, ObjectStore, OperationType, StreamingUploader, prefix,
34};
35
/// Object store implementation backed by an Apache OpenDAL [`Operator`].
///
/// A single struct serves every OpenDAL-backed service (S3, GCS, HDFS, ...);
/// the concrete backend is identified by `media_type`.
#[derive(Clone)]
pub struct OpendalObjectStore {
    /// The OpenDAL operator that performs the actual I/O.
    pub(crate) op: Operator,
    /// Which backing service `op` talks to; selects the object prefix scheme
    /// and is used as a metrics label.
    pub(crate) media_type: MediaType,

    /// Retry/timeout knobs applied to streaming reads and streaming uploads.
    pub(crate) config: Arc<ObjectStoreConfig>,
    /// Latency metrics; also handed to streaming uploaders.
    pub(crate) metrics: Arc<ObjectStoreMetrics>,
}
45
/// The storage service an [`OpendalObjectStore`] is backed by.
#[derive(Clone)]
pub enum MediaType {
    Memory,
    Hdfs,
    Gcs,
    Minio,
    S3,
    Obs,
    Oss,
    Webhdfs,
    Azblob,
    Fs,
}
59
60impl MediaType {
61 pub fn as_str(&self) -> &'static str {
62 match self {
63 MediaType::Memory => "Memory",
64 MediaType::Hdfs => "Hdfs",
65 MediaType::Gcs => "Gcs",
66 MediaType::Minio => "Minio",
67 MediaType::S3 => "S3",
68 MediaType::Obs => "Obs",
69 MediaType::Oss => "Oss",
70 MediaType::Webhdfs => "Webhdfs",
71 MediaType::Azblob => "Azblob",
72 MediaType::Fs => "Fs",
73 }
74 }
75}
76
77impl OpendalObjectStore {
78 pub fn test_new_memory_engine() -> ObjectResult<Self> {
80 let builder = Memory::default();
82 let op: Operator = Operator::new(builder)?.finish();
83 Ok(Self {
84 op,
85 media_type: MediaType::Memory,
86 config: Arc::new(ObjectStoreConfig::default()),
87 metrics: Arc::new(ObjectStoreMetrics::unused()),
88 })
89 }
90}
91
92#[async_trait::async_trait]
93impl ObjectStore for OpendalObjectStore {
94 type StreamingUploader = OpendalStreamingUploader;
95
96 fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
97 match self.media_type {
98 MediaType::S3 => prefix::s3::get_object_prefix(obj_id),
99 MediaType::Minio => prefix::s3::get_object_prefix(obj_id),
100 MediaType::Memory => String::default(),
101 MediaType::Hdfs
102 | MediaType::Gcs
103 | MediaType::Obs
104 | MediaType::Oss
105 | MediaType::Webhdfs
106 | MediaType::Azblob
107 | MediaType::Fs => {
108 prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
109 }
110 }
111 }
112
113 async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
114 if obj.is_empty() {
115 Err(ObjectError::internal("upload empty object"))
116 } else {
117 self.op.write(path, obj).await?;
118 Ok(())
119 }
120 }
121
122 async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
123 Ok(OpendalStreamingUploader::new(
124 self.op.clone(),
125 path.to_owned(),
126 self.config.clone(),
127 self.metrics.clone(),
128 self.store_media_type(),
129 )
130 .await?)
131 }
132
133 async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
134 let data = if range.is_full() {
135 self.op.read(path).await?
136 } else {
137 self.op
138 .read_with(path)
139 .range(range.map(|v| *v as u64))
140 .await?
141 };
142
143 if let Some(len) = range.len()
144 && len != data.len()
145 {
146 return Err(ObjectError::internal(format!(
147 "mismatched size: expected {}, found {} when reading {} at {:?}",
148 len,
149 data.len(),
150 path,
151 range,
152 )));
153 }
154
155 Ok(data.to_bytes())
156 }
157
158 async fn streaming_read(
162 &self,
163 path: &str,
164 range: Range<usize>,
165 ) -> ObjectResult<ObjectDataStream> {
166 fail_point!("opendal_streaming_read_err", |_| Err(
167 ObjectError::internal("opendal streaming read error")
168 ));
169 let range: Range<u64> = (range.start as u64)..(range.end as u64);
170
171 let reader = self
176 .op
177 .clone()
178 .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
179 self.config.retry.streaming_read_attempt_timeout_ms,
180 )))
181 .layer(
182 RetryLayer::new()
183 .with_min_delay(Duration::from_millis(
184 self.config.retry.req_backoff_interval_ms,
185 ))
186 .with_max_delay(Duration::from_millis(
187 self.config.retry.req_backoff_max_delay_ms,
188 ))
189 .with_max_times(self.config.retry.streaming_read_retry_attempts)
190 .with_factor(self.config.retry.req_backoff_factor as f32)
191 .with_jitter(),
192 )
193 .reader_with(path)
194 .await?;
195 let stream = reader.into_bytes_stream(range).await?.map(|item| {
196 item.map(|b| Bytes::copy_from_slice(b.as_ref()))
197 .map_err(|e| {
198 ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))
199 })
200 });
201
202 Ok(Box::pin(stream))
203 }
204
205 async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
206 let opendal_metadata = self.op.stat(path).await?;
207 let key = path.to_owned();
208 let last_modified = match opendal_metadata.last_modified() {
209 Some(t) => t.timestamp() as f64,
210 None => 0_f64,
211 };
212
213 let total_size = opendal_metadata.content_length() as usize;
214 let metadata = ObjectMetadata {
215 key,
216 last_modified,
217 total_size,
218 };
219 Ok(metadata)
220 }
221
222 async fn delete(&self, path: &str) -> ObjectResult<()> {
223 self.op.delete(path).await?;
224 Ok(())
225 }
226
227 async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
230 self.op.remove(paths.to_vec()).await?;
231 Ok(())
232 }
233
234 async fn list(
235 &self,
236 prefix: &str,
237 start_after: Option<String>,
238 limit: Option<usize>,
239 ) -> ObjectResult<ObjectMetadataIter> {
240 let mut object_lister = self
241 .op
242 .lister_with(prefix)
243 .recursive(true)
244 .metakey(Metakey::ContentLength);
245 if let Some(start_after) = start_after {
246 object_lister = object_lister.start_after(&start_after);
247 }
248 let object_lister = object_lister.await?;
249
250 let stream = stream::unfold(object_lister, |mut object_lister| async move {
251 match object_lister.next().await {
252 Some(Ok(object)) => {
253 let key = object.path().to_owned();
254 let om = object.metadata();
255 let last_modified = match om.last_modified() {
256 Some(t) => t.timestamp() as f64,
257 None => 0_f64,
258 };
259 let total_size = om.content_length() as usize;
260 let metadata = ObjectMetadata {
261 key,
262 last_modified,
263 total_size,
264 };
265 Some((Ok(metadata), object_lister))
266 }
267 Some(Err(err)) => Some((Err(err.into()), object_lister)),
268 None => None,
269 }
270 });
271
272 Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed())
273 }
274
275 fn store_media_type(&self) -> &'static str {
276 self.media_type.as_str()
277 }
278
279 fn support_streaming_upload(&self) -> bool {
280 self.op.info().native_capability().write_can_multi
281 }
282}
283
284impl OpendalObjectStore {
285 pub async fn copy(&self, from_path: &str, to_path: &str) -> ObjectResult<()> {
286 self.op.copy(from_path, to_path).await?;
287 Ok(())
288 }
289}
290
/// Executor handed to the OpenDAL writer so that the upload futures it spawns
/// run on tokio and are timed under our operation-latency metric.
struct OpendalStreamingUploaderExecute {
    /// Records operation latency for spawned upload futures.
    metrics: Arc<ObjectStoreMetrics>,
    /// Backend name used as the metric label.
    media_type: &'static str,
}
296
297impl OpendalStreamingUploaderExecute {
298 const STREAMING_UPLOAD_TYPE: OperationType = OperationType::StreamingUpload;
299
300 fn new(metrics: Arc<ObjectStoreMetrics>, media_type: &'static str) -> Self {
301 Self {
302 metrics,
303 media_type,
304 }
305 }
306}
307
308impl Execute for OpendalStreamingUploaderExecute {
309 fn execute(&self, f: BoxedStaticFuture<()>) {
310 let operation_type_str = Self::STREAMING_UPLOAD_TYPE.as_str();
311 let media_type = self.media_type;
312
313 let metrics = self.metrics.clone();
314 let _handle = tokio::spawn(async move {
315 let _timer = metrics
316 .operation_latency
317 .with_label_values(&[media_type, operation_type_str])
318 .start_timer();
319
320 f.await
321 });
322 }
323}
324
/// Streaming uploader that buffers writes in memory and flushes them through
/// an OpenDAL multipart [`Writer`].
pub struct OpendalStreamingUploader {
    writer: Writer,
    /// Bytes accepted via `write_bytes` but not yet handed to `writer`.
    buf: Vec<Bytes>,
    /// Total length of `buf`; a flush is triggered once it reaches
    /// `UPLOAD_BUFFER_SIZE`.
    not_uploaded_len: usize,
    /// Cleared when a write fails or the upload finishes; further use of the
    /// uploader is then a bug (guarded by asserts).
    is_valid: bool,

    /// Whether to abort the multipart upload when a write/close fails
    /// (from `opendal_writer_abort_on_err` in the config).
    abort_on_err: bool,
}
338
impl OpendalStreamingUploader {
    /// Flush threshold: buffered data is written out once it reaches 16 MiB.
    const UPLOAD_BUFFER_SIZE: usize = 16 * 1024 * 1024;

    /// Opens a multipart writer for `path` with a per-request I/O timeout and
    /// jittered exponential-backoff retries layered on the operator.
    ///
    /// Parts are uploaded concurrently (`opendal_upload_concurrency`) on a
    /// metrics-instrumented tokio executor.
    pub async fn new(
        op: Operator,
        path: String,
        config: Arc<ObjectStoreConfig>,
        metrics: Arc<ObjectStoreMetrics>,
        media_type: &'static str,
    ) -> ObjectResult<Self> {
        let monitored_execute = OpendalStreamingUploaderExecute::new(metrics, media_type);

        let writer = op
            .clone()
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                config.retry.streaming_upload_attempt_timeout_ms,
            )))
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(config.retry.req_backoff_interval_ms))
                    .with_max_delay(Duration::from_millis(config.retry.req_backoff_max_delay_ms))
                    .with_max_times(config.retry.streaming_upload_retry_attempts)
                    .with_factor(config.retry.req_backoff_factor as f32)
                    .with_jitter(),
            )
            .writer_with(&path)
            .concurrent(config.opendal_upload_concurrency)
            .executor(Executor::with(monitored_execute))
            .await?;
        Ok(Self {
            writer,
            buf: vec![],
            not_uploaded_len: 0,
            is_valid: true,
            abort_on_err: config.opendal_writer_abort_on_err,
        })
    }

    /// Hands all buffered data to the underlying writer.
    ///
    /// On failure the uploader is invalidated and, if `abort_on_err` is set,
    /// the multipart upload is aborted before the error is returned.
    async fn flush(&mut self) -> ObjectResult<()> {
        let data: Vec<Bytes> = self.buf.drain(..).collect();
        // `not_uploaded_len` must track the buffered byte count exactly.
        debug_assert_eq!(
            data.iter().map(|b| b.len()).sum::<usize>(),
            self.not_uploaded_len
        );
        if let Err(err) = self.writer.write(data).await {
            self.is_valid = false;
            if self.abort_on_err {
                self.writer.abort().await?;
            }
            return Err(err.into());
        }
        self.not_uploaded_len = 0;
        Ok(())
    }
}
398
impl StreamingUploader for OpendalStreamingUploader {
    /// Appends `data` to the in-memory buffer, flushing once the buffered
    /// size reaches `UPLOAD_BUFFER_SIZE`.
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        assert!(self.is_valid);
        self.not_uploaded_len += data.len();
        self.buf.push(data);
        if self.not_uploaded_len >= Self::UPLOAD_BUFFER_SIZE {
            self.flush().await?;
        }
        Ok(())
    }

    /// Flushes any remaining buffered data and closes the writer, completing
    /// the upload. If closing fails and `abort_on_err` is set, the multipart
    /// upload is aborted before the error is returned.
    async fn finish(mut self) -> ObjectResult<()> {
        assert!(self.is_valid);
        if self.not_uploaded_len > 0 {
            self.flush().await?;
        }

        // After the final flush nothing may remain buffered.
        assert!(self.buf.is_empty());
        assert_eq!(self.not_uploaded_len, 0);

        self.is_valid = false;
        match self.writer.close().await {
            Ok(_) => (),
            Err(err) => {
                if self.abort_on_err {
                    self.writer.abort().await?;
                }
                return Err(err.into());
            }
        };

        Ok(())
    }

    /// Always reports the flush threshold (worst case), not the currently
    /// buffered length.
    fn get_memory_usage(&self) -> u64 {
        Self::UPLOAD_BUFFER_SIZE as u64
    }
}
437
#[cfg(test)]
mod tests {
    use stream::TryStreamExt;

    use super::*;

    /// Drains the whole listing stream under `prefix` into a `Vec`.
    async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
        store
            .list(prefix, None, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_memory_upload() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // Reading a path that was never uploaded must fail.
        store.read("/ab", 0..3).await.unwrap_err();

        // A ranged read returns exactly the requested byte span.
        let bytes = store.read("/abc", 4..6).await.unwrap();
        assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_owned());

        store.delete("/abc").await.unwrap();

        // After deletion the object is gone.
        store.read("/abc", 0..3).await.unwrap_err();
    }

    #[tokio::test]
    #[should_panic]
    async fn test_memory_read_overflow() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // Requesting a range past the end of the object must not succeed.
        store.read("/abc", 4..44).await.unwrap_err();
    }

    #[tokio::test]
    async fn test_memory_metadata() {
        let block = Bytes::from("123456");
        let path = "/abc".to_owned();
        let obj_store = OpendalObjectStore::test_new_memory_engine().unwrap();
        obj_store.upload("/abc", block).await.unwrap();

        // Stat of a missing key must surface a not-found error.
        let err = obj_store.metadata("/not_exist").await.unwrap_err();
        assert!(err.is_object_not_found_error());
        let metadata = obj_store.metadata("/abc").await.unwrap();
        assert_eq!(metadata.total_size, 6);
        obj_store.delete(&path).await.unwrap();
    }

    #[tokio::test]
    async fn test_memory_delete_objects_and_list_object() {
        let block1 = Bytes::from("123456");
        let block2 = Bytes::from("987654");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("abc", Bytes::from("123456")).await.unwrap();
        store.upload("prefix/abc", block1).await.unwrap();

        store.upload("prefix/xyz", block2).await.unwrap();

        // Listing is recursive: the bare prefix sees all three objects.
        assert_eq!(list_all("", &store).await.len(), 3);
        assert_eq!(list_all("prefix/", &store).await.len(), 2);
        let str_list = [String::from("prefix/abc"), String::from("prefix/xyz")];

        store.delete_objects(&str_list).await.unwrap();

        // Batch-deleted keys are unreadable and no longer listed.
        assert!(store.read("prefix/abc/", ..).await.is_err());
        assert!(store.read("prefix/xyz/", ..).await.is_err());
        assert_eq!(list_all("", &store).await.len(), 1);
        assert_eq!(list_all("prefix/", &store).await.len(), 0);
    }
}