// risingwave_object_store/object/opendal_engine/opendal_object_store.rs
use std::ops::Range;
16use std::sync::Arc;
17use std::time::Duration;
18
19use bytes::Bytes;
20use fail::fail_point;
21use futures::{StreamExt, stream};
22use opendal::layers::{RetryLayer, TimeoutLayer};
23use opendal::raw::BoxedStaticFuture;
24use opendal::services::Memory;
25use opendal::{Execute, Executor, Operator, Writer};
26use risingwave_common::config::ObjectStoreConfig;
27use risingwave_common::range::RangeBoundsExt;
28use thiserror_ext::AsReport;
29
30use crate::object::object_metrics::ObjectStoreMetrics;
31use crate::object::{
32 ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
33 ObjectResult, ObjectStore, OperationType, StreamingUploader, prefix,
34};
35
/// Object store implementation backed by an OpenDAL [`Operator`].
///
/// One engine type that serves many backends (see [`MediaType`]); the
/// concrete service is fixed when the `Operator` is constructed.
#[derive(Clone)]
pub struct OpendalObjectStore {
    /// The OpenDAL operator used for all object operations.
    pub(crate) op: Operator,
    /// The backend `op` was built for; drives prefix selection and the
    /// metrics label reported by `store_media_type`.
    pub(crate) media_type: MediaType,
    /// Object store configuration (retry, timeout, and upload tuning).
    pub(crate) config: Arc<ObjectStoreConfig>,
    /// Metrics shared with the streaming-upload executor.
    pub(crate) metrics: Arc<ObjectStoreMetrics>,
}
44
/// The backend service an [`OpendalObjectStore`] talks to.
#[derive(Clone)]
pub enum MediaType {
    Memory,
    Hdfs,
    Gcs,
    Minio,
    S3,
    Obs,
    Oss,
    Webhdfs,
    Azblob,
    Fs,
}
58
59impl MediaType {
60 pub fn as_str(&self) -> &'static str {
61 match self {
62 MediaType::Memory => "Memory",
63 MediaType::Hdfs => "Hdfs",
64 MediaType::Gcs => "Gcs",
65 MediaType::Minio => "Minio",
66 MediaType::S3 => "S3",
67 MediaType::Obs => "Obs",
68 MediaType::Oss => "Oss",
69 MediaType::Webhdfs => "Webhdfs",
70 MediaType::Azblob => "Azblob",
71 MediaType::Fs => "Fs",
72 }
73 }
74}
75
76impl OpendalObjectStore {
77 pub fn test_new_memory_engine() -> ObjectResult<Self> {
79 let builder = Memory::default();
81 let op: Operator = Operator::new(builder)?.finish();
82
83 Ok(Self {
84 op,
85 media_type: MediaType::Memory,
86 config: Arc::new(ObjectStoreConfig::default()),
87 metrics: Arc::new(ObjectStoreMetrics::unused()),
88 })
89 }
90}
91
92#[async_trait::async_trait]
93impl ObjectStore for OpendalObjectStore {
94 type StreamingUploader = OpendalStreamingUploader;
95
96 fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
97 match self.media_type {
98 MediaType::S3 => prefix::s3::get_object_prefix(obj_id),
99 MediaType::Minio => prefix::s3::get_object_prefix(obj_id),
100 MediaType::Memory => String::default(),
101 MediaType::Hdfs
102 | MediaType::Gcs
103 | MediaType::Obs
104 | MediaType::Oss
105 | MediaType::Webhdfs
106 | MediaType::Azblob
107 | MediaType::Fs => {
108 prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
109 }
110 }
111 }
112
113 async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
114 if obj.is_empty() {
115 Err(ObjectError::internal("upload empty object"))
116 } else {
117 self.op.write(path, obj).await?;
118 Ok(())
119 }
120 }
121
122 async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
123 Ok(OpendalStreamingUploader::new(
124 self.op.clone(),
125 path.to_owned(),
126 self.config.clone(),
127 self.metrics.clone(),
128 self.store_media_type(),
129 )
130 .await?)
131 }
132
133 async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
134 let data = if range.is_full() {
135 self.op.read(path).await?
136 } else {
137 self.op
138 .read_with(path)
139 .range(range.map(|v| *v as u64))
140 .await?
141 };
142
143 if let Some(len) = range.len()
144 && len != data.len()
145 {
146 return Err(ObjectError::internal(format!(
147 "mismatched size: expected {}, found {} when reading {} at {:?}",
148 len,
149 data.len(),
150 path,
151 range,
152 )));
153 }
154
155 Ok(data.to_bytes())
156 }
157
158 async fn streaming_read(
162 &self,
163 path: &str,
164 range: Range<usize>,
165 ) -> ObjectResult<ObjectDataStream> {
166 fail_point!("opendal_streaming_read_err", |_| Err(
167 ObjectError::internal("opendal streaming read error")
168 ));
169 let range: Range<u64> = (range.start as u64)..(range.end as u64);
170
171 let reader = self
176 .op
177 .clone()
178 .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
179 self.config.retry.streaming_read_attempt_timeout_ms,
180 )))
181 .layer(
182 RetryLayer::new()
183 .with_min_delay(Duration::from_millis(
184 self.config.retry.req_backoff_interval_ms,
185 ))
186 .with_max_delay(Duration::from_millis(
187 self.config.retry.req_backoff_max_delay_ms,
188 ))
189 .with_max_times(self.config.retry.streaming_read_retry_attempts)
190 .with_factor(self.config.retry.req_backoff_factor as f32)
191 .with_jitter(),
192 )
193 .reader_with(path)
194 .await?;
195 let stream = reader.into_bytes_stream(range).await?.map(|item| {
196 item.map(|b| Bytes::copy_from_slice(b.as_ref()))
197 .map_err(|e| {
198 ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))
199 })
200 });
201
202 Ok(Box::pin(stream))
203 }
204
205 async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
206 let opendal_metadata = self.op.stat(path).await?;
207 let key = path.to_owned();
208 let last_modified = match opendal_metadata.last_modified() {
209 Some(t) => t.timestamp() as f64,
210 None => 0_f64,
211 };
212
213 let total_size = opendal_metadata.content_length() as usize;
214 let metadata = ObjectMetadata {
215 key,
216 last_modified,
217 total_size,
218 };
219 Ok(metadata)
220 }
221
222 async fn delete(&self, path: &str) -> ObjectResult<()> {
223 self.op.delete(path).await?;
224 Ok(())
225 }
226
227 async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
230 self.op.delete_iter(paths.to_vec()).await?;
231 Ok(())
232 }
233
234 async fn list(
235 &self,
236 prefix: &str,
237 start_after: Option<String>,
238 limit: Option<usize>,
239 ) -> ObjectResult<ObjectMetadataIter> {
240 let mut object_lister = self.op.lister_with(prefix).recursive(true);
241 if let Some(start_after) = start_after {
242 object_lister = object_lister.start_after(&start_after);
243 }
244 let object_lister = object_lister.await?;
245
246 let op = self.op.clone();
247 let full_capability = op.info().full_capability();
248 let stream = stream::unfold(object_lister, move |mut object_lister| {
249 let op = op.clone();
250
251 async move {
252 match object_lister.next().await {
253 Some(Ok(object)) => {
254 let key = object.path().to_owned();
255
256 let (last_modified, total_size) = if !full_capability
258 .list_has_content_length
259 || !full_capability.list_has_last_modified
260 {
261 let stat_meta = op.stat(&key).await.ok()?;
263 let last_modified = match stat_meta.last_modified() {
264 Some(t) => t.timestamp() as f64,
265 None => 0_f64,
266 };
267 let total_size = stat_meta.content_length() as usize;
268 (last_modified, total_size)
269 } else {
270 let meta = object.metadata();
272 let last_modified = match meta.last_modified() {
273 Some(t) => t.timestamp() as f64,
274 None => 0_f64,
275 };
276 let total_size = meta.content_length() as usize;
277 (last_modified, total_size)
278 };
279
280 let metadata = ObjectMetadata {
281 key,
282 last_modified,
283 total_size,
284 };
285 Some((Ok(metadata), object_lister))
286 }
287 Some(Err(err)) => Some((Err(err.into()), object_lister)),
288 None => None,
289 }
290 }
291 });
292
293 Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed())
294 }
295
296 fn store_media_type(&self) -> &'static str {
297 self.media_type.as_str()
298 }
299
300 fn support_streaming_upload(&self) -> bool {
301 self.op.info().native_capability().write_can_multi
302 }
303}
304
305impl OpendalObjectStore {
306 pub async fn copy(&self, from_path: &str, to_path: &str) -> ObjectResult<()> {
307 self.op.copy(from_path, to_path).await?;
308 Ok(())
309 }
310}
311
/// Executor handed to OpenDAL that runs writer futures on the tokio runtime
/// while recording streaming-upload latency.
struct OpendalStreamingUploaderExecute {
    // For monitoring: used to time each spawned future.
    metrics: Arc<ObjectStoreMetrics>,
    // Media-type label attached to the recorded latency.
    media_type: &'static str,
}
317
318impl OpendalStreamingUploaderExecute {
319 const STREAMING_UPLOAD_TYPE: OperationType = OperationType::StreamingUpload;
320
321 fn new(metrics: Arc<ObjectStoreMetrics>, media_type: &'static str) -> Self {
322 Self {
323 metrics,
324 media_type,
325 }
326 }
327}
328
329impl Execute for OpendalStreamingUploaderExecute {
330 fn execute(&self, f: BoxedStaticFuture<()>) {
331 let operation_type_str = Self::STREAMING_UPLOAD_TYPE.as_str();
332 let media_type = self.media_type;
333
334 let metrics = self.metrics.clone();
335 let _handle = tokio::spawn(async move {
336 let _timer = metrics
337 .operation_latency
338 .with_label_values(&[media_type, operation_type_str])
339 .start_timer();
340
341 f.await
342 });
343 }
344}
345
/// Streaming uploader built on an OpenDAL [`Writer`].
///
/// Incoming chunks are buffered locally and flushed to the writer once the
/// buffered size reaches `upload_part_size`.
pub struct OpendalStreamingUploader {
    writer: Writer,
    /// Chunks accepted via `write_bytes` but not yet handed to `writer`.
    buf: Vec<Bytes>,
    /// Total byte length currently held in `buf`.
    not_uploaded_len: usize,
    /// Cleared once a flush fails or the upload is finished; further use is
    /// a bug (enforced by assertions).
    is_valid: bool,

