66 lines
1.9 KiB
Rust
66 lines
1.9 KiB
Rust
|
use core::pin::Pin;
|
||
|
use futures_core::stream::{FusedStream, Stream};
|
||
|
use futures_core::task::{Context, Poll};
|
||
|
use futures_sink::Sink;
|
||
|
use pin_project_lite::pin_project;
|
||
|
|
||
|
pin_project! {
|
||
|
/// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method.
|
||
|
#[derive(Debug, Clone)]
|
||
|
#[must_use = "sinks do nothing unless polled"]
|
||
|
pub struct SinkMapErr<Si, F> {
|
||
|
#[pin]
|
||
|
sink: Si,
|
||
|
f: Option<F>,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
impl<Si, F> SinkMapErr<Si, F> {
|
||
|
pub(super) fn new(sink: Si, f: F) -> Self {
|
||
|
Self { sink, f: Some(f) }
|
||
|
}
|
||
|
|
||
|
delegate_access_inner!(sink, Si, ());
|
||
|
|
||
|
fn take_f(self: Pin<&mut Self>) -> F {
|
||
|
self.project().f.take().expect("polled MapErr after completion")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
|
||
|
where
|
||
|
Si: Sink<Item>,
|
||
|
F: FnOnce(Si::Error) -> E,
|
||
|
{
|
||
|
type Error = E;
|
||
|
|
||
|
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||
|
self.as_mut().project().sink.poll_ready(cx).map_err(|e| self.as_mut().take_f()(e))
|
||
|
}
|
||
|
|
||
|
fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
|
||
|
self.as_mut().project().sink.start_send(item).map_err(|e| self.as_mut().take_f()(e))
|
||
|
}
|
||
|
|
||
|
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||
|
self.as_mut().project().sink.poll_flush(cx).map_err(|e| self.as_mut().take_f()(e))
|
||
|
}
|
||
|
|
||
|
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||
|
self.as_mut().project().sink.poll_close(cx).map_err(|e| self.as_mut().take_f()(e))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Forwarding impl of Stream from the underlying sink
|
||
|
impl<S: Stream, F> Stream for SinkMapErr<S, F> {
|
||
|
type Item = S::Item;
|
||
|
|
||
|
delegate_stream!(sink);
|
||
|
}
|
||
|
|
||
|
impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
|
||
|
fn is_terminated(&self) -> bool {
|
||
|
self.sink.is_terminated()
|
||
|
}
|
||
|
}
|