1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use futures::{Future, Sink, Stream, Poll, Async};
use super::*;

pub struct Transmit<I, C>
    where I: Clone + Send + Debug + 'static,
          C: Sink + Stream + 'static
{
    client: Option<Client<I, C>>,
    msg: Option<C::SinkItem>,
}

impl<I, C> Transmit<I, C>
    where I: Clone + Send + Debug + 'static,
          C: Sink + Stream + 'static
{
    #[doc(hidden)]
    pub fn new(client: Client<I, C>, msg: C::SinkItem) -> Transmit<I, C> {
        Transmit {
            client: Some(client),
            msg: Some(msg),
        }
    }

    pub fn into_inner(mut self) -> Option<Client<I, C>> {
        self.client.take()
    }
}

impl<I, C> Future for Transmit<I, C>
    where I: Clone + Send + Debug + 'static,
          C: Sink + Stream + 'static
{
    type Item = Client<I, C>;
    type Error = Client<I, C>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if let Some(msg) = self.msg.take() {
            let start_send = {
                let client = self.client.as_mut().expect("Polled after Async::Ready.");
                client.start_send(msg)
            };
            match start_send {
                Ok(AsyncSink::NotReady(msg)) => {
                    self.msg = Some(msg);
                    return Ok(Async::NotReady);
                }
                Ok(AsyncSink::Ready) => {}
                Err(()) => return Err(self.client.take().unwrap()),
            }
        }

        let poll_complete = {
            let client = self.client.as_mut().expect("Polled after Async::Ready.");
            client.poll_complete()
        };
        match poll_complete {
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(())) => Ok(Async::Ready(self.client.take().unwrap())),
            Err(()) => Err(self.client.take().unwrap()),
        }
    }
}