1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
use futures::{Future, Sink, Stream, Poll, Async}; use super::*; pub struct Transmit<I, C> where I: Clone + Send + Debug + 'static, C: Sink + Stream + 'static { client: Option<Client<I, C>>, msg: Option<C::SinkItem>, } impl<I, C> Transmit<I, C> where I: Clone + Send + Debug + 'static, C: Sink + Stream + 'static { #[doc(hidden)] pub fn new(client: Client<I, C>, msg: C::SinkItem) -> Transmit<I, C> { Transmit { client: Some(client), msg: Some(msg), } } pub fn into_inner(mut self) -> Option<Client<I, C>> { self.client.take() } } impl<I, C> Future for Transmit<I, C> where I: Clone + Send + Debug + 'static, C: Sink + Stream + 'static { type Item = Client<I, C>; type Error = Client<I, C>; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { if let Some(msg) = self.msg.take() { let start_send = { let client = self.client.as_mut().expect("Polled after Async::Ready."); client.start_send(msg) }; match start_send { Ok(AsyncSink::NotReady(msg)) => { self.msg = Some(msg); return Ok(Async::NotReady); } Ok(AsyncSink::Ready) => {} Err(()) => return Err(self.client.take().unwrap()), } } let poll_complete = { let client = self.client.as_mut().expect("Polled after Async::Ready."); client.poll_complete() }; match poll_complete { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(())) => Ok(Async::Ready(self.client.take().unwrap())), Err(()) => Err(self.client.take().unwrap()), } } }