risingwave_batch::spill::spill_op

Struct SpillOp

pub struct SpillOp {
    pub op: Operator,
}

SpillOp manages the spill directory of a spilling executor and removes the directory when it is dropped, RAII style.

Fields§

§op: Operator

Implementations§

impl SpillOp

pub fn create(path: String, spill_backend: SpillBackend) -> Result<SpillOp>

pub async fn clean_spill_directory() -> Result<()>

pub async fn writer_with(&self, name: &str) -> Result<Writer>

pub async fn reader_with(&self, name: &str) -> Result<Reader>

pub fn read_stream( reader: Reader, spill_metrics: Arc<BatchSpillMetrics>, ) -> Pin<Box<dyn Stream<Item = Result<DataChunk, BatchError>> + Send>>

The spill file content looks like the following:

[proto_len]
[proto_bytes]
...
[proto_len]
[proto_bytes]
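
The layout above is a length-prefixed framing. As a minimal sketch of how such a file could be written and read back, the code below uses only the standard library. The 4-byte little-endian prefix and the names `write_frame` and `read_frames` are assumptions made for illustration; the page does not state the width or byte order of `proto_len`, and this is not the crate's implementation.

```rust
use std::io::{self, Read, Write};

/// Append one frame: a length prefix followed by the payload bytes.
/// ASSUMPTION: the prefix is a little-endian u32 (not confirmed by the docs above).
fn write_frame<W: Write>(out: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    out.write_all(&(payload.len() as u32).to_le_bytes())?;
    out.write_all(payload)
}

/// Read frames until the input ends exactly at a frame boundary.
/// A truncated prefix or payload is reported as an error.
fn read_frames<R: Read>(mut input: R) -> io::Result<Vec<Vec<u8>>> {
    let mut frames = Vec::new();
    loop {
        let mut len_buf = [0u8; 4];
        // Probe one byte first so that end-of-input between frames is not an error.
        if input.read(&mut len_buf[..1])? == 0 {
            return Ok(frames);
        }
        input.read_exact(&mut len_buf[1..])?;
        let len = u32::from_le_bytes(len_buf) as usize;
        let mut payload = vec![0u8; len];
        input.read_exact(&mut payload)?;
        frames.push(payload);
    }
}
```

In this sketch each `[proto_bytes]` payload is treated as opaque bytes; decoding it into a `DataChunk` is left to the caller.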

Methods from Deref<Target = Operator>§

pub fn limit(&self) -> usize

Get current operator’s limit. Limit is usually the maximum size of data that operator will handle in one operation.

pub fn with_limit(&self, limit: usize) -> Operator

Specify the batch limit.

Default: 1000

pub fn default_executor(&self) -> Option<Executor>

Get the default executor.

pub fn with_default_executor(&self, executor: Executor) -> Operator

Specify the default executor.

pub fn info(&self) -> OperatorInfo

Get information of underlying accessor.

§Examples
use opendal::Operator;

let info = op.info();

pub fn blocking(&self) -> BlockingOperator

Create a new blocking operator.

This operation has almost no cost.

pub async fn check(&self) -> Result<(), Error>

Check if this operator can work correctly.

We send a list request to the path and return any error we encounter.

use opendal::Operator;

op.check().await?;

pub async fn stat(&self, path: &str) -> Result<Metadata, Error>

Get given path’s metadata.

§Notes
§Extra Options

[Operator::stat] is a wrapper of [Operator::stat_with] without any options. To use extra options like if_match and if_none_match, please use [Operator::stat_with] instead.

§Reuse Metadata

To fetch metadata of entries returned by [Lister], it is better to use [Operator::list_with] and [Operator::lister_with] with a metakey query like Metakey::ContentLength | Metakey::LastModified so that we can avoid extra stat requests.

§Examples
§Check if file exists
use opendal::ErrorKind;
if let Err(e) = op.stat("test").await {
    if e.kind() == ErrorKind::NotFound {
        println!("file not exist")
    }
}

pub fn stat_with( &self, path: &str, ) -> OperatorFuture<OpStat, Metadata, impl Future<Output = Result<Metadata, Error>>>

Get given path’s metadata with extra options.

§Notes
§Reuse Metadata

To fetch metadata of entries returned by [Lister], it is better to use [Operator::list_with] and [Operator::lister_with] with a metakey query like Metakey::ContentLength | Metakey::LastModified so that we can avoid extra requests.

§Options
§if_match

Set if_match for this stat request.

This feature can be used to check if the file’s ETag matches the given ETag.

If the file exists and its ETag doesn’t match, an error with kind [ErrorKind::ConditionNotMatch] will be returned.

use opendal::Operator;

let mut metadata = op.stat_with("path/to/file").if_match(etag).await?;
§if_none_match

Set if_none_match for this stat request.

This feature can be used to check if the file’s ETag doesn’t match the given ETag.

If the file exists and its ETag matches, an error with kind [ErrorKind::ConditionNotMatch] will be returned.

use opendal::Operator;

let mut metadata = op.stat_with("path/to/file").if_none_match(etag).await?;
§Examples
§Get metadata while ETag matches

stat_with will

  • return Ok(metadata) if ETag matches
  • return Err(error) with error.kind() == ErrorKind::ConditionNotMatch if the file exists but the ETag does not match
  • return Err(err) if other errors occur, for example, NotFound.
use opendal::ErrorKind;
if let Err(e) = op.stat_with("test").if_match("<etag>").await {
    if e.kind() == ErrorKind::ConditionNotMatch {
        println!("file exists, but etag mismatch")
    }
    if e.kind() == ErrorKind::NotFound {
        println!("file not exist")
    }
}
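
The outcomes listed above can be modelled without any storage service. The following is a small sketch using only the standard library; `StatError`, `check_if_match`, and `check_if_none_match` are hypothetical names introduced here and are not opendal types or functions.

```rust
/// Hypothetical stand-in for the two error kinds named above; not opendal's `ErrorKind`.
#[derive(Debug, PartialEq)]
enum StatError {
    NotFound,
    ConditionNotMatch,
}

/// Model of `if_match`: `stored_etag` is `None` when the file does not exist.
fn check_if_match(stored_etag: Option<&str>, expected: &str) -> Result<(), StatError> {
    match stored_etag {
        None => Err(StatError::NotFound),
        Some(etag) if etag == expected => Ok(()),
        Some(_) => Err(StatError::ConditionNotMatch),
    }
}

/// Model of `if_none_match`: the request fails when the ETag does match.
fn check_if_none_match(stored_etag: Option<&str>, unexpected: &str) -> Result<(), StatError> {
    match stored_etag {
        None => Err(StatError::NotFound),
        Some(etag) if etag == unexpected => Err(StatError::ConditionNotMatch),
        Some(_) => Ok(()),
    }
}
```

The sketch only captures the decision table; a real service may also return other errors, such as permission failures.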

§Behavior
§Services that support create_dir

test and test/ may vary in some services such as S3. However, on a local file system, they’re identical. Therefore, the behavior of stat("test") and stat("test/") might differ in certain edge cases. Always use stat("test/") when you need to access a directory if possible.

The behavior is as follows:

| Case                   | Path          | Result                                   |
|------------------------|---------------|------------------------------------------|
| stat existing dir      | abc/          | Metadata with dir mode                   |
| stat existing file     | abc/def_file  | Metadata with file mode                  |
| stat dir without /     | abc/def_dir   | Error NotFound or metadata with dir mode |
| stat file with /       | abc/def_file/ | Error NotFound                           |
| stat not existing path | xyz           | Error NotFound                           |

Refer to [RFC: List Prefix][crate::docs::rfcs::rfc_3243_list_prefix] for more details.

§Services that do not support create_dir

For services that do not support create_dir, stat("test/") will return NotFound even when test/abc exists, since the service has no concept of a dir. There is nothing we can do about this.

pub async fn is_exist(&self, path: &str) -> Result<bool, Error>

Check if this path exists or not.

§Example
use anyhow::Result;
use futures::io;
use opendal::Operator;

async fn test(op: Operator) -> Result<()> {
    let _ = op.is_exist("test").await?;

    Ok(())
}

pub async fn create_dir(&self, path: &str) -> Result<(), Error>

Create a dir at given path.

§Notes

To indicate that a path is a directory, it is compulsory to include a trailing / in the path. Failure to do so may result in NotADirectory error being returned by OpenDAL.

§Behavior
  • Create on existing dir will succeed.
  • Create dir is always recursive, works like mkdir -p
§Examples
op.create_dir("path/to/dir/").await?;
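
Because a directory path must end with `/`, callers sometimes normalize paths before passing them on. The helper below is a hypothetical sketch (the name `as_dir_path` is not part of opendal) that appends the trailing slash only when it is missing.

```rust
/// Return `path` with a trailing `/` so that it denotes a directory.
/// Hypothetical helper for illustration; not an opendal API.
fn as_dir_path(path: &str) -> String {
    if path.ends_with('/') {
        path.to_string()
    } else {
        format!("{}/", path)
    }
}
```

The result can then be handed to a call such as `create_dir`, which expects the trailing slash.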

pub async fn read(&self, path: &str) -> Result<Buffer, Error>

Read the whole path into a bytes.

§Notes
§Extra Options

[Operator::read] is a wrapper of [Operator::read_with] without any options. To use extra options like range and if_match, please use [Operator::read_with] instead.

§Streaming Read

This function allocates new bytes internally. For more precise memory control or to read data lazily, use [Operator::reader].

§Examples
let bs = op.read("path/to/file").await?;

pub fn read_with( &self, path: &str, ) -> OperatorFuture<(OpRead, OpReader), Buffer, impl Future<Output = Result<Buffer, Error>>>

Read the whole path into a bytes with extra options.

§Notes
§Streaming Read

This function will allocate a new bytes internally. For more precise memory control or reading data lazily, please use [Operator::reader]

§Options
§range

Set range for this read request.

If we have a file with size n.

  • .. means read bytes in range [0, n) of file.
  • 0..1024 means read bytes in range [0, 1024) of file
  • 1024.. means read bytes in range [1024, n) of file
  • ..1024 means read bytes in range (n - 1024, n) of file
let bs = op.read_with("path/to/file").range(0..1024).await?;
§if_match

Set if_match for this read request.

This feature can be used to check if the file’s ETag matches the given ETag.

If the file exists and its ETag doesn’t match, an error with kind [ErrorKind::ConditionNotMatch] will be returned.

use opendal::Operator;
let bs = op.read_with("path/to/file").if_match(etag).await?;
§if_none_match

Set if_none_match for this read request.

This feature can be used to check if the file’s ETag doesn’t match the given ETag.

If the file exists and its ETag matches, an error with kind [ErrorKind::ConditionNotMatch] will be returned.

use opendal::Operator;
let bs = op.read_with("path/to/file").if_none_match(etag).await?;
§concurrent

Set concurrent for the reader.

By default, OpenDAL reads files without concurrency. This is not efficient when users read large chunks of data. By setting concurrent, OpenDAL will read files concurrently on storage services that support it.

By setting concurrent, OpenDAL will fetch chunks concurrently with the given chunk size.

let r = op.read_with("path/to/file").concurrent(8).await?;
§chunk

OpenDAL will use services’ preferred chunk size by default. Users can set chunk based on their own needs.

The following example will make OpenDAL read data in 4MiB chunks:

let r = op.read_with("path/to/file").chunk(4 * 1024 * 1024).await?;
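
To make the interaction between chunk and concurrent more concrete, the sketch below splits a byte range into chunk-sized sub-ranges, each of which could be fetched as a separate request. This is an illustration under assumptions, not OpenDAL's implementation; `chunk_ranges` is a hypothetical name.

```rust
use std::ops::Range;

/// Split `range` into consecutive sub-ranges of at most `chunk` bytes.
/// Hypothetical illustration of chunked reads; not an opendal API.
fn chunk_ranges(range: Range<u64>, chunk: u64) -> Vec<Range<u64>> {
    assert!(chunk > 0, "chunk size must be non-zero");
    let mut out = Vec::new();
    let mut start = range.start;
    while start < range.end {
        // The last sub-range may be shorter than `chunk`.
        let end = range.end.min(start.saturating_add(chunk));
        out.push(start..end);
        start = end;
    }
    out
}
```

With a 10MiB file and a 4MiB chunk this yields three sub-ranges, so a concurrency of 8 would leave some workers idle.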
§Examples

Read the whole path into a bytes.

let bs = op.read_with("path/to/file").await?;
let bs = op.read_with("path/to/file").range(0..10).await?;

pub async fn reader(&self, path: &str) -> Result<Reader, Error>

Create a new reader which can read the whole path.

§Notes
§Extra Options

[Operator::reader] is a wrapper of [Operator::reader_with] without any options. To use extra options like concurrent, please use [Operator::reader_with] instead.

§Examples
let r = op.reader("path/to/file").await?;

pub fn reader_with( &self, path: &str, ) -> OperatorFuture<(OpRead, OpReader), Reader, impl Future<Output = Result<Reader, Error>>>

Create a new reader with extra options

§Notes
§Extra Options

[Operator::reader] is a wrapper of [Operator::reader_with] without any options. To use extra options like version, please use [Operator::reader_with] instead.

§Options
§concurrent

Set concurrent for the reader.

By default, OpenDAL reads files without concurrency. This is not efficient when users read large chunks of data. By setting concurrent, OpenDAL will read files concurrently on storage services that support it.

By setting concurrent, OpenDAL will fetch chunks concurrently with the given chunk size.

let r = op.reader_with("path/to/file").concurrent(8).await?;
§chunk

OpenDAL will use services’ preferred chunk size by default. Users can set chunk based on their own needs.

The following example will make OpenDAL read data in 4MiB chunks:

let r = op
    .reader_with("path/to/file")
    .chunk(4 * 1024 * 1024)
    .await?;
§Examples
let r = op.reader_with("path/to/file").version("version_id").await?;

pub async fn write( &self, path: &str, bs: impl Into<Buffer>, ) -> Result<(), Error>

Write bytes into path.

§Notes
§Extra Options

[Operator::write] is a wrapper of [Operator::write_with] without any options. To use extra options like content_type and cache_control, please use [Operator::write_with] instead.

§Streaming Write

This function will write all bytes at once. For more precise memory control or writing data continuously, please use [Operator::writer].

§Multipart Uploads Write

OpenDAL abstracts the multipart uploads into [Writer]. It will automatically handle the multipart uploads for you. You can control the behavior of multipart uploads by setting chunk, concurrent via [Operator::writer_with]

§Examples
use bytes::Bytes;

op.write("path/to/file", vec![0; 4096]).await?;

pub async fn copy(&self, from: &str, to: &str) -> Result<(), Error>

Copy a file from from to to.

§Notes
  • from and to must be a file.
  • to will be overwritten if it exists.
  • If from and to are the same, an IsSameFile error will occur.
  • copy is idempotent. For same from and to input, the result will be the same.
§Examples

op.copy("path/to/file", "path/to/file2").await?;

pub async fn rename(&self, from: &str, to: &str) -> Result<(), Error>

Rename a file from from to to.

§Notes
  • from and to must be a file.
  • to will be overwritten if it exists.
  • If from and to are the same, an IsSameFile error will occur.
§Examples

op.rename("path/to/file", "path/to/file2").await?;

pub async fn writer(&self, path: &str) -> Result<Writer, Error>

Write multiple bytes into path.

§Notes
§Extra Options

[Operator::writer] is a wrapper of [Operator::writer_with] without any options. To use extra options like content_type and cache_control, please use [Operator::writer_with] instead.

§Chunk

Some storage services have a minimum chunk size requirement. For example, s3 could return hard errors like EntityTooSmall if the chunk size is too small. Some services like gcs also return errors if the chunk size is not aligned. Besides, cloud storage services will cost more money if we write data in small chunks.

OpenDAL sets the chunk size automatically based on the Capability of the service if users don’t set it. Users can set chunk to control the exact size to send to the storage service.

Using [Operator::writer_with] to set a good chunk size might improve performance.

§Examples
use bytes::Bytes;

let mut w = op.writer("path/to/file").await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;

pub fn writer_with( &self, path: &str, ) -> OperatorFuture<(OpWrite, OpWriter), Writer, impl Future<Output = Result<Writer, Error>>>

Write multiple bytes into path with extra options.

§Options
§append

Set append for this write request.

By default, write overwrites existing files. To append to the end of the file instead, set append to true.

The following example will append data to existing file instead.

use bytes::Bytes;

let mut w = op.writer_with("path/to/file").append(true).await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
§chunk

Set chunk for the writer.

Some storage services have a minimum chunk size requirement. For example, s3 could return hard errors like EntityTooSmall if the chunk size is too small. Some services like gcs also return errors if the chunk size is not aligned. Besides, cloud storage services will cost more money if we write data in small chunks.

OpenDAL sets the chunk size automatically based on the Capability of the service if users don’t set it. Users can set chunk to control the exact size to send to the storage service.

Setting a good chunk size might improve performance, reduce API calls, and save money.

The following example will set the writer chunk to 8MiB. Only one API call will be sent at close instead.

use bytes::Bytes;

let mut w = op
    .writer_with("path/to/file")
    .chunk(8 * 1024 * 1024)
    .await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
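
The effect of the chunk setting on the number of API calls can be modelled with a simple buffering sink. The sketch below is a hypothetical model using only the standard library (`ChunkedSink` is not an opendal type, and the real writer may flush at different points); it records the size of each simulated upload call.

```rust
/// Hypothetical model of a chunked writer; `flushed` holds the size of each simulated call.
struct ChunkedSink {
    chunk: usize,
    buf: Vec<u8>,
    flushed: Vec<usize>,
}

impl ChunkedSink {
    fn new(chunk: usize) -> Self {
        assert!(chunk > 0, "chunk size must be non-zero");
        ChunkedSink { chunk, buf: Vec::new(), flushed: Vec::new() }
    }

    /// Buffer `data`, emitting one simulated call for every full chunk.
    fn write(&mut self, data: &[u8]) {
        self.buf.extend_from_slice(data);
        while self.buf.len() >= self.chunk {
            let rest = self.buf.split_off(self.chunk);
            self.flushed.push(self.buf.len());
            self.buf = rest;
        }
    }

    /// Flush whatever remains and return the sizes of all simulated calls.
    fn close(mut self) -> Vec<usize> {
        if !self.buf.is_empty() {
            self.flushed.push(self.buf.len());
        }
        self.flushed
    }
}
```

In this model, two 4096-byte writes with an 8MiB chunk stay in the buffer and go out as a single call at close, which matches the behavior described above.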
§concurrent

Set concurrent for the writer.

By default, OpenDAL writes files without concurrency. This is not efficient when users write large chunks of data. By setting concurrent, OpenDAL will write files concurrently on storage services that support it.

The following example will set the writer concurrent to 8.

  • The first write will start and return immediately.
  • The second write will start and return immediately.
  • The close will make sure all writes are done in order and return result.
use bytes::Bytes;

let mut w = op.writer_with("path/to/file").concurrent(8).await?;
w.write(vec![0; 4096]).await?; // Start the first write
w.write(vec![1; 4096]).await?; // Second write will be concurrent without wait
w.close().await?; // Close will make sure all writes are done and success
§cache_control

Set the cache_control for this write request.

Some storage services support setting cache_control as system metadata.

use bytes::Bytes;

let mut w = op
    .writer_with("path/to/file")
    .cache_control("max-age=604800")
    .await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
§content_type

Set the content_type for this write request.

Some storage services support setting content_type as system metadata.

use bytes::Bytes;

let mut w = op
    .writer_with("path/to/file")
    .content_type("text/plain")
    .await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
§content_disposition

Set the content_disposition for this write request.

Some storage services support setting content_disposition as system metadata.

use bytes::Bytes;

let mut w = op
    .writer_with("path/to/file")
    .content_disposition("attachment; filename=\"filename.jpg\"")
    .await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
§Examples
use bytes::Bytes;

let mut w = op
    .writer_with("path/to/file")
    .content_type("application/octet-stream")
    .await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;

pub fn write_with( &self, path: &str, bs: impl Into<Buffer>, ) -> OperatorFuture<(OpWrite, OpWriter, Buffer), (), impl Future<Output = Result<(), Error>>>

Write data with extra options.

§Notes
§Streaming Write

This function will write all bytes at once. For more precise memory control or writing data lazily, please use [Operator::writer_with].

§Multipart Uploads Write

OpenDAL abstracts the multipart uploads into [Writer]. It will automatically handle the multipart uploads for you. You can control the behavior of multipart uploads by setting chunk, concurrent via [Operator::writer_with]

§Options
§append

Set append for this write request.

By default, write overwrites existing files. To append to the end of the file instead, set append to true.

The following example will append data to existing file instead.

use bytes::Bytes;

let bs = b"hello, world!".to_vec();
let _ = op.write_with("path/to/file", bs).append(true).await?;
§cache_control

Set the cache_control for this write request.

Some storage services support setting cache_control as system metadata.

use bytes::Bytes;

let bs = b"hello, world!".to_vec();
let _ = op
    .write_with("path/to/file", bs)
    .cache_control("max-age=604800")
    .await?;
§content_type

Set the content_type for this write request.

Some storage services support setting content_type as system metadata.

use bytes::Bytes;

let bs = b"hello, world!".to_vec();
let _ = op
    .write_with("path/to/file", bs)
    .content_type("text/plain")
    .await?;
§content_disposition

Set the content_disposition for this write request.

Some storage services support setting content_disposition as system metadata.

use bytes::Bytes;

let bs = b"hello, world!".to_vec();
let _ = op
    .write_with("path/to/file", bs)
    .content_disposition("attachment; filename=\"filename.jpg\"")
    .await?;
§Examples
use bytes::Bytes;

let bs = b"hello, world!".to_vec();
let _ = op
    .write_with("path/to/file", bs)
    .content_type("text/plain")
    .await?;

pub async fn delete(&self, path: &str) -> Result<(), Error>

Delete the given path.

§Notes
  • Deleting a file that does not exist won’t return errors.
§Examples
op.delete("test").await?;

pub fn delete_with( &self, path: &str, ) -> OperatorFuture<OpDelete, (), impl Future<Output = Result<(), Error>>>

Delete the given path with extra options.

§Notes
  • Deleting a file that does not exist won’t return errors.
§Examples

op.delete_with("test").await?;

pub async fn remove(&self, paths: Vec<String>) -> Result<(), Error>

Remove files via the given paths.

§Notes

If underlying services support delete in batch, we will use batch delete instead.

§Examples
op.remove(vec!["abc".to_string(), "def".to_string()])
    .await?;

pub async fn remove_via( &self, input: impl Stream<Item = String> + Unpin, ) -> Result<(), Error>

Remove files via the given stream.

We will delete by chunks with given batch limit on the stream.

§Notes

If underlying services support delete in batch, we will use batch delete instead.

§Examples
use futures::stream;
let stream = stream::iter(vec!["abc".to_string(), "def".to_string()]);
op.remove_via(stream).await?;
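
Deleting "by chunks with given batch limit" can be pictured as grouping the incoming paths into batches of at most `limit` entries, each of which would become one delete request. The function below is a hypothetical sketch over a slice rather than a stream; `delete_batches` is not an opendal API.

```rust
/// Group `paths` into batches of at most `limit` entries, preserving order.
/// Hypothetical illustration; not how opendal is necessarily implemented.
fn delete_batches(paths: &[String], limit: usize) -> Vec<Vec<String>> {
    assert!(limit > 0, "batch limit must be non-zero");
    paths.chunks(limit).map(|batch| batch.to_vec()).collect()
}
```

With the documented default limit of 1000, a stream of 2500 paths would be split into three batches in this model.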

pub async fn remove_all(&self, path: &str) -> Result<(), Error>

Remove the path and all nested dirs and files recursively.

§Notes

If underlying services support delete in batch, we will use batch delete instead.

§Examples
op.remove_all("path/to/dir").await?;

pub async fn list(&self, path: &str) -> Result<Vec<Entry>, Error>

List entries that start with the given path in the parent dir.

§Notes
§Recursively List

This function only reads the children of the given directory. To read all entries recursively, use Operator::list_with("path").recursive(true) instead.

§Streaming List

This function will read all entries in the given directory. It could take very long time and consume a lot of memory if the directory contains a lot of entries.

In order to avoid this, you can use [Operator::lister] to list entries in a streaming way.

§Reuse Metadata

The only metadata that is guaranteed to be available is the Mode. For fetching more metadata, please use [Operator::list_with] and metakey.

§Examples
§List entries under a dir

This example will list all entries under the dir path/to/dir/.

use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op.list("path/to/dir/").await?;
for entry in entries {
    match entry.metadata().mode() {
        EntryMode::FILE => {
            println!("Handling file")
        }
        EntryMode::DIR => {
            println!("Handling dir {}", entry.path())
        }
        EntryMode::Unknown => continue,
    }
}
§List entries with prefix

This example will list all entries under the prefix path/to/prefix.

NOTE: it’s possible that the prefix itself is also a dir. In this case, you could get path/to/prefix/, path/to/prefix_1 and so on. If you do want to list a dir, please make sure the path ends with /.

use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op.list("path/to/prefix").await?;
for entry in entries {
    match entry.metadata().mode() {
        EntryMode::FILE => {
            println!("Handling file")
        }
        EntryMode::DIR => {
            println!("Handling dir {}", entry.path())
        }
        EntryMode::Unknown => continue,
    }
}
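
The difference between listing a prefix and listing a dir comes down to plain string matching on the trailing `/`. The sketch below models it over an in-memory key list; `list_by_prefix` is a hypothetical name, and the model ignores the one-level restriction of a non-recursive list.

```rust
/// Return the keys that start with `prefix`, in their original order.
/// Hypothetical model of prefix matching; not an opendal API.
fn list_by_prefix<'a>(keys: &[&'a str], prefix: &str) -> Vec<&'a str> {
    keys.iter().copied().filter(|k| k.starts_with(prefix)).collect()
}
```

In this model the prefix `path/to/prefix` matches both `path/to/prefix/` and `path/to/prefix_1`, while `path/to/prefix/` matches only entries inside that dir.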

pub fn list_with( &self, path: &str, ) -> OperatorFuture<OpList, Vec<Entry>, impl Future<Output = Result<Vec<Entry>, Error>>>

List entries that start with the given path in the parent dir with more options.

§Notes
§Streaming list

This function will read all entries in the given directory. It could take very long time and consume a lot of memory if the directory contains a lot of entries.

In order to avoid this, you can use [Operator::lister] to list entries in a streaming way.

§Options
§start_after

Specify the key after which listing starts.

This feature can be used to resume a listing from a previous point.

The following example will resume the list operation from the breakpoint.

use opendal::Operator;
let mut entries = op
    .list_with("path/to/dir/")
    .start_after("breakpoint")
    .await?;
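
Resuming from a breakpoint can be modelled as skipping every key up to and including the given one. The sketch below assumes keys are returned in lexicographic order and that the start key itself is excluded; both are assumptions of this illustration, and `resume_after` is a hypothetical name.

```rust
/// Return the keys strictly greater than `start_after`.
/// ASSUMPTION: `sorted_keys` is in lexicographic order and the bound is exclusive.
fn resume_after<'a>(sorted_keys: &[&'a str], start_after: &str) -> Vec<&'a str> {
    sorted_keys.iter().copied().filter(|k| *k > start_after).collect()
}
```

A caller would typically store the last key it processed and pass it as the breakpoint on the next run.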
§recursive

Specify whether to list recursively or not.

If recursive is set to true, we will list all entries recursively. If not, we’ll only list the entries in the specified dir.

use opendal::Operator;
let mut entries = op.list_with("path/to/dir/").recursive(true).await?;
§metakey

Specify the metadata that must be fetched for entries.

If metakey is not set, we will fetch only the entry’s mode. Otherwise, we will retrieve the required metadata from storage services. Even if metakey is specified, the metadata may still be None, indicating that the storage service does not supply this information.

Some storage services like s3 could return more metadata like content-length and last-modified. By using metakey, we can fetch those metadata without an extra stat call. Please pick up the metadata you need to reduce the extra stat cost.

This example shows how to list entries with content-length and last-modified metadata:

use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op
    .list_with("dir/")
    // Make sure content-length and last-modified are fetched.
    .metakey(Metakey::ContentLength | Metakey::LastModified)
    .await?;
for entry in entries {
    let meta = entry.metadata();
    match meta.mode() {
        EntryMode::FILE => {
            println!(
                "Handling file {} with size {}",
                entry.path(),
                meta.content_length()
            )
        }
        EntryMode::DIR => {
            println!("Handling dir {}", entry.path())
        }
        EntryMode::Unknown => continue,
    }
}
§Examples
§List all entries recursively

This example will list all entries under the dir path/to/dir/

use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op.list_with("path/to/dir/").recursive(true).await?;
for entry in entries {
    match entry.metadata().mode() {
        EntryMode::FILE => {
            println!("Handling file")
        }
        EntryMode::DIR => {
            println!("Handling dir like start a new list via meta.path()")
        }
        EntryMode::Unknown => continue,
    }
}
§List all entries start with prefix

This example will list all entries that start with the prefix path/to/prefix.

use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op.list_with("path/to/prefix").recursive(true).await?;
for entry in entries {
    match entry.metadata().mode() {
        EntryMode::FILE => {
            println!("Handling file")
        }
        EntryMode::DIR => {
            println!("Handling dir like start a new list via meta.path()")
        }
        EntryMode::Unknown => continue,
    }
}

pub async fn lister(&self, path: &str) -> Result<Lister, Error>

List entries that start with the given path in the parent dir.

This function will create a new [Lister] to list entries. Users can stop listing via dropping this [Lister].

§Notes
§Recursively list

This function only reads the children of the given directory. To read all entries recursively, use [Operator::lister_with] and recursive(true) instead.

§Reuse Metadata

The only metadata that is guaranteed to be available is the Mode. For fetching more metadata, please use [Operator::lister_with] and metakey.

§Examples
use futures::TryStreamExt;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut ds = op.lister("path/to/dir/").await?;
while let Some(mut de) = ds.try_next().await? {
    match de.metadata().mode() {
        EntryMode::FILE => {
            println!("Handling file")
        }
        EntryMode::DIR => {
            println!("Handling dir like start a new list via meta.path()")
        }
        EntryMode::Unknown => continue,
    }
}

pub fn lister_with( &self, path: &str, ) -> OperatorFuture<OpList, Lister, impl Future<Output = Result<Lister, Error>>>

List entries that start with the given path in the parent dir with options.

This function will create a new [Lister] to list entries. Users can stop listing via dropping this [Lister].

§Options
§start_after

Specify the key after which listing starts.

This feature can be used to resume a listing from a previous point.

The following example will resume the list operation from the breakpoint.

use opendal::Operator;
let mut lister = op
    .lister_with("path/to/dir/")
    .start_after("breakpoint")
    .await?;
§recursive

Specify whether to list recursively or not.

If recursive is set to true, we will list all entries recursively. If not, we’ll only list the entries in the specified dir.

use opendal::Operator;
let mut lister = op.lister_with("path/to/dir/").recursive(true).await?;
§metakey

Specify the metadata that must be fetched for entries.

If metakey is not set, we will fetch only the entry’s mode. Otherwise, we will retrieve the required metadata from storage services. Even if metakey is specified, the metadata may still be None, indicating that the storage service does not supply this information.

Some storage services like s3 could return more metadata like content-length and last-modified. By using metakey, we can fetch those metadata without an extra stat call. Please pick up the metadata you need to reduce the extra stat cost.

This example shows how to list entries with content-length and last-modified metadata:

use futures::TryStreamExt;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut lister = op
    .lister_with("dir/")
    // Make sure content-length and last-modified are fetched.
    .metakey(Metakey::ContentLength | Metakey::LastModified)
    .await?;
while let Some(mut entry) = lister.try_next().await? {
    let meta = entry.metadata();
    match meta.mode() {
        EntryMode::FILE => {
            println!(
                "Handling file {} with size {}",
                entry.path(),
                meta.content_length()
            )
        }
        EntryMode::DIR => {
            println!("Handling dir {}", entry.path())
        }
        EntryMode::Unknown => continue,
    }
}
§Examples
§List all files recursively
use futures::TryStreamExt;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut lister = op.lister_with("path/to/dir/").recursive(true).await?;
while let Some(mut entry) = lister.try_next().await? {
    match entry.metadata().mode() {
        EntryMode::FILE => {
            println!("Handling file {}", entry.path())
        }
        EntryMode::DIR => {
            println!("Handling dir {}", entry.path())
        }
        EntryMode::Unknown => continue,
    }
}
§List files with required metadata
use futures::TryStreamExt;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut ds = op
    .lister_with("path/to/dir/")
    .metakey(Metakey::ContentLength | Metakey::LastModified)
    .await?;
while let Some(mut entry) = ds.try_next().await? {
    let meta = entry.metadata();
    match meta.mode() {
        EntryMode::FILE => {
            println!(
                "Handling file {} with size {}",
                entry.path(),
                meta.content_length()
            )
        }
        EntryMode::DIR => {
            println!("Handling dir {}", entry.path())
        }
        EntryMode::Unknown => continue,
    }
}

pub async fn presign_stat( &self, path: &str, expire: Duration, ) -> Result<PresignedRequest, Error>

Presign an operation for stat(head).

§Example
use anyhow::Result;
use futures::io;
use opendal::Operator;
use std::time::Duration;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op.presign_stat("test", Duration::from_secs(3600)).await?;
    // Build an HTTP request from the presigned method and URI.
    let req = http::Request::builder()
        .method(signed_req.method())
        .uri(signed_req.uri())
        .body(())?;

    Ok(())
}

pub fn presign_stat_with( &self, path: &str, expire: Duration, ) -> OperatorFuture<(OpStat, Duration), PresignedRequest, impl Future<Output = Result<PresignedRequest, Error>>>

Presign an operation for stat(head).

§Example
use anyhow::Result;
use futures::io;
use opendal::Operator;
use std::time::Duration;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_stat_with("test", Duration::from_secs(3600))
        .override_content_disposition("attachment; filename=\"othertext.txt\"")
        .await?;
    Ok(())
}

pub async fn presign_read( &self, path: &str, expire: Duration, ) -> Result<PresignedRequest, Error>

Presign an operation for read.

§Notes
§Extra Options

presign_read is a wrapper of [Self::presign_read_with] without any options. To use extra options like override_content_disposition, please use [Self::presign_read_with] instead.

§Example
use anyhow::Result;
use futures::io;
use opendal::Operator;
use std::time::Duration;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op.presign_read("test.txt", Duration::from_secs(3600)).await?;
    Ok(())
}
  • signed_req.method(): GET
  • signed_req.uri(): https://s3.amazonaws.com/examplebucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=access_key_id/20130721/us-east-1/s3/aws4_request&X-Amz-Date=20130721T201207Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=<signature-value>
  • signed_req.headers(): { "host": "s3.amazonaws.com" }

We can download this file via curl or other tools without credentials:

curl "https://s3.amazonaws.com/examplebucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=access_key_id/20130721/us-east-1/s3/aws4_request&X-Amz-Date=20130721T201207Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=<signature-value>" -o /tmp/test.txt
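
When debugging a presigned request it can help to inspect individual query parameters of the URI, such as the expiry. The function below is a minimal sketch using only the standard library; `query_param` is a hypothetical helper, and it does no percent-decoding, so it is only suitable for simple values like those in the example URI above.

```rust
/// Return the raw value of the first query parameter called `name`, if any.
/// Hypothetical helper; performs no percent-decoding.
fn query_param<'a>(uri: &'a str, name: &str) -> Option<&'a str> {
    // Everything after the first `?` is treated as the query string.
    let query = uri.split_once('?')?.1;
    query.split('&').find_map(|pair| {
        let (key, value) = pair.split_once('=')?;
        if key == name {
            Some(value)
        } else {
            None
        }
    })
}
```

Applied to the example URI, looking up `X-Amz-Expires` returns the validity period in seconds as a string.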

pub fn presign_read_with( &self, path: &str, expire: Duration, ) -> OperatorFuture<(OpRead, Duration), PresignedRequest, impl Future<Output = Result<PresignedRequest, Error>>>

Presign an operation for read with extra options.

§Options
§override_content_disposition

Override the content-disposition header returned by storage services.

use std::time::Duration;

use anyhow::Result;
use opendal::Operator;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_read_with("test.txt", Duration::from_secs(3600))
        .override_content_disposition("attachment; filename=\"othertext.txt\"")
        .await?;
    Ok(())
}
§override_cache_control

Override the cache-control header returned by storage services.

use std::time::Duration;

use anyhow::Result;
use opendal::Operator;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_read_with("test.txt", Duration::from_secs(3600))
        .override_cache_control("no-store")
        .await?;
    Ok(())
}
§override_content_type

Override the content-type header returned by storage services.

use std::time::Duration;

use anyhow::Result;
use opendal::Operator;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_read_with("test.txt", Duration::from_secs(3600))
        .override_content_type("text/plain")
        .await?;
    Ok(())
}

pub async fn presign_write( &self, path: &str, expire: Duration, ) -> Result<PresignedRequest, Error>

Presign an operation for write.

§Notes
§Extra Options

presign_write is a wrapper of presign_write_with without any options. To use extra options like content_type, please use presign_write_with instead.

§Example
use std::time::Duration;

use anyhow::Result;
use opendal::Operator;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_write("test.txt", Duration::from_secs(3600))
        .await?;
    Ok(())
}
  • signed_req.method(): PUT
  • signed_req.uri(): https://s3.amazonaws.com/examplebucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=access_key_id/20130721/us-east-1/s3/aws4_request&X-Amz-Date=20130721T201207Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=<signature-value>
  • signed_req.headers(): { "host": "s3.amazonaws.com" }

We can upload a file to this path via curl or other tools without credentials:

curl -X PUT "https://s3.amazonaws.com/examplebucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=access_key_id/20130721/us-east-1/s3/aws4_request&X-Amz-Date=20130721T201207Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=<signature-value>" -d "Hello, World!"

pub fn presign_write_with( &self, path: &str, expire: Duration, ) -> OperatorFuture<(OpWrite, Duration), PresignedRequest, impl Future<Output = Result<PresignedRequest, Error>>>

Presign an operation for write with extra options.

§Options
§content_type

Set the content-type header returned by storage services.

use std::time::Duration;

use anyhow::Result;
use opendal::Operator;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_write_with("test", Duration::from_secs(3600))
        .content_type("text/csv")
        .await?;
    let req = http::Request::builder()
        .method(signed_req.method())
        .uri(signed_req.uri())
        .body(())?;

    Ok(())
}
§content_disposition

Set the content-disposition header returned by storage services.

use std::time::Duration;

use anyhow::Result;
use opendal::Operator;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_write_with("test", Duration::from_secs(3600))
        .content_disposition("attachment; filename=\"cool.html\"")
        .await?;
    let req = http::Request::builder()
        .method(signed_req.method())
        .uri(signed_req.uri())
        .body(())?;

    Ok(())
}
§cache_control

Set the cache-control header returned by storage services.

use std::time::Duration;

use anyhow::Result;
use opendal::Operator;

async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_write_with("test", Duration::from_secs(3600))
        .cache_control("no-store")
        .await?;
    let req = http::Request::builder()
        .method(signed_req.method())
        .uri(signed_req.uri())
        .body(())?;

    Ok(())
}

Trait Implementations§

source§

impl Deref for SpillOp

source§

type Target = Operator

The resulting type after dereferencing.
source§

fn deref(&self) -> &Self::Target

Dereferences the value.
source§

impl DerefMut for SpillOp

source§

fn deref_mut(&mut self) -> &mut Self::Target

Mutably dereferences the value.
source§

impl Drop for SpillOp

source§

fn drop(&mut self)

Executes the destructor for this type.
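
The Deref and Drop impls above suggest a guard pattern: callers use the wrapper as if it were the inner value, and cleanup runs when the wrapper goes out of scope. The sketch below illustrates that pattern with the standard library only. DirGuard is a hypothetical type that wraps a PathBuf on the local filesystem; it is not SpillOp's actual implementation (which wraps an opendal Operator), and DerefMut is omitted for brevity.

```rust
use std::fs;
use std::io;
use std::ops::Deref;
use std::path::PathBuf;

/// Hypothetical guard: owns a directory and removes it when dropped.
struct DirGuard {
    path: PathBuf,
}

impl DirGuard {
    /// Creates the directory (and any missing parents) and returns the guard.
    fn create(path: impl Into<PathBuf>) -> io::Result<Self> {
        let path = path.into();
        fs::create_dir_all(&path)?;
        Ok(Self { path })
    }
}

impl Deref for DirGuard {
    type Target = PathBuf;

    /// Lets callers use PathBuf/Path methods directly on the guard.
    fn deref(&self) -> &PathBuf {
        &self.path
    }
}

impl Drop for DirGuard {
    fn drop(&mut self) {
        // Best-effort cleanup: errors cannot be returned from drop,
        // so they are ignored here.
        let _ = fs::remove_dir_all(&self.path);
    }
}
```

Because of Deref, a call such as guard.join("chunk.bin") resolves to Path::join, and the directory is removed as soon as guard leaves scope.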

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
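
As a small illustration of the blanket Any impl, the sketch below compares a value's runtime TypeId against a known type. It mirrors the usual standard-library idiom; is_string is an illustrative helper, not something this crate provides.

```rust
use std::any::{Any, TypeId};

/// Returns true when the erased value is a String.
fn is_string(value: &dyn Any) -> bool {
    // type_id() reports the concrete type behind the trait object.
    value.type_id() == TypeId::of::<String>()
}
```

Passing &String::from("x") yields true, while passing &1u32 yields false.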
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
§

impl<T> Conv for T

§

fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>.
§

impl<Choices> CoproductSubsetter<CNil, HNil> for Choices

§

type Remainder = Choices

§

fn subset( self, ) -> Result<CNil, <Choices as CoproductSubsetter<CNil, HNil>>::Remainder>

Extract a subset of the possible types in a coproduct (or get the remaining possibilities).
§

impl<T> Downcast for T
where T: Any,

§

fn into_any(self: Box<T>) -> Box<dyn Any>

Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
§

fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>

Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
§

fn as_any(&self) -> &(dyn Any + 'static)

Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any’s vtable from &Trait’s.
§

fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)

Convert &mut Trait (where Trait: Downcast) to &mut Any. This is needed since Rust cannot generate &mut Any’s vtable from &mut Trait’s.
§

impl<T> DowncastSync for T
where T: Any + Send + Sync,

§

fn into_any_arc(self: Arc<T>) -> Arc<dyn Any + Sync + Send>

Convert Arc<Trait> (where Trait: Downcast) to Arc<Any>. Arc<Any> can then be further downcast into Arc<ConcreteType> where ConcreteType implements Trait.
§

impl<T> FmtForward for T

§

fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.
§

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.
§

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.
§

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.
§

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.
§

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.
§

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.
§

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.
§

fn fmt_list(self) -> FmtList<Self>
where for<'a> &'a Self: IntoIterator,

Formats each item in a sequence.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> FutureExt for T

§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.
§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.
§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.
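
The two blanket impls above (From<T> for T and Into<U> for T) are what make a parameter of type impl Into<X> accept both X itself and anything with a From impl targeting X. A minimal sketch, where Label and label are hypothetical names used only for illustration:

```rust
/// Hypothetical newtype, used only to demonstrate the conversions.
#[derive(Debug, PartialEq)]
struct Label(String);

impl From<&str> for Label {
    fn from(s: &str) -> Self {
        Label(s.to_owned())
    }
}

/// Accepts anything convertible into a Label.
/// A &str goes through the From<&str> impl above; a Label goes through
/// the reflexive From<T> for T impl and is returned unchanged.
fn label(name: impl Into<Label>) -> Label {
    name.into()
}
```

Both label("q1") and label(Label("q1".to_owned())) compile and produce the same value.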

source§

impl<T> IntoEither for T

source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
§

impl<T> IntoResult<T> for T

§

type Err = Infallible

§

fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>

§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
§

impl<T, U, I> LiftInto<U, I> for T
where U: LiftFrom<T, I>,

§

fn lift_into(self) -> U

Performs the indexed conversion.
source§

impl<M> MetricVecRelabelExt for M

source§

fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>

source§

fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>

source§

fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>

Equivalent to RelabeledMetricVec::with_metric_level_relabel_n with metric_level set to MetricLevel::Debug and relabel_num set to 1.
§

impl<T> Pipe for T
where T: ?Sized,

§

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use.
§

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function.
§

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function.
§

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function.
§

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function.
§

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.
§

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.
§

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.
§

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R, ) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.
§

impl<T> Pointable for T

§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.
source§

impl<T> Same for T

source§

type Output = T

Should always be Self
§

impl<Source> Sculptor<HNil, HNil> for Source

§

type Remainder = Source

§

fn sculpt(self) -> (HNil, <Source as Sculptor<HNil, HNil>>::Remainder)

Consumes the current HList and returns an HList with the requested shape.
source§

impl<T> SerTo<T> for T

§

impl<T> Tap for T

§

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value.
§

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value.
§

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value.
§

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value.
§

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value.
§

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value.
§

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value.
§

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value.
§

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.
§

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.
§

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.
§

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
§

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.
§

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
§

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.
§

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.
§

impl<T> TryConv for T

§

fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>.
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

source§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
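
The blanket TryFrom impl above means that any infallible conversion (one backed by From/Into) is also available through try_from, with Infallible as its error type. A short sketch using standard-library integer conversions; widen and narrow are illustrative helper names:

```rust
// The explicit TryFrom import keeps the sketch compiling on editions
// older than 2021, where the trait is not in the prelude.
use std::convert::{Infallible, TryFrom};

/// u32 implements From<u8>, so u32::try_from(u8) comes from the blanket
/// impl and can never fail: its error type is Infallible.
fn widen(x: u8) -> Result<u32, Infallible> {
    u32::try_from(x)
}

/// u8 has no From<u32>, so this goes through a dedicated fallible impl
/// that rejects values above u8::MAX.
fn narrow(x: u32) -> Option<u8> {
    u8::try_from(x).ok()
}
```

Here widen always returns Ok, while narrow(300) returns None because 300 does not fit in a u8.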
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
source§

impl<T> LruValue for T
where T: Send + Sync,

§

impl<T> MaybeSend for T
where T: Send,

§

impl<T> Value for T
where T: Send + Sync + 'static,