//! Asynchronous sinks. //! //! This module contains: //! //! - The [`Sink`] trait, which allows you to asynchronously write data. //! - The [`SinkExt`] trait, which provides adapters for chaining and composing //! sinks. use crate::future::{assert_future, Either}; use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "compat")] use crate::compat::CompatSink; pub use futures_sink::Sink; mod close; pub use self::close::Close; mod drain; pub use self::drain::{drain, Drain}; mod fanout; pub use self::fanout::Fanout; mod feed; pub use self::feed::Feed; mod flush; pub use self::flush::Flush; mod err_into; pub use self::err_into::SinkErrInto; mod map_err; pub use self::map_err::SinkMapErr; mod send; pub use self::send::Send; mod send_all; pub use self::send_all::SendAll; mod unfold; pub use self::unfold::{unfold, Unfold}; mod with; pub use self::with::With; mod with_flat_map; pub use self::with_flat_map::WithFlatMap; #[cfg(feature = "alloc")] mod buffer; #[cfg(feature = "alloc")] pub use self::buffer::Buffer; impl SinkExt for T where T: Sink {} /// An extension trait for `Sink`s that provides a variety of convenient /// combinator functions. pub trait SinkExt: Sink { /// Composes a function *in front of* the sink. /// /// This adapter produces a new sink that passes each value through the /// given function `f` before sending it to `self`. /// /// To process each value, `f` produces a *future*, which is then polled to /// completion before passing its result down to the underlying sink. If the /// future produces an error, that error is returned by the new sink. /// /// Note that this function consumes the given sink, returning a wrapped /// version, much like `Iterator::map`. fn with(self, f: F) -> With where F: FnMut(U) -> Fut, Fut: Future>, E: From, Self: Sized, { assert_sink::(With::new(self, f)) } /// Composes a function *in front of* the sink. /// /// This adapter produces a new sink that passes each value through the /// given function `f` before sending it to `self`. /// /// To process each value, `f` produces a *stream*, of which each value /// is passed to the underlying sink. A new value will not be accepted until /// the stream has been drained /// /// Note that this function consumes the given sink, returning a wrapped /// version, much like `Iterator::flat_map`. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::channel::mpsc; /// use futures::sink::SinkExt; /// use futures::stream::{self, StreamExt}; /// /// let (tx, rx) = mpsc::channel(5); /// /// let mut tx = tx.with_flat_map(|x| { /// stream::iter(vec![Ok(42); x]) /// }); /// /// tx.send(5).await.unwrap(); /// drop(tx); /// let received: Vec = rx.collect().await; /// assert_eq!(received, vec![42, 42, 42, 42, 42]); /// # }); /// ``` fn with_flat_map(self, f: F) -> WithFlatMap where F: FnMut(U) -> St, St: Stream>, Self: Sized, { assert_sink::(WithFlatMap::new(self, f)) } /* fn with_map(self, f: F) -> WithMap where F: FnMut(U) -> Self::SinkItem, Self: Sized; fn with_filter(self, f: F) -> WithFilter where F: FnMut(Self::SinkItem) -> bool, Self: Sized; fn with_filter_map(self, f: F) -> WithFilterMap where F: FnMut(U) -> Option, Self: Sized; */ /// Transforms the error returned by the sink. fn sink_map_err(self, f: F) -> SinkMapErr where F: FnOnce(Self::Error) -> E, Self: Sized, { assert_sink::(SinkMapErr::new(self, f)) } /// Map this sink's error to a different error type using the `Into` trait. /// /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`. fn sink_err_into(self) -> err_into::SinkErrInto where Self: Sized, Self::Error: Into, { assert_sink::(SinkErrInto::new(self)) } /// Adds a fixed-size buffer to the current sink. /// /// The resulting sink will buffer up to `capacity` items when the /// underlying sink is unwilling to accept additional items. Calling `flush` /// on the buffered sink will attempt to both empty the buffer and complete /// processing on the underlying sink. /// /// Note that this function consumes the given sink, returning a wrapped /// version, much like `Iterator::map`. /// /// This method is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. #[cfg(feature = "alloc")] fn buffer(self, capacity: usize) -> Buffer where Self: Sized, { assert_sink::(Buffer::new(self, capacity)) } /// Close the sink. fn close(&mut self) -> Close<'_, Self, Item> where Self: Unpin, { assert_future::, _>(Close::new(self)) } /// Fanout items to multiple sinks. /// /// This adapter clones each incoming item and forwards it to both this as well as /// the other sink at the same time. fn fanout(self, other: Si) -> Fanout where Self: Sized, Item: Clone, Si: Sink, { assert_sink::(Fanout::new(self, other)) } /// Flush the sink, processing all pending items. /// /// This adapter is intended to be used when you want to stop sending to the sink /// until all current requests are processed. fn flush(&mut self) -> Flush<'_, Self, Item> where Self: Unpin, { assert_future::, _>(Flush::new(self)) } /// A future that completes after the given item has been fully processed /// into the sink, including flushing. /// /// Note that, **because of the flushing requirement, it is usually better /// to batch together items to send via `feed` or `send_all`, /// rather than flushing between each item.** fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin, { assert_future::, _>(Send::new(self, item)) } /// A future that completes after the given item has been received /// by the sink. /// /// Unlike `send`, the returned future does not flush the sink. /// It is the caller's responsibility to ensure all pending items /// are processed, which can be done via `flush` or `close`. fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> where Self: Unpin, { assert_future::, _>(Feed::new(self, item)) } /// A future that completes after the given stream has been fully processed /// into the sink, including flushing. /// /// This future will drive the stream to keep producing items until it is /// exhausted, sending each item to the sink. It will complete once both the /// stream is exhausted, the sink has received all items, and the sink has /// been flushed. Note that the sink is **not** closed. If the stream produces /// an error, that error will be returned by this future without flushing the sink. /// /// Doing `sink.send_all(stream)` is roughly equivalent to /// `stream.forward(sink)`. The returned future will exhaust all items from /// `stream` and send them to `self`. fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> where St: TryStream + Stream + Unpin + ?Sized, // St: Stream> + Unpin + ?Sized, Self: Unpin, { // TODO: type mismatch resolving `::Item == std::result::Result>::Error>` // assert_future::, _>(SendAll::new(self, stream)) SendAll::new(self, stream) } /// Wrap this sink in an `Either` sink, making it the left-hand variant /// of that `Either`. /// /// This can be used in combination with the `right_sink` method to write `if` /// statements that evaluate to different streams in different branches. fn left_sink(self) -> Either where Si2: Sink, Self: Sized, { assert_sink::(Either::Left(self)) } /// Wrap this stream in an `Either` stream, making it the right-hand variant /// of that `Either`. /// /// This can be used in combination with the `left_sink` method to write `if` /// statements that evaluate to different streams in different branches. fn right_sink(self) -> Either where Si1: Sink, Self: Sized, { assert_sink::(Either::Right(self)) } /// Wraps a [`Sink`] into a sink compatible with libraries using /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled. #[cfg(feature = "compat")] #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] fn compat(self) -> CompatSink where Self: Sized + Unpin, { CompatSink::new(self) } /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`] /// sink types. fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll> where Self: Unpin, { Pin::new(self).poll_ready(cx) } /// A convenience method for calling [`Sink::start_send`] on [`Unpin`] /// sink types. fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> where Self: Unpin, { Pin::new(self).start_send(item) } /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`] /// sink types. fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll> where Self: Unpin, { Pin::new(self).poll_flush(cx) } /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`] /// sink types. fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll> where Self: Unpin, { Pin::new(self).poll_close(cx) } } // Just a helper function to ensure the sinks we're returning all have the // right implementations. pub(crate) fn assert_sink(sink: S) -> S where S: Sink, { sink }