    /// Whether to call `writer.abort()` when a write or close fails.
    abort_on_err: bool,

    /// Buffered-byte threshold that triggers a flush.
    upload_part_size: usize,
}
361
impl OpendalStreamingUploader {
    /// Creates a streaming uploader for `path`.
    ///
    /// Installs a metrics-recording executor on the operator, then builds a
    /// writer wrapped with a per-attempt I/O timeout and a
    /// retry-with-backoff layer, using the configured upload concurrency.
    pub async fn new(
        op: Operator,
        path: String,
        config: Arc<ObjectStoreConfig>,
        metrics: Arc<ObjectStoreMetrics>,
        media_type: &'static str,
    ) -> ObjectResult<Self> {
        // Route the writer's background futures through an executor that
        // records streaming-upload latency.
        let monitored_execute = OpendalStreamingUploaderExecute::new(metrics, media_type);
        let executor = Executor::with(monitored_execute);
        op.update_executor(|_| executor);

        let writer = op
            .clone()
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                config.retry.streaming_upload_attempt_timeout_ms,
            )))
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(config.retry.req_backoff_interval_ms))
                    .with_max_delay(Duration::from_millis(config.retry.req_backoff_max_delay_ms))
                    .with_max_times(config.retry.streaming_upload_retry_attempts)
                    .with_factor(config.retry.req_backoff_factor as f32)
                    .with_jitter(),
            )
            .writer_with(&path)
            .concurrent(config.opendal_upload_concurrency)
            .await?;
        Ok(Self {
            writer,
            buf: vec![],
            not_uploaded_len: 0,
            is_valid: true,
            abort_on_err: config.opendal_writer_abort_on_err,
            upload_part_size: config.upload_part_size,
        })
    }

    /// Hands all buffered chunks to the underlying writer and resets the
    /// buffered-length counter.
    ///
    /// On write failure the uploader is invalidated and, when
    /// `abort_on_err` is set, the upload is aborted. NOTE(review): if
    /// `abort()` itself fails, its error replaces the original write error.
    async fn flush(&mut self) -> ObjectResult<()> {
        let data: Vec<Bytes> = self.buf.drain(..).collect();
        // `not_uploaded_len` must track the buffered bytes exactly.
        debug_assert_eq!(
            data.iter().map(|b| b.len()).sum::<usize>(),
            self.not_uploaded_len
        );
        if let Err(err) = self.writer.write(data).await {
            self.is_valid = false;
            if self.abort_on_err {
                self.writer.abort().await?;
            }
            return Err(err.into());
        }
        self.not_uploaded_len = 0;
        Ok(())
    }
}
417
impl StreamingUploader for OpendalStreamingUploader {
    /// Buffers `data`, flushing to the writer once the buffered size
    /// reaches `upload_part_size`. Must not be called after a failed flush
    /// (`is_valid` is asserted).
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        assert!(self.is_valid);
        self.not_uploaded_len += data.len();
        self.buf.push(data);
        if self.not_uploaded_len >= self.upload_part_size {
            self.flush().await?;
        }
        Ok(())
    }

    /// Flushes any remaining buffered bytes and closes the writer to
    /// complete the upload.
    ///
    /// On close failure the upload is aborted when `abort_on_err` is set.
    /// NOTE(review): if `abort()` fails, its error replaces the close error.
    async fn finish(mut self) -> ObjectResult<()> {
        assert!(self.is_valid);
        if self.not_uploaded_len > 0 {
            self.flush().await?;
        }

        assert!(self.buf.is_empty());
        assert_eq!(self.not_uploaded_len, 0);

        // Mark as consumed before closing so the uploader can't be reused.
        self.is_valid = false;
        match self.writer.close().await {
            Ok(_) => (),
            Err(err) => {
                if self.abort_on_err {
                    self.writer.abort().await?;
                }
                return Err(err.into());
            }
        };

        Ok(())
    }

    /// Bytes currently buffered locally, not yet handed to the writer.
    fn get_memory_usage(&self) -> u64 {
        self.not_uploaded_len as u64
    }
}
457
#[cfg(test)]
mod tests {
    use stream::TryStreamExt;

    use super::*;

    /// Collects the full listing under `prefix` into a `Vec`.
    async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
        store
            .list(prefix, None, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
    }

    /// Upload, ranged read, delete, and read-after-delete on the in-memory
    /// engine.
    #[tokio::test]
    async fn test_memory_upload() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // Reading a non-existent key must fail.
        store.read("/ab", 0..3).await.unwrap_err();

        let bytes = store.read("/abc", 4..6).await.unwrap();
        assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_owned());

        store.delete("/abc").await.unwrap();

        // Reading a deleted key must fail.
        store.read("/abc", 0..3).await.unwrap_err();
    }

    /// Reading past the end of the object must not succeed.
    #[tokio::test]
    #[should_panic]
    async fn test_memory_read_overflow() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // Overflow read triggers the expected panic via `unwrap_err`.
        store.read("/abc", 4..44).await.unwrap_err();
    }

    /// `metadata` returns not-found for missing keys and the correct size
    /// for existing ones.
    #[tokio::test]
    async fn test_memory_metadata() {
        let block = Bytes::from("123456");
        let path = "/abc".to_owned();
        let obj_store = OpendalObjectStore::test_new_memory_engine().unwrap();
        obj_store.upload("/abc", block).await.unwrap();

        let err = obj_store.metadata("/not_exist").await.unwrap_err();
        assert!(err.is_object_not_found_error());
        let metadata = obj_store.metadata("/abc").await.unwrap();
        assert_eq!(metadata.total_size, 6);
        obj_store.delete(&path).await.unwrap();
    }

    /// Batch delete removes exactly the listed keys; listing reflects it.
    #[tokio::test]
    async fn test_memory_delete_objects_and_list_object() {
        let block1 = Bytes::from("123456");
        let block2 = Bytes::from("987654");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("abc", Bytes::from("123456")).await.unwrap();
        store.upload("prefix/abc", block1).await.unwrap();

        store.upload("prefix/xyz", block2).await.unwrap();

        assert_eq!(list_all("", &store).await.len(), 3);
        assert_eq!(list_all("prefix/", &store).await.len(), 2);
        let str_list = [String::from("prefix/abc"), String::from("prefix/xyz")];

        store.delete_objects(&str_list).await.unwrap();

        assert!(store.read("prefix/abc/", ..).await.is_err());
        assert!(store.read("prefix/xyz/", ..).await.is_err());
        assert_eq!(list_all("", &store).await.len(), 1);
        assert_eq!(list_all("prefix/", &store).await.len(), 0);
    }
}
538}