pub struct SpillOp {
pub op: Operator,
}
SpillOp is used to manage the spill directory of the spilling executor, and it drops the directory in RAII style.
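The RAII idea can be sketched with the standard library alone. This is a minimal illustration, not the actual SpillOp implementation: the `DirGuard` type and its methods are hypothetical names, and it works on the local file system rather than through an Operator.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical guard type (an assumption for illustration, not `SpillOp`).
pub struct DirGuard {
    path: PathBuf,
}

impl DirGuard {
    /// Create the directory (and any missing parents) and take ownership of it.
    pub fn create(path: PathBuf) -> io::Result<Self> {
        fs::create_dir_all(&path)?;
        Ok(DirGuard { path })
    }

    /// Path of the managed directory.
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl Drop for DirGuard {
    fn drop(&mut self) {
        // Best-effort cleanup: errors cannot be propagated out of `drop`.
        let _ = fs::remove_dir_all(&self.path);
    }
}
```

When a `DirGuard` value goes out of scope, the directory and its contents are removed without an explicit cleanup call.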
Fields§
§op: Operator
Implementations§
impl SpillOp
pub fn create(path: String, spill_backend: SpillBackend) -> Result<SpillOp>
pub async fn clean_spill_directory() -> Result<()>
pub async fn writer_with(&self, name: &str) -> Result<Writer>
pub async fn reader_with(&self, name: &str) -> Result<Reader>
pub fn read_stream(
    reader: Reader,
    spill_metrics: Arc<BatchSpillMetrics>,
) -> Pin<Box<dyn Stream<Item = Result<DataChunk, BatchError>> + Send>>
The spill file content looks like the following:
[proto_len]
[proto_bytes]
...
[proto_len]
[proto_bytes]
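The layout above is a length-prefixed framing. The following sketch shows one way to write and read such frames with the standard library. The 4-byte little-endian length prefix and the function names `write_frame` and `read_frame` are assumptions made for illustration; the source does not state the width or byte order of `proto_len`.

```rust
use std::io::{self, Read, Write};

/// Append one record as `[len][bytes]`.
/// Assumption: the length is a 4-byte little-endian unsigned integer.
pub fn write_frame<W: Write>(out: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    out.write_all(&(payload.len() as u32).to_le_bytes())?;
    out.write_all(payload)
}

/// Read the next record, or `None` when the input ends at a frame boundary.
pub fn read_frame<R: Read>(input: &mut R) -> io::Result<Option<Vec<u8>>> {
    let mut len_buf = [0u8; 4];
    // Read the first length byte separately so a clean end of input is not an error.
    if input.read(&mut len_buf[..1])? == 0 {
        return Ok(None);
    }
    input.read_exact(&mut len_buf[1..])?;
    let len = u32::from_le_bytes(len_buf) as usize;
    let mut payload = vec![0u8; len];
    input.read_exact(&mut payload)?;
    Ok(Some(payload))
}
```

A reader can call `read_frame` in a loop until it returns `None`, decoding each payload as one serialized chunk.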
Methods from Deref<Target = Operator>§
pub fn limit(&self) -> usize
Get current operator’s limit. Limit is usually the maximum size of data that operator will handle in one operation.
pub fn with_limit(&self, limit: usize) -> Operator
Specify the batch limit.
Default: 1000
pub fn default_executor(&self) -> Option<Executor>
Get the default executor.
pub fn with_default_executor(&self, executor: Executor) -> Operator
Specify the default executor.
pub fn info(&self) -> OperatorInfo
pub fn blocking(&self) -> BlockingOperator
Create a new blocking operator.
This operation has almost no cost.
pub async fn check(&self) -> Result<(), Error>
Check if this operator can work correctly.
We will send a list request to the path and return any errors we encounter.
use opendal::Operator;
op.check().await?;
pub async fn stat(&self, path: &str) -> Result<Metadata, Error>
Get given path’s metadata.
§Notes
§Extra Options
[Operator::stat] is a wrapper of [Operator::stat_with] without any options. To use extra options like if_match and if_none_match, please use [Operator::stat_with] instead.
§Reuse Metadata
To fetch metadata of entries returned by [Lister], it’s better to use [Operator::list_with] and [Operator::lister_with] with a metakey query like Metakey::ContentLength | Metakey::LastModified so that we can avoid extra stat requests.
§Examples
§Check if file exists
use opendal::ErrorKind;
if let Err(e) = op.stat("test").await {
if e.kind() == ErrorKind::NotFound {
println!("file not exist")
}
}
pub fn stat_with(
&self,
path: &str,
) -> OperatorFuture<OpStat, Metadata, impl Future<Output = Result<Metadata, Error>>>
Get given path’s metadata with extra options.
§Notes
§Reuse Metadata
To fetch metadata of entries returned by [Lister], it’s better to use [Operator::list_with] and [Operator::lister_with] with a metakey query like Metakey::ContentLength | Metakey::LastModified so that we can avoid extra requests.
§Options
§if_match
Set if_match for this stat request.
This feature can be used to check if the file’s ETag matches the given ETag.
If the file exists and its ETag doesn’t match, an error with kind [ErrorKind::ConditionNotMatch] will be returned.
use opendal::Operator;
let mut metadata = op.stat_with("path/to/file").if_match(etag).await?;
§if_none_match
Set if_none_match for this stat request.
This feature can be used to check if the file’s ETag doesn’t match the given ETag.
If the file exists and its ETag matches, an error with kind [ErrorKind::ConditionNotMatch] will be returned.
use opendal::Operator;
let mut metadata = op.stat_with("path/to/file").if_none_match(etag).await?;
§Examples
§Get metadata while ETag matches
stat_with will
- return Ok(metadata) if the ETag matches
- return Err(error) with error.kind() == ErrorKind::ConditionNotMatch if the file exists but the ETag does not match
- return Err(err) if other errors occur, for example, NotFound.
use opendal::ErrorKind;
if let Err(e) = op.stat_with("test").if_match("<etag>").await {
if e.kind() == ErrorKind::ConditionNotMatch {
println!("file exists, but etag mismatch")
}
if e.kind() == ErrorKind::NotFound {
println!("file not exist")
}
}
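The outcomes described for if_match and if_none_match can be modeled as a small pure function. This is only a sketch of the decision logic: in practice the storage service evaluates the condition, and the names `StatError`, `Condition`, and `check_condition` are hypothetical, not part of OpenDAL.

```rust
/// Hypothetical error kinds mirroring the two cases discussed above.
#[derive(Debug, PartialEq)]
pub enum StatError {
    NotFound,
    ConditionNotMatch,
}

/// Hypothetical representation of the two conditional options.
#[derive(Debug, Clone, Copy)]
pub enum Condition<'a> {
    IfMatch(&'a str),
    IfNoneMatch(&'a str),
}

/// `current_etag` is `None` when the file does not exist.
pub fn check_condition(current_etag: Option<&str>, cond: Condition) -> Result<(), StatError> {
    // A missing file is reported as NotFound regardless of the condition.
    let etag = current_etag.ok_or(StatError::NotFound)?;
    match cond {
        // if_match fails when the stored ETag differs from the expected one.
        Condition::IfMatch(expected) if etag != expected => Err(StatError::ConditionNotMatch),
        // if_none_match fails when the stored ETag equals the given one.
        Condition::IfNoneMatch(unwanted) if etag == unwanted => Err(StatError::ConditionNotMatch),
        _ => Ok(()),
    }
}
```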
§Behavior
§Services that support create_dir
test and test/ may vary in some services such as S3. However, on a local file system, they’re identical. Therefore, the behavior of stat("test") and stat("test/") might differ in certain edge cases. Always use stat("test/") when you need to access a directory if possible.
Here is the behavior list:
| Case | Path | Result |
|---|---|---|
| stat existing dir | abc/ | Metadata with dir mode |
| stat existing file | abc/def_file | Metadata with file mode |
| stat dir without / | abc/def_dir | Error NotFound or metadata with dir mode |
| stat file with / | abc/def_file/ | Error NotFound |
| stat not existing path | xyz | Error NotFound |
Refer to [RFC: List Prefix][crate::docs::rfcs::rfc_3243_list_prefix] for more details.
§Services that do not support create_dir
For services that do not support create_dir, stat("test/") will return NotFound even when test/abc exists, since the service won’t have the concept of dir. There is nothing we can do about this.
pub async fn is_exist(&self, path: &str) -> Result<bool, Error>
Check if this path exists or not.
§Example
use anyhow::Result;
use futures::io;
use opendal::Operator;
async fn test(op: Operator) -> Result<()> {
let _ = op.is_exist("test").await?;
Ok(())
}
pub async fn create_dir(&self, path: &str) -> Result<(), Error>
Create a dir at given path.
§Notes
To indicate that a path is a directory, it is compulsory to include a trailing / in the path. Failure to do so may result in a NotADirectory error being returned by OpenDAL.
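A caller that builds directory paths dynamically may want to normalize them before calling create_dir. The helper below is a hypothetical convenience (the name `as_dir_path` is an assumption, not an OpenDAL function) that appends the trailing / when it is missing.

```rust
/// Hypothetical helper: return `path` with a trailing `/` appended if it is missing.
pub fn as_dir_path(path: &str) -> String {
    if path.ends_with('/') {
        path.to_string()
    } else {
        format!("{}/", path)
    }
}
```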
§Behavior
- Create on existing dir will succeed.
- Create dir is always recursive, works like mkdir -p
§Examples
op.create_dir("path/to/dir/").await?;
pub async fn read(&self, path: &str) -> Result<Buffer, Error>
Read the whole path into bytes.
§Notes
§Extra Options
[Operator::read] is a wrapper of [Operator::read_with] without any options. To use extra options like range and if_match, please use [Operator::read_with] instead.
§Streaming Read
This function will allocate new bytes internally. For more precise memory control or reading data lazily, please use [Operator::reader].
§Examples
let bs = op.read("path/to/file").await?;
pub fn read_with(
&self,
path: &str,
) -> OperatorFuture<(OpRead, OpReader), Buffer, impl Future<Output = Result<Buffer, Error>>>
Read the whole path into bytes with extra options.
§Notes
§Streaming Read
This function will allocate new bytes internally. For more precise memory control or reading data lazily, please use [Operator::reader].
§Options
§range
Set range for this read request.
If we have a file with size n:
- .. means read bytes in range [0, n) of file.
- 0..1024 means read bytes in range [0, 1024) of file
- 1024.. means read bytes in range [1024, n) of file
- ..1024 means read bytes in range (n - 1024, n) of file
let bs = op.read_with("path/to/file").range(0..1024).await?;
§if_match
Set if_match for this read request.
This feature can be used to check if the file’s ETag matches the given ETag.
If the file exists and its ETag doesn’t match, an error with kind [ErrorKind::ConditionNotMatch] will be returned.
use opendal::Operator;
let mut metadata = op.read_with("path/to/file").if_match(etag).await?;
§if_none_match
Set if_none_match for this read request.
This feature can be used to check if the file’s ETag doesn’t match the given ETag.
If the file exists and its ETag matches, an error with kind [ErrorKind::ConditionNotMatch] will be returned.
use opendal::Operator;
let mut metadata = op.read_with("path/to/file").if_none_match(etag).await?;
§concurrent
Set concurrent for the reader.
By default, OpenDAL reads files without concurrency. This is not efficient when users read large chunks of data. By setting concurrent, OpenDAL will read files concurrently on storage services that support it.
By setting concurrent, OpenDAL will fetch chunks concurrently with the given chunk size.
let r = op.read_with("path/to/file").concurrent(8).await?;
§chunk
OpenDAL will use services’ preferred chunk size by default. Users can set chunk based on their own needs.
The following example will make OpenDAL read data in 4MiB chunks:
let r = op.read_with("path/to/file").chunk(4 * 1024 * 1024).await?;
§Examples
Read the whole path into bytes.
let bs = op.read_with("path/to/file").await?;
let bs = op.read_with("path/to/file").range(0..10).await?;
pub fn reader_with(
&self,
path: &str,
) -> OperatorFuture<(OpRead, OpReader), Reader, impl Future<Output = Result<Reader, Error>>>
Create a new reader with extra options
§Notes
§Extra Options
[Operator::reader] is a wrapper of [Operator::reader_with] without any options. To use extra options like version, please use [Operator::reader_with] instead.
§Options
§concurrent
Set concurrent for the reader.
By default, OpenDAL reads files without concurrency. This is not efficient when users read large chunks of data. By setting concurrent, OpenDAL will read files concurrently on storage services that support it.
By setting concurrent, OpenDAL will fetch chunks concurrently with the given chunk size.
let r = op.reader_with("path/to/file").concurrent(8).await?;
§chunk
OpenDAL will use services’ preferred chunk size by default. Users can set chunk based on their own needs.
The following example will make OpenDAL read data in 4MiB chunks:
let r = op
.reader_with("path/to/file")
.chunk(4 * 1024 * 1024)
.await?;
§Examples
let r = op.reader_with("path/to/file").version("version_id").await?;
pub async fn write(
&self,
path: &str,
bs: impl Into<Buffer>,
) -> Result<(), Error>
Write bytes into path.
§Notes
§Extra Options
[Operator::write] is a wrapper of [Operator::write_with] without any options. To use extra options like content_type and cache_control, please use [Operator::write_with] instead.
§Streaming Write
This function will write all bytes at once. For more precise memory control or writing data continuously, please use [Operator::writer].
§Multipart Uploads Write
OpenDAL abstracts the multipart uploads into [Writer]. It will automatically handle the multipart uploads for you. You can control the behavior of multipart uploads by setting chunk and concurrent via [Operator::writer_with].
§Examples
use bytes::Bytes;
op.write("path/to/file", vec![0; 4096]).await?;
pub async fn writer(&self, path: &str) -> Result<Writer, Error>
Write multiple bytes into path.
§Notes
§Extra Options
[Operator::writer] is a wrapper of [Operator::writer_with] without any options. To use extra options like content_type and cache_control, please use [Operator::writer_with] instead.
§Chunk
Some storage services have a minimum chunk size requirement. For example, s3 could return hard errors like EntityTooSmall if the chunk size is too small. Some services like gcs also return errors if the chunk size is not aligned. Besides, cloud storage services will cost more money if we write data in small chunks.
OpenDAL sets the chunk size automatically based on the Capability of the service if users don’t set it. Users can set chunk to control the exact size to send to the storage service.
Users can use [Operator::writer_with] to set the chunk size; a good chunk size might improve the performance.
§Examples
use bytes::Bytes;
let mut w = op.writer("path/to/file").await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
pub fn writer_with(
&self,
path: &str,
) -> OperatorFuture<(OpWrite, OpWriter), Writer, impl Future<Output = Result<Writer, Error>>>
Write multiple bytes into path with extra options.
§Options
§append
Set append for this write request.
By default, write overwrites existing files. To append to the end of the file instead, please set append to true.
The following example will append data to an existing file instead.
use bytes::Bytes;
let mut w = op.writer_with("path/to/file").append(true).await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
§chunk
Set chunk for the writer.
Some storage services have a minimum chunk size requirement. For example, s3 could return hard errors like EntityTooSmall if the chunk size is too small. Some services like gcs also return errors if the chunk size is not aligned. Besides, cloud storage services will cost more money if we write data in small chunks.
OpenDAL sets the chunk size automatically based on the Capability of the service if users don’t set it. Users can set chunk to control the exact size to send to the storage service.
Setting a good chunk size might improve the performance, reduce the API calls and save money.
The following example will set the writer chunk to 8MiB. Only one API call will be sent at close instead.
use bytes::Bytes;
let mut w = op
.writer_with("path/to/file")
.chunk(8 * 1024 * 1024)
.await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
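Why only one API call is sent at close can be seen with a simplified buffering model. The sketch below is not OpenDAL's Writer: `ChunkedWriter` is a hypothetical type that only counts simulated upload calls, assuming data is buffered until a full chunk is available and the remainder is flushed on close.

```rust
/// Hypothetical model of a chunked writer; it only counts simulated upload calls.
pub struct ChunkedWriter {
    chunk_size: usize,
    buffer: Vec<u8>,
    pub api_calls: usize,
    pub bytes_sent: usize,
}

impl ChunkedWriter {
    pub fn new(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk size must be positive");
        ChunkedWriter { chunk_size, buffer: Vec::new(), api_calls: 0, bytes_sent: 0 }
    }

    /// Buffer `data`, sending one request each time a full chunk is available.
    pub fn write(&mut self, data: &[u8]) {
        self.buffer.extend_from_slice(data);
        while self.buffer.len() >= self.chunk_size {
            let chunk: Vec<u8> = self.buffer.drain(..self.chunk_size).collect();
            self.send(&chunk);
        }
    }

    /// Flush whatever is left in the buffer as a final request.
    pub fn close(&mut self) {
        if !self.buffer.is_empty() {
            let rest = std::mem::take(&mut self.buffer);
            self.send(&rest);
        }
    }

    /// Stand-in for an upload call: record it instead of doing any I/O.
    fn send(&mut self, chunk: &[u8]) {
        self.api_calls += 1;
        self.bytes_sent += chunk.len();
    }
}
```

With an 8MiB chunk size, two 4096-byte writes stay in the buffer and are sent together on close. With a 4096-byte chunk size, the same two writes each trigger their own call.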
§concurrent
Set concurrent for the writer.
By default, OpenDAL writes files without concurrency. This is not efficient when users write large chunks of data. By setting concurrent, OpenDAL will write files concurrently on storage services that support it.
The following example will set the writer concurrent to 8.
- The first write will start and return immediately.
- The second write will start and return immediately.
- The close will make sure all writes are done in order and return the result.
use bytes::Bytes;
let mut w = op.writer_with("path/to/file").concurrent(8).await?;
w.write(vec![0; 4096]).await?; // Start the first write
w.write(vec![1; 4096]).await?; // The second write runs concurrently without waiting
w.close().await?; // Close makes sure all writes are done and succeeded
§cache_control
Set the cache_control for this write request.
Some storage services support setting cache_control as system metadata.
use bytes::Bytes;
let mut w = op
.writer_with("path/to/file")
.cache_control("max-age=604800")
.await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
§content_type
Set the content_type for this write request.
Some storage services support setting content_type as system metadata.
use bytes::Bytes;
let mut w = op
.writer_with("path/to/file")
.content_type("text/plain")
.await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
§content_disposition
Set the content_disposition for this write request.
Some storage services support setting content_disposition as system metadata.
use bytes::Bytes;
let mut w = op
.writer_with("path/to/file")
.content_disposition("attachment; filename=\"filename.jpg\"")
.await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
§Examples
use bytes::Bytes;
let mut w = op
.writer_with("path/to/file")
.content_type("application/octet-stream")
.await?;
w.write(vec![0; 4096]).await?;
w.write(vec![1; 4096]).await?;
w.close().await?;
pub fn write_with(
&self,
path: &str,
bs: impl Into<Buffer>,
) -> OperatorFuture<(OpWrite, OpWriter, Buffer), (), impl Future<Output = Result<(), Error>>>
Write data with extra options.
§Notes
§Streaming Write
This function will write all bytes at once. For more precise memory control or writing data lazily, please use [Operator::writer_with].
§Multipart Uploads Write
OpenDAL abstracts the multipart uploads into [Writer]. It will automatically handle the multipart uploads for you. You can control the behavior of multipart uploads by setting chunk and concurrent via [Operator::writer_with].
§Options
§append
Set append for this write request.
By default, write overwrites existing files. To append to the end of the file instead, please set append to true.
The following example will append data to an existing file instead.
use bytes::Bytes;
let bs = b"hello, world!".to_vec();
let _ = op.write_with("path/to/file", bs).append(true).await?;
§cache_control
Set the cache_control for this write request.
Some storage services support setting cache_control as system metadata.
use bytes::Bytes;
let bs = b"hello, world!".to_vec();
let _ = op
.write_with("path/to/file", bs)
.cache_control("max-age=604800")
.await?;
§content_type
Set the content_type for this write request.
Some storage services support setting content_type as system metadata.
use bytes::Bytes;
let bs = b"hello, world!".to_vec();
let _ = op
.write_with("path/to/file", bs)
.content_type("text/plain")
.await?;
§content_disposition
Set the content_disposition for this write request.
Some storage services support setting content_disposition as system metadata.
use bytes::Bytes;
let bs = b"hello, world!".to_vec();
let _ = op
.write_with("path/to/file", bs)
.content_disposition("attachment; filename=\"filename.jpg\"")
.await?;
§Examples
use bytes::Bytes;
let bs = b"hello, world!".to_vec();
let _ = op
.write_with("path/to/file", bs)
.content_type("text/plain")
.await?;
pub fn delete_with(
&self,
path: &str,
) -> OperatorFuture<OpDelete, (), impl Future<Output = Result<(), Error>>>
pub async fn remove_via(
&self,
input: impl Stream<Item = String> + Unpin,
) -> Result<(), Error>
remove will remove files via the given paths.
remove_via will remove files via the given stream.
We will delete in chunks of at most the given batch limit from the stream.
§Notes
If the underlying service supports batch delete, we will use batch delete instead.
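Deleting in chunks bounded by the batch limit can be pictured with a plain slice instead of a stream. This is a simplified sketch under that assumption, not OpenDAL's implementation; `plan_batches` is a hypothetical helper name.

```rust
/// Hypothetical helper: split `paths` into batches of at most `limit` entries,
/// preserving the input order.
pub fn plan_batches(paths: &[String], limit: usize) -> Vec<Vec<String>> {
    assert!(limit > 0, "batch limit must be positive");
    paths.chunks(limit).map(|chunk| chunk.to_vec()).collect()
}
```

Each returned batch would correspond to one batch delete request on services that support it.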
§Examples
use futures::stream;
let stream = stream::iter(vec!["abc".to_string(), "def".to_string()]);
op.remove_via(stream).await?;
pub async fn remove_all(&self, path: &str) -> Result<(), Error>
pub async fn list(&self, path: &str) -> Result<Vec<Entry>, Error>
List entries that start with the given path in the parent dir.
§Notes
§Recursively List
This function only reads the children of the given directory. To read all entries recursively, use Operator::list_with("path").recursive(true) instead.
§Streaming List
This function will read all entries in the given directory. It could take a very long time and consume a lot of memory if the directory contains a lot of entries.
In order to avoid this, you can use [Operator::lister] to list entries in a streaming way.
§Reuse Metadata
The only metadata that is guaranteed to be available is the Mode.
For fetching more metadata, please use [Operator::list_with] and metakey.
§Examples
§List entries under a dir
This example will list all entries under the dir path/to/dir/.
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op.list("path/to/dir/").await?;
for entry in entries {
match entry.metadata().mode() {
EntryMode::FILE => {
println!("Handling file")
}
EntryMode::DIR => {
println!("Handling dir {}", entry.path())
}
EntryMode::Unknown => continue,
}
}
§List entries with prefix
This example will list all entries under the prefix path/to/prefix.
NOTE: it’s possible that the prefix itself is also a dir. In this case, you could get path/to/prefix/, path/to/prefix_1 and so on. If you do want to list a dir, please make sure the path ends with /.
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op.list("path/to/prefix").await?;
for entry in entries {
match entry.metadata().mode() {
EntryMode::FILE => {
println!("Handling file")
}
EntryMode::DIR => {
println!("Handling dir {}", entry.path())
}
EntryMode::Unknown => continue,
}
}
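The difference between listing a prefix and listing a dir can be modeled over a flat set of keys. This sketch only illustrates the string-prefix matching described in the note above; `list_prefix` is a hypothetical function, and a real service may return additional or differently shaped entries.

```rust
/// Hypothetical model of prefix listing over a flat key space:
/// keep every key that starts with `prefix`, in input order.
pub fn list_prefix<'a>(keys: &[&'a str], prefix: &str) -> Vec<&'a str> {
    keys.iter()
        .copied()
        .filter(|key| key.starts_with(prefix))
        .collect()
}
```

Listing `path/to/prefix` matches both `path/to/prefix/` and `path/to/prefix_1`, while listing `path/to/prefix/` matches only entries under that dir.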
pub fn list_with(
&self,
path: &str,
) -> OperatorFuture<OpList, Vec<Entry>, impl Future<Output = Result<Vec<Entry>, Error>>>
List entries that start with the given path in the parent dir with more options.
§Notes
§Streaming list
This function will read all entries in the given directory. It could take a very long time and consume a lot of memory if the directory contains a lot of entries.
In order to avoid this, you can use [Operator::lister] to list entries in a streaming way.
§Options
§start_after
Specify the key to start listing from.
This feature can be used to resume a listing from a previous point.
The following example will resume the list operation from the breakpoint.
use opendal::Operator;
let mut entries = op
.list_with("path/to/dir/")
.start_after("breakpoint")
.await?;
§recursive
Specify whether to list recursively or not.
If recursive is set to true, we will list all entries recursively. If not, we’ll only list the entries in the specified dir.
use opendal::Operator;
let mut entries = op.list_with("path/to/dir/").recursive(true).await?;
§metakey
Specify the metadata that is required to be fetched in entries.
If metakey is not set, we will fetch only the entry’s mode. Otherwise, we will retrieve the required metadata from storage services. Even if metakey is specified, the metadata may still be None, indicating that the storage service does not supply this information.
Some storage services like s3 could return more metadata like content-length and last-modified. By using metakey, we can fetch that metadata without an extra stat call. Please pick only the metadata you need to reduce the extra stat cost.
This example shows how to list entries with content-length and last-modified metadata:
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op
.list_with("dir/")
// Make sure content-length and last-modified are fetched.
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
for entry in entries {
let meta = entry.metadata();
match meta.mode() {
EntryMode::FILE => {
println!(
"Handling file {} with size {}",
entry.path(),
meta.content_length()
)
}
EntryMode::DIR => {
println!("Handling dir {}", entry.path())
}
EntryMode::Unknown => continue,
}
}
§Examples
§List all entries recursively
This example will list all entries under the dir path/to/dir/
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op.list_with("path/to/dir/").recursive(true).await?;
for entry in entries {
match entry.metadata().mode() {
EntryMode::FILE => {
println!("Handling file")
}
EntryMode::DIR => {
println!("Handling dir like start a new list via meta.path()")
}
EntryMode::Unknown => continue,
}
}
§List all entries that start with prefix
This example will list all entries that start with the prefix path/to/prefix.
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut entries = op.list_with("path/to/prefix").recursive(true).await?;
for entry in entries {
match entry.metadata().mode() {
EntryMode::FILE => {
println!("Handling file")
}
EntryMode::DIR => {
println!("Handling dir like start a new list via meta.path()")
}
EntryMode::Unknown => continue,
}
}
pub async fn lister(&self, path: &str) -> Result<Lister, Error>
List entries that start with the given path in the parent dir.
This function will create a new [Lister] to list entries. Users can stop listing by dropping this [Lister].
§Notes
§Recursively list
This function only reads the children of the given directory. To read all entries recursively, use [Operator::lister_with] and recursive(true) instead.
§Reuse Metadata
The only metadata that is guaranteed to be available is the Mode.
For fetching more metadata, please use [Operator::lister_with] and metakey.
§Examples
use futures::TryStreamExt;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut ds = op.lister("path/to/dir/").await?;
while let Some(mut de) = ds.try_next().await? {
match de.metadata().mode() {
EntryMode::FILE => {
println!("Handling file")
}
EntryMode::DIR => {
println!("Handling dir like start a new list via meta.path()")
}
EntryMode::Unknown => continue,
}
}
pub fn lister_with(
&self,
path: &str,
) -> OperatorFuture<OpList, Lister, impl Future<Output = Result<Lister, Error>>>
List entries that start with the given path in the parent dir with options.
This function will create a new [Lister] to list entries. Users can stop listing by dropping this [Lister].
§Options
§start_after
Specify the key to start listing from.
This feature can be used to resume a listing from a previous point.
The following example will resume the list operation from the breakpoint.
use opendal::Operator;
let mut lister = op
.lister_with("path/to/dir/")
.start_after("breakpoint")
.await?;
§recursive
Specify whether to list recursively or not.
If recursive is set to true, we will list all entries recursively. If not, we’ll only list the entries in the specified dir.
use opendal::Operator;
let mut lister = op.lister_with("path/to/dir/").recursive(true).await?;
§metakey
Specify the metadata that is required to be fetched in entries.
If metakey is not set, we will fetch only the entry’s mode. Otherwise, we will retrieve the required metadata from storage services. Even if metakey is specified, the metadata may still be None, indicating that the storage service does not supply this information.
Some storage services like s3 could return more metadata like content-length and last-modified. By using metakey, we can fetch that metadata without an extra stat call. Please pick only the metadata you need to reduce the extra stat cost.
This example shows how to list entries with content-length and last-modified metadata:
use futures::TryStreamExt;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut lister = op
.lister_with("dir/")
// Make sure content-length and last-modified are fetched.
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
while let Some(mut entry) = lister.try_next().await? {
let meta = entry.metadata();
match meta.mode() {
EntryMode::FILE => {
println!(
"Handling file {} with size {}",
entry.path(),
meta.content_length()
)
}
EntryMode::DIR => {
println!("Handling dir {}", entry.path())
}
EntryMode::Unknown => continue,
}
}
§Examples
§List all files recursively
use futures::TryStreamExt;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut lister = op.lister_with("path/to/dir/").recursive(true).await?;
while let Some(mut entry) = lister.try_next().await? {
match entry.metadata().mode() {
EntryMode::FILE => {
println!("Handling file {}", entry.path())
}
EntryMode::DIR => {
println!("Handling dir {}", entry.path())
}
EntryMode::Unknown => continue,
}
}
§List files with required metadata
use futures::TryStreamExt;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
let mut ds = op
.lister_with("path/to/dir/")
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
while let Some(mut entry) = ds.try_next().await? {
let meta = entry.metadata();
match meta.mode() {
EntryMode::FILE => {
println!(
"Handling file {} with size {}",
entry.path(),
meta.content_length()
)
}
EntryMode::DIR => {
println!("Handling dir {}", entry.path())
}
EntryMode::Unknown => continue,
}
}
pub async fn presign_stat(
&self,
path: &str,
expire: Duration,
) -> Result<PresignedRequest, Error>
Presign an operation for stat(head).
§Example
use anyhow::Result;
use futures::io;
use opendal::Operator;
use std::time::Duration;
async fn test(op: Operator) -> Result<()> {
    let signed_req = op.presign_stat("test", Duration::from_secs(3600)).await?;
    // Build an HTTP request from the presigned method and URI.
    let req = http::Request::builder()
        .method(signed_req.method())
        .uri(signed_req.uri())
        .body(())?;
    Ok(())
}
pub fn presign_stat_with(
&self,
path: &str,
expire: Duration,
) -> OperatorFuture<(OpStat, Duration), PresignedRequest, impl Future<Output = Result<PresignedRequest, Error>>>
Presign an operation for stat(head).
§Example
use anyhow::Result;
use futures::io;
use opendal::Operator;
use std::time::Duration;
async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_stat_with("test", Duration::from_secs(3600))
        .override_content_disposition("attachment; filename=\"othertext.txt\"")
        .await?;
    Ok(())
}
pub async fn presign_read(
&self,
path: &str,
expire: Duration,
) -> Result<PresignedRequest, Error>
Presign an operation for read.
§Notes
§Extra Options
presign_read is a wrapper of [Self::presign_read_with] without any options. To use extra options like override_content_disposition, please use [Self::presign_read_with] instead.
§Example
use anyhow::Result;
use futures::io;
use opendal::Operator;
use std::time::Duration;
async fn test(op: Operator) -> Result<()> {
    let signed_req = op.presign_read("test.txt", Duration::from_secs(3600)).await?;
    Ok(())
}
- signed_req.method(): GET
- signed_req.uri(): https://s3.amazonaws.com/examplebucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=access_key_id/20130721/us-east-1/s3/aws4_request&X-Amz-Date=20130721T201207Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=<signature-value>
- signed_req.headers(): { "host": "s3.amazonaws.com" }
We can download this file via curl or other tools without credentials:
curl "https://s3.amazonaws.com/examplebucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=access_key_id/20130721/us-east-1/s3/aws4_request&X-Amz-Date=20130721T201207Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=<signature-value>" -o /tmp/test.txt
pub fn presign_read_with(
&self,
path: &str,
expire: Duration,
) -> OperatorFuture<(OpRead, Duration), PresignedRequest, impl Future<Output = Result<PresignedRequest, Error>>>
Presign an operation for read with extra options.
§Options
§override_content_disposition
Override the content-disposition header returned by storage services.
use std::time::Duration;
use anyhow::Result;
use opendal::Operator;
async fn test(op: Operator) -> Result<()> {
let signed_req = op
.presign_read_with("test.txt", Duration::from_secs(3600))
.override_content_disposition("attachment; filename=\"othertext.txt\"")
.await?;
Ok(())
}
§override_cache_control
Override the cache-control header returned by storage services.
use std::time::Duration;
use anyhow::Result;
use opendal::Operator;
async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_read_with("test.txt", Duration::from_secs(3600))
        .override_cache_control("no-store")
        .await?;
    Ok(())
}
§override_content_type
Override the content-type header returned by storage services.
use std::time::Duration;
use anyhow::Result;
use opendal::Operator;
async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_read_with("test.txt", Duration::from_secs(3600))
        .override_content_type("text/plain")
        .await?;
    Ok(())
}
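On S3-compatible services, response header overrides on a presigned GET are usually carried as `response-content-disposition`, `response-cache-control`, and `response-content-type` query parameters. The sketch below illustrates that mapping with the standard library only. It is not OpenDAL's implementation, other backends may encode overrides differently, and `ReadOverrides`, `percent_encode`, and `override_query` are hypothetical names.

```rust
/// Hypothetical stand-in for the three read overrides documented above.
#[derive(Default)]
struct ReadOverrides<'a> {
    content_disposition: Option<&'a str>,
    cache_control: Option<&'a str>,
    content_type: Option<&'a str>,
}

/// Percent-encodes every byte except RFC 3986 unreserved characters.
fn percent_encode(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for byte in input.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            other => out.push_str(&format!("%{:02X}", other)),
        }
    }
    out
}

/// Builds the S3-style `response-*` query fragment for the overrides that are set.
fn override_query(overrides: &ReadOverrides<'_>) -> String {
    let pairs = [
        ("response-content-disposition", overrides.content_disposition),
        ("response-cache-control", overrides.cache_control),
        ("response-content-type", overrides.content_type),
    ];
    pairs
        .iter()
        .filter_map(|&(name, value)| value.map(|v| format!("{}={}", name, percent_encode(v))))
        .collect::<Vec<_>>()
        .join("&")
}
```

Because these parameters are part of the signed URL, they have to be chosen before signing; appending them to an already signed URL would normally invalidate the signature.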
pub async fn presign_write( &self, path: &str, expire: Duration, ) -> Result<PresignedRequest, Error>
Presign an operation for write.
§Notes
§Extra Options
presign_write is a wrapper of [Self::presign_write_with] without any options. To use extra options like content_type, please use [Self::presign_write_with] instead.
§Example
use std::time::Duration;
use anyhow::Result;
use opendal::Operator;
async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_write("test.txt", Duration::from_secs(3600))
        .await?;
    Ok(())
}
signed_req.method(): PUT
signed_req.uri(): https://s3.amazonaws.com/examplebucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=access_key_id/20130721/us-east-1/s3/aws4_request&X-Amz-Date=20130721T201207Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=<signature-value>
signed_req.headers(): { "host": "s3.amazonaws.com" }
We can upload a file to this path via curl or other tools without credentials:
curl -X PUT "https://s3.amazonaws.com/examplebucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=access_key_id/20130721/us-east-1/s3/aws4_request&X-Amz-Date=20130721T201207Z&X-Amz-Expires=86400&X-Amz-SignedHeaders=host&X-Amz-Signature=<signature-value>" -d "Hello, World!"
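A presigned request boils down to a method, a URI, and a set of headers, so it can be handed to any HTTP client. As a rough sketch (standard library only), the following renders those three pieces as a `curl` upload command similar to the one above. `SignedParts`, `shell_quote`, and `to_curl_upload` are hypothetical stand-ins, not OpenDAL types or functions.

```rust
/// Hypothetical stand-in for the method, URI, and headers of a presigned request.
struct SignedParts {
    method: String,
    uri: String,
    headers: Vec<(String, String)>,
}

/// Wraps `s` in single quotes for a POSIX shell, escaping embedded single quotes.
fn shell_quote(s: &str) -> String {
    format!("'{}'", s.replace('\'', "'\\''"))
}

/// Renders the request as a curl command that uploads the contents of `file`.
fn to_curl_upload(req: &SignedParts, file: &str) -> String {
    let mut cmd = format!("curl -X {} {}", req.method, shell_quote(&req.uri));
    for (name, value) in &req.headers {
        // curl derives the Host header from the URL, so it is not repeated.
        if name.eq_ignore_ascii_case("host") {
            continue;
        }
        cmd.push_str(&format!(" -H {}", shell_quote(&format!("{}: {}", name, value))));
    }
    // --data-binary sends the bytes unmodified; the "@" prefix reads them from a file.
    cmd.push_str(&format!(" --data-binary {}", shell_quote(&format!("@{}", file))));
    cmd
}
```

Forwarding every returned header matters when the signature covers more than `host`, for example when a content type was set at signing time.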
pub fn presign_write_with( &self, path: &str, expire: Duration, ) -> OperatorFuture<(OpWrite, Duration), PresignedRequest, impl Future<Output = Result<PresignedRequest, Error>>>
Presign an operation for write with extra options.
§Options
§content_type
Set the content-type header returned by storage services.
use std::time::Duration;
use anyhow::Result;
use opendal::Operator;
async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_write_with("test", Duration::from_secs(3600))
        .content_type("text/csv")
        .await?;
    // Build an HTTP request from the presigned method and URI.
    let req = http::Request::builder()
        .method(signed_req.method())
        .uri(signed_req.uri())
        .body(())?;
    Ok(())
}
§content_disposition
Set the content-disposition header returned by storage services.
use std::time::Duration;
use anyhow::Result;
use opendal::Operator;
async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_write_with("test", Duration::from_secs(3600))
        .content_disposition("attachment; filename=\"cool.html\"")
        .await?;
    // Build an HTTP request from the presigned method and URI.
    let req = http::Request::builder()
        .method(signed_req.method())
        .uri(signed_req.uri())
        .body(())?;
    Ok(())
}
§cache_control
Set the cache-control header returned by storage services.
use std::time::Duration;
use anyhow::Result;
use opendal::Operator;
async fn test(op: Operator) -> Result<()> {
    let signed_req = op
        .presign_write_with("test", Duration::from_secs(3600))
        .cache_control("no-store")
        .await?;
    // Build an HTTP request from the presigned method and URI.
    let req = http::Request::builder()
        .method(signed_req.method())
        .uri(signed_req.uri())
        .body(())?;
    Ok(())
}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for SpillOp
impl !RefUnwindSafe for SpillOp
impl Send for SpillOp
impl Sync for SpillOp
impl Unpin for SpillOp
impl !UnwindSafe for SpillOp
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Conv for T
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<T> Downcast for T
where
    T: Any,
fn into_any(self: Box<T>) -> Box<dyn Any>
Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any’s vtable from &Trait’s.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Convert &mut Trait (where Trait: Downcast) to &mut Any. This is needed since Rust cannot generate &mut Any’s vtable from &mut Trait’s.
impl<T> DowncastSync for T
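The `Downcast` conversions above exist so that a trait object can be turned into `dyn Any` and then recovered as its concrete type. The sketch below shows that round trip using only `std::any::Any`. `Shape`, `Circle`, `Square`, and the hand-written `into_any` method are hypothetical; the method mirrors what the `Downcast` blanket impl would otherwise supply.

```rust
use std::any::Any;

/// Hypothetical trait; `into_any` is written by hand here, mirroring the
/// conversion that a `Downcast`-style blanket impl provides automatically.
trait Shape {
    fn area(&self) -> f64;
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}

struct Circle {
    radius: f64,
}

struct Square {
    side: f64,
}

impl Shape for Circle {
    fn area(&self) -> f64 {
        std::f64::consts::PI * self.radius * self.radius
    }
    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}

impl Shape for Square {
    fn area(&self) -> f64 {
        self.side * self.side
    }
    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}

/// Recovers the concrete `Circle` from a boxed trait object, if it is one.
fn radius_of(shape: Box<dyn Shape>) -> Option<f64> {
    shape
        .into_any()
        .downcast::<Circle>()
        .ok()
        .map(|circle| circle.radius)
}
```

`Box<dyn Any>::downcast` returns the original box as an error when the type does not match, which is why `radius_of` yields `None` for a `Square`.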
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
    Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
    Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
    Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
    Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
    Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
    Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
    Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
    Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self>
where
    for<'a> &'a Self: IntoIterator,
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
impl<T, U, I> LiftInto<U, I> for T
where
    U: LiftFrom<T, I>,
impl<M> MetricVecRelabelExt for M
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level.
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n.
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n with metric_level set to MetricLevel::Debug and relabel_num set to 1.
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<Source> Sculptor<HNil, HNil> for Source
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.