345 lines
11 KiB
Rust
345 lines
11 KiB
Rust
//! Asynchronous sinks.
//!
//! This module contains:
//!
//! - The [`Sink`] trait, which allows you to asynchronously write data.
//! - The [`SinkExt`] trait, which provides adapters for chaining and composing
//!   sinks.
|
|
|
|
use crate::future::{assert_future, Either};
|
|
use core::pin::Pin;
|
|
use futures_core::future::Future;
|
|
use futures_core::stream::{Stream, TryStream};
|
|
use futures_core::task::{Context, Poll};
|
|
|
|
#[cfg(feature = "compat")]
|
|
use crate::compat::CompatSink;
|
|
|
|
pub use futures_sink::Sink;
|
|
|
|
mod close;
|
|
pub use self::close::Close;
|
|
|
|
mod drain;
|
|
pub use self::drain::{drain, Drain};
|
|
|
|
mod fanout;
|
|
pub use self::fanout::Fanout;
|
|
|
|
mod feed;
|
|
pub use self::feed::Feed;
|
|
|
|
mod flush;
|
|
pub use self::flush::Flush;
|
|
|
|
mod err_into;
|
|
pub use self::err_into::SinkErrInto;
|
|
|
|
mod map_err;
|
|
pub use self::map_err::SinkMapErr;
|
|
|
|
mod send;
|
|
pub use self::send::Send;
|
|
|
|
mod send_all;
|
|
pub use self::send_all::SendAll;
|
|
|
|
mod unfold;
|
|
pub use self::unfold::{unfold, Unfold};
|
|
|
|
mod with;
|
|
pub use self::with::With;
|
|
|
|
mod with_flat_map;
|
|
pub use self::with_flat_map::WithFlatMap;
|
|
|
|
#[cfg(feature = "alloc")]
|
|
mod buffer;
|
|
#[cfg(feature = "alloc")]
|
|
pub use self::buffer::Buffer;
|
|
|
|
/// Blanket implementation: every type that implements [`Sink`], sized or not,
/// automatically receives the [`SinkExt`] combinators.
impl<T, Item> SinkExt<Item> for T
where
    T: Sink<Item> + ?Sized,
{
}
|
|
|
|
/// An extension trait for `Sink`s that provides a variety of convenient
|
|
/// combinator functions.
|
|
pub trait SinkExt<Item>: Sink<Item> {
|
|
/// Composes a function *in front of* the sink.
|
|
///
|
|
/// This adapter produces a new sink that passes each value through the
|
|
/// given function `f` before sending it to `self`.
|
|
///
|
|
/// To process each value, `f` produces a *future*, which is then polled to
|
|
/// completion before passing its result down to the underlying sink. If the
|
|
/// future produces an error, that error is returned by the new sink.
|
|
///
|
|
/// Note that this function consumes the given sink, returning a wrapped
|
|
/// version, much like `Iterator::map`.
|
|
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
|
|
where
|
|
F: FnMut(U) -> Fut,
|
|
Fut: Future<Output = Result<Item, E>>,
|
|
E: From<Self::Error>,
|
|
Self: Sized,
|
|
{
|
|
assert_sink::<U, E, _>(With::new(self, f))
|
|
}
|
|
|
|
/// Composes a function *in front of* the sink.
|
|
///
|
|
/// This adapter produces a new sink that passes each value through the
|
|
/// given function `f` before sending it to `self`.
|
|
///
|
|
/// To process each value, `f` produces a *stream*, of which each value
|
|
/// is passed to the underlying sink. A new value will not be accepted until
|
|
/// the stream has been drained
|
|
///
|
|
/// Note that this function consumes the given sink, returning a wrapped
|
|
/// version, much like `Iterator::flat_map`.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # futures::executor::block_on(async {
|
|
/// use futures::channel::mpsc;
|
|
/// use futures::sink::SinkExt;
|
|
/// use futures::stream::{self, StreamExt};
|
|
///
|
|
/// let (tx, rx) = mpsc::channel(5);
|
|
///
|
|
/// let mut tx = tx.with_flat_map(|x| {
|
|
/// stream::iter(vec![Ok(42); x])
|
|
/// });
|
|
///
|
|
/// tx.send(5).await.unwrap();
|
|
/// drop(tx);
|
|
/// let received: Vec<i32> = rx.collect().await;
|
|
/// assert_eq!(received, vec![42, 42, 42, 42, 42]);
|
|
/// # });
|
|
/// ```
|
|
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
|
|
where
|
|
F: FnMut(U) -> St,
|
|
St: Stream<Item = Result<Item, Self::Error>>,
|
|
Self: Sized,
|
|
{
|
|
assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f))
|
|
}
|
|
|
|
/*
|
|
fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
|
|
where F: FnMut(U) -> Self::SinkItem,
|
|
Self: Sized;
|
|
|
|
fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
|
|
where F: FnMut(Self::SinkItem) -> bool,
|
|
Self: Sized;
|
|
|
|
fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
|
|
where F: FnMut(U) -> Option<Self::SinkItem>,
|
|
Self: Sized;
|
|
*/
|
|
|
|
/// Transforms the error returned by the sink.
|
|
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
|
|
where
|
|
F: FnOnce(Self::Error) -> E,
|
|
Self: Sized,
|
|
{
|
|
assert_sink::<Item, E, _>(SinkMapErr::new(self, f))
|
|
}
|
|
|
|
/// Map this sink's error to a different error type using the `Into` trait.
|
|
///
|
|
/// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
|
|
fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
|
|
where
|
|
Self: Sized,
|
|
Self::Error: Into<E>,
|
|
{
|
|
assert_sink::<Item, E, _>(SinkErrInto::new(self))
|
|
}
|
|
|
|
/// Adds a fixed-size buffer to the current sink.
|
|
///
|
|
/// The resulting sink will buffer up to `capacity` items when the
|
|
/// underlying sink is unwilling to accept additional items. Calling `flush`
|
|
/// on the buffered sink will attempt to both empty the buffer and complete
|
|
/// processing on the underlying sink.
|
|
///
|
|
/// Note that this function consumes the given sink, returning a wrapped
|
|
/// version, much like `Iterator::map`.
|
|
///
|
|
/// This method is only available when the `std` or `alloc` feature of this
|
|
/// library is activated, and it is activated by default.
|
|
#[cfg(feature = "alloc")]
|
|
fn buffer(self, capacity: usize) -> Buffer<Self, Item>
|
|
where
|
|
Self: Sized,
|
|
{
|
|
assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity))
|
|
}
|
|
|
|
/// Close the sink.
|
|
fn close(&mut self) -> Close<'_, Self, Item>
|
|
where
|
|
Self: Unpin,
|
|
{
|
|
assert_future::<Result<(), Self::Error>, _>(Close::new(self))
|
|
}
|
|
|
|
/// Fanout items to multiple sinks.
|
|
///
|
|
/// This adapter clones each incoming item and forwards it to both this as well as
|
|
/// the other sink at the same time.
|
|
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
|
|
where
|
|
Self: Sized,
|
|
Item: Clone,
|
|
Si: Sink<Item, Error = Self::Error>,
|
|
{
|
|
assert_sink::<Item, Self::Error, _>(Fanout::new(self, other))
|
|
}
|
|
|
|
/// Flush the sink, processing all pending items.
|
|
///
|
|
/// This adapter is intended to be used when you want to stop sending to the sink
|
|
/// until all current requests are processed.
|
|
fn flush(&mut self) -> Flush<'_, Self, Item>
|
|
where
|
|
Self: Unpin,
|
|
{
|
|
assert_future::<Result<(), Self::Error>, _>(Flush::new(self))
|
|
}
|
|
|
|
/// A future that completes after the given item has been fully processed
|
|
/// into the sink, including flushing.
|
|
///
|
|
/// Note that, **because of the flushing requirement, it is usually better
|
|
/// to batch together items to send via `feed` or `send_all`,
|
|
/// rather than flushing between each item.**
|
|
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
|
|
where
|
|
Self: Unpin,
|
|
{
|
|
assert_future::<Result<(), Self::Error>, _>(Send::new(self, item))
|
|
}
|
|
|
|
/// A future that completes after the given item has been received
|
|
/// by the sink.
|
|
///
|
|
/// Unlike `send`, the returned future does not flush the sink.
|
|
/// It is the caller's responsibility to ensure all pending items
|
|
/// are processed, which can be done via `flush` or `close`.
|
|
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
|
|
where
|
|
Self: Unpin,
|
|
{
|
|
assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item))
|
|
}
|
|
|
|
/// A future that completes after the given stream has been fully processed
|
|
/// into the sink, including flushing.
|
|
///
|
|
/// This future will drive the stream to keep producing items until it is
|
|
/// exhausted, sending each item to the sink. It will complete once both the
|
|
/// stream is exhausted, the sink has received all items, and the sink has
|
|
/// been flushed. Note that the sink is **not** closed. If the stream produces
|
|
/// an error, that error will be returned by this future without flushing the sink.
|
|
///
|
|
/// Doing `sink.send_all(stream)` is roughly equivalent to
|
|
/// `stream.forward(sink)`. The returned future will exhaust all items from
|
|
/// `stream` and send them to `self`.
|
|
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
|
|
where
|
|
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
|
|
// St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized,
|
|
Self: Unpin,
|
|
{
|
|
// TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>`
|
|
// assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
|
|
SendAll::new(self, stream)
|
|
}
|
|
|
|
/// Wrap this sink in an `Either` sink, making it the left-hand variant
|
|
/// of that `Either`.
|
|
///
|
|
/// This can be used in combination with the `right_sink` method to write `if`
|
|
/// statements that evaluate to different streams in different branches.
|
|
fn left_sink<Si2>(self) -> Either<Self, Si2>
|
|
where
|
|
Si2: Sink<Item, Error = Self::Error>,
|
|
Self: Sized,
|
|
{
|
|
assert_sink::<Item, Self::Error, _>(Either::Left(self))
|
|
}
|
|
|
|
/// Wrap this stream in an `Either` stream, making it the right-hand variant
|
|
/// of that `Either`.
|
|
///
|
|
/// This can be used in combination with the `left_sink` method to write `if`
|
|
/// statements that evaluate to different streams in different branches.
|
|
fn right_sink<Si1>(self) -> Either<Si1, Self>
|
|
where
|
|
Si1: Sink<Item, Error = Self::Error>,
|
|
Self: Sized,
|
|
{
|
|
assert_sink::<Item, Self::Error, _>(Either::Right(self))
|
|
}
|
|
|
|
/// Wraps a [`Sink`] into a sink compatible with libraries using
|
|
/// futures 0.1 `Sink`. Requires the `compat` feature to be enabled.
|
|
#[cfg(feature = "compat")]
|
|
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
|
|
fn compat(self) -> CompatSink<Self, Item>
|
|
where
|
|
Self: Sized + Unpin,
|
|
{
|
|
CompatSink::new(self)
|
|
}
|
|
|
|
/// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
|
|
/// sink types.
|
|
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
|
|
where
|
|
Self: Unpin,
|
|
{
|
|
Pin::new(self).poll_ready(cx)
|
|
}
|
|
|
|
/// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
|
|
/// sink types.
|
|
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
|
|
where
|
|
Self: Unpin,
|
|
{
|
|
Pin::new(self).start_send(item)
|
|
}
|
|
|
|
/// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
|
|
/// sink types.
|
|
fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
|
|
where
|
|
Self: Unpin,
|
|
{
|
|
Pin::new(self).poll_flush(cx)
|
|
}
|
|
|
|
/// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
|
|
/// sink types.
|
|
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
|
|
where
|
|
Self: Unpin,
|
|
{
|
|
Pin::new(self).poll_close(cx)
|
|
}
|
|
}
|
|
|
|
// Just a helper function to ensure the sinks we're returning all have the
|
|
// right implementations.
|
|
pub(crate) fn assert_sink<T, E, S>(sink: S) -> S
|
|
where
|
|
S: Sink<T, Error = E>,
|
|
{
|
|
sink
|
|
}
|