// risingwave_object_store/object/opendal_engine/opendal_object_store.rs

use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use fail::fail_point;
use futures::{StreamExt, stream};
use opendal::layers::{RetryLayer, TimeoutLayer};
use opendal::raw::BoxedStaticFuture;
use opendal::services::Memory;
use opendal::{Execute, Executor, Operator, Writer};
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;

use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::{
    ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
    ObjectResult, ObjectStore, OperationType, StreamingUploader, prefix,
};

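/// Object store implementation backed by an Apache OpenDAL [`Operator`].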
#[derive(Clone)]
pub struct OpendalObjectStore {
    pub(crate) op: Operator,
    pub(crate) media_type: MediaType,
    pub(crate) config: Arc<ObjectStoreConfig>,
    pub(crate) metrics: Arc<ObjectStoreMetrics>,
}

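/// The storage service behind the [`Operator`]. Its name is used as a metrics label, and it
/// selects the object-prefix strategy in [`OpendalObjectStore::get_object_prefix`].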
#[derive(Clone)]
pub enum MediaType {
    Memory,
    Hdfs,
    Gcs,
    Minio,
    S3,
    Obs,
    Oss,
    Webhdfs,
    Azblob,
    Fs,
}

impl MediaType {
    pub fn as_str(&self) -> &'static str {
        match self {
            MediaType::Memory => "Memory",
            MediaType::Hdfs => "Hdfs",
            MediaType::Gcs => "Gcs",
            MediaType::Minio => "Minio",
            MediaType::S3 => "S3",
            MediaType::Obs => "Obs",
            MediaType::Oss => "Oss",
            MediaType::Webhdfs => "Webhdfs",
            MediaType::Azblob => "Azblob",
            MediaType::Fs => "Fs",
        }
    }
}

impl OpendalObjectStore {
    /// Creates an in-memory OpenDAL engine, used only for unit tests.
    pub fn test_new_memory_engine() -> ObjectResult<Self> {
        // Build an operator on top of OpenDAL's memory backend.
        let builder = Memory::default();
        let op: Operator = Operator::new(builder)?.finish();

        Ok(Self {
            op,
            media_type: MediaType::Memory,
            config: Arc::new(ObjectStoreConfig::default()),
            metrics: Arc::new(ObjectStoreMetrics::unused()),
        })
    }
}

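/// Converts an OpenDAL timestamp into seconds since the Unix epoch. Note that `as_second`
/// truncates any sub-second component.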
#[inline]
fn timestamp_to_secs(t: opendal::raw::Timestamp) -> f64 {
    t.into_inner().as_second() as f64
}

#[async_trait::async_trait]
impl ObjectStore for OpendalObjectStore {
    type StreamingUploader = OpendalStreamingUploader;

    fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
        match self.media_type {
            MediaType::S3 => prefix::s3::get_object_prefix(obj_id),
            MediaType::Minio => prefix::s3::get_object_prefix(obj_id),
            MediaType::Memory => String::default(),
            MediaType::Hdfs
            | MediaType::Gcs
            | MediaType::Obs
            | MediaType::Oss
            | MediaType::Webhdfs
            | MediaType::Azblob
            | MediaType::Fs => {
                prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
            }
        }
    }

    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        if obj.is_empty() {
            Err(ObjectError::internal("upload empty object"))
        } else {
            self.op.write(path, obj).await?;
            Ok(())
        }
    }

    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
        Ok(OpendalStreamingUploader::new(
            self.op.clone(),
            path.to_owned(),
            self.config.clone(),
            self.metrics.clone(),
            self.store_media_type(),
        )
        .await?)
    }

    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        let data = if range.is_full() {
            self.op.read(path).await?
        } else {
            self.op
                .read_with(path)
                .range(range.map(|v| *v as u64))
                .await?
        };

        if let Some(len) = range.len()
            && len != data.len()
        {
            return Err(ObjectError::internal(format!(
                "mismatched size: expected {}, found {} when reading {} at {:?}",
                len,
                data.len(),
                path,
                range,
            )));
        }

        Ok(data.to_bytes())
    }

    async fn streaming_read(
        &self,
        path: &str,
        range: Range<usize>,
    ) -> ObjectResult<ObjectDataStream> {
        fail_point!("opendal_streaming_read_err", |_| Err(
            ObjectError::internal("opendal streaming read error")
        ));
        let range: Range<u64> = (range.start as u64)..(range.end as u64);

        // Apply a per-IO timeout and a retry-with-backoff policy to this reader only, using the
        // streaming-read settings from the object store config.
        let reader = self
            .op
            .clone()
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                self.config.retry.streaming_read_attempt_timeout_ms,
            )))
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(
                        self.config.retry.req_backoff_interval_ms,
                    ))
                    .with_max_delay(Duration::from_millis(
                        self.config.retry.req_backoff_max_delay_ms,
                    ))
                    .with_max_times(self.config.retry.streaming_read_retry_attempts)
                    .with_factor(self.config.retry.req_backoff_factor as f32)
                    .with_jitter(),
            )
            .reader_with(path)
            .await?;
        let stream = reader.into_bytes_stream(range).await?.map(|item| {
            item.map(|b| Bytes::copy_from_slice(b.as_ref()))
                .map_err(|e| {
                    ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))
                })
        });

        Ok(Box::pin(stream))
    }

    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        let opendal_metadata = self.op.stat(path).await?;
        let key = path.to_owned();
        let last_modified = match opendal_metadata.last_modified() {
            Some(t) => timestamp_to_secs(t),
            None => 0_f64,
        };

        let total_size = opendal_metadata.content_length() as usize;
        let metadata = ObjectMetadata {
            key,
            last_modified,
            total_size,
        };
        Ok(metadata)
    }

    async fn delete(&self, path: &str) -> ObjectResult<()> {
        self.op.delete(path).await?;
        Ok(())
    }

    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        self.op.delete_iter(paths.to_vec()).await?;
        Ok(())
    }

    async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        let mut object_lister = self.op.lister_with(prefix).recursive(true);
        if let Some(start_after) = start_after {
            object_lister = object_lister.start_after(&start_after);
        }
        let object_lister = object_lister.await?;

        let op = self.op.clone();
        let stream = stream::unfold(object_lister, move |mut object_lister| {
            let op = op.clone();

            async move {
                match object_lister.next().await {
                    Some(Ok(object)) => {
                        let key = object.path().to_owned();

                        // Some services return incomplete metadata from `list`; fall back to a
                        // `stat` call when the last-modified time or the size is missing.
                        let meta = object.metadata();
                        let mut last_modified = meta.last_modified().map(timestamp_to_secs);
                        let mut total_size = meta.content_length() as usize;
                        if last_modified.is_none() || total_size == 0 {
                            let stat_meta = op.stat(&key).await.ok()?;
                            last_modified = stat_meta.last_modified().map(timestamp_to_secs);
                            total_size = stat_meta.content_length() as usize;
                        }

                        let metadata = ObjectMetadata {
                            key,
                            last_modified: last_modified.unwrap_or(0_f64),
                            total_size,
                        };
                        Some((Ok(metadata), object_lister))
                    }
                    Some(Err(err)) => Some((Err(err.into()), object_lister)),
                    None => None,
                }
            }
        });

        Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed())
    }

    fn store_media_type(&self) -> &'static str {
        self.media_type.as_str()
    }

    fn support_streaming_upload(&self) -> bool {
        self.op.info().native_capability().write_can_multi
    }
}

impl OpendalObjectStore {
    pub async fn copy(&self, from_path: &str, to_path: &str) -> ObjectResult<()> {
        self.op.copy(from_path, to_path).await?;
        Ok(())
    }
}

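/// [`Execute`] implementation handed to the OpenDAL writer: it spawns the writer's background
/// futures onto the Tokio runtime and records their latency in the `operation_latency` metric
/// under the `StreamingUpload` operation label.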
304
305struct OpendalStreamingUploaderExecute {
306 metrics: Arc<ObjectStoreMetrics>,
308 media_type: &'static str,
309}
310
311impl OpendalStreamingUploaderExecute {
312 const STREAMING_UPLOAD_TYPE: OperationType = OperationType::StreamingUpload;
313
314 fn new(metrics: Arc<ObjectStoreMetrics>, media_type: &'static str) -> Self {
315 Self {
316 metrics,
317 media_type,
318 }
319 }
320}
321
impl Execute for OpendalStreamingUploaderExecute {
    fn execute(&self, f: BoxedStaticFuture<()>) {
        let operation_type_str = Self::STREAMING_UPLOAD_TYPE.as_str();
        let media_type = self.media_type;

        let metrics = self.metrics.clone();
        let _handle = tokio::spawn(async move {
            let _timer = metrics
                .operation_latency
                .with_label_values(&[media_type, operation_type_str])
                .start_timer();

            f.await
        });
    }
}

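/// Streaming uploader built on an OpenDAL [`Writer`]: incoming chunks are buffered in `buf`
/// and flushed to the writer whenever at least `upload_part_size` bytes have accumulated; the
/// upload is completed by closing the writer in `finish`.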
pub struct OpendalStreamingUploader {
    writer: Writer,
    /// Chunks that have not been handed to the writer yet.
    buf: Vec<Bytes>,
    /// Total length in bytes of the buffered, not-yet-uploaded data.
    not_uploaded_len: usize,
    /// Whether the uploader is still usable; cleared once a write fails or `finish` runs.
    is_valid: bool,

    abort_on_err: bool,

    upload_part_size: usize,
}

impl OpendalStreamingUploader {
    pub async fn new(
        op: Operator,
        path: String,
        config: Arc<ObjectStoreConfig>,
        metrics: Arc<ObjectStoreMetrics>,
        media_type: &'static str,
    ) -> ObjectResult<Self> {
        let monitored_execute = OpendalStreamingUploaderExecute::new(metrics, media_type);
        let executor = Executor::with(monitored_execute);
        op.update_executor(|_| executor);

        let writer = op
            .clone()
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                config.retry.streaming_upload_attempt_timeout_ms,
            )))
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(config.retry.req_backoff_interval_ms))
                    .with_max_delay(Duration::from_millis(config.retry.req_backoff_max_delay_ms))
                    .with_max_times(config.retry.streaming_upload_retry_attempts)
                    .with_factor(config.retry.req_backoff_factor as f32)
                    .with_jitter(),
            )
            .writer_with(&path)
            .concurrent(config.opendal_upload_concurrency)
            .await?;
        Ok(Self {
            writer,
            buf: vec![],
            not_uploaded_len: 0,
            is_valid: true,
            abort_on_err: config.opendal_writer_abort_on_err,
            upload_part_size: config.upload_part_size,
        })
    }

    async fn flush(&mut self) -> ObjectResult<()> {
        let data: Vec<Bytes> = self.buf.drain(..).collect();
        debug_assert_eq!(
            data.iter().map(|b| b.len()).sum::<usize>(),
            self.not_uploaded_len
        );
        if let Err(err) = self.writer.write(data).await {
            self.is_valid = false;
            if self.abort_on_err {
                self.writer.abort().await?;
            }
            return Err(err.into());
        }
        self.not_uploaded_len = 0;
        Ok(())
    }
}

impl StreamingUploader for OpendalStreamingUploader {
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        assert!(self.is_valid);
        self.not_uploaded_len += data.len();
        self.buf.push(data);
        if self.not_uploaded_len >= self.upload_part_size {
            self.flush().await?;
        }
        Ok(())
    }

    async fn finish(mut self) -> ObjectResult<()> {
        assert!(self.is_valid);
        if self.not_uploaded_len > 0 {
            self.flush().await?;
        }

        assert!(self.buf.is_empty());
        assert_eq!(self.not_uploaded_len, 0);

        self.is_valid = false;
        match self.writer.close().await {
            Ok(_) => (),
            Err(err) => {
                if self.abort_on_err {
                    self.writer.abort().await?;
                }
                return Err(err.into());
            }
        };

        Ok(())
    }

    fn get_memory_usage(&self) -> u64 {
        self.not_uploaded_len as u64
    }
}

#[cfg(test)]
mod tests {
    use stream::TryStreamExt;

    use super::*;

    async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
        store
            .list(prefix, None, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_memory_upload() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // No such object.
        store.read("/ab", 0..3).await.unwrap_err();

        let bytes = store.read("/abc", 4..6).await.unwrap();
        assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_owned());

        store.delete("/abc").await.unwrap();

        // No such object after deletion.
        store.read("/abc", 0..3).await.unwrap_err();
    }

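    // Added example (not part of the original suite): a minimal check of the pure helpers
    // `get_object_prefix` and `store_media_type`. Per the implementation above, the memory
    // backend reports "Memory" as its media type and always uses an empty object prefix,
    // regardless of the `use_new_object_prefix_strategy` flag.
    #[test]
    fn test_memory_prefix_and_media_type() {
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        assert_eq!(store.store_media_type(), "Memory");
        assert_eq!(store.get_object_prefix(42, true), "");
        assert_eq!(store.get_object_prefix(42, false), "");
    }
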
    #[tokio::test]
    #[should_panic]
    async fn test_memory_read_overflow() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        store.read("/abc", 4..44).await.unwrap_err();
    }

    #[tokio::test]
    async fn test_memory_metadata() {
        let block = Bytes::from("123456");
        let path = "/abc".to_owned();
        let obj_store = OpendalObjectStore::test_new_memory_engine().unwrap();
        obj_store.upload("/abc", block).await.unwrap();

        let err = obj_store.metadata("/not_exist").await.unwrap_err();
        assert!(err.is_object_not_found_error());
        let metadata = obj_store.metadata("/abc").await.unwrap();
        assert_eq!(metadata.total_size, 6);
        obj_store.delete(&path).await.unwrap();
    }

    #[tokio::test]
    async fn test_memory_delete_objects_and_list_object() {
        let block1 = Bytes::from("123456");
        let block2 = Bytes::from("987654");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("abc", Bytes::from("123456")).await.unwrap();
        store.upload("prefix/abc", block1).await.unwrap();

        store.upload("prefix/xyz", block2).await.unwrap();

        assert_eq!(list_all("", &store).await.len(), 3);
        assert_eq!(list_all("prefix/", &store).await.len(), 2);
        let str_list = [String::from("prefix/abc"), String::from("prefix/xyz")];

        store.delete_objects(&str_list).await.unwrap();

        assert!(store.read("prefix/abc/", ..).await.is_err());
        assert!(store.read("prefix/xyz/", ..).await.is_err());
        assert_eq!(list_all("", &store).await.len(), 1);
        assert_eq!(list_all("prefix/", &store).await.len(), 0);
    }
}
531}