1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
use futures::{future, Future}; use tokio_timer; use std::net::SocketAddr; use state::GridEnum; use kabuki::{Actor, ActorRef}; use std::fmt::Debug; use net::*; use utils::*; #[derive(Clone)] pub struct Handshake { grid: GridEnum, timeout: Milliseconds, timer: tokio_timer::Timer, nameserver: ActorRef<String, String, ()>, } impl Handshake { pub fn new<G>(grid: G, timeout: Milliseconds, timer: tokio_timer::Timer, nameserver: ActorRef<String, String, ()>) -> Handshake where G: Into<GridEnum> { Handshake { grid: grid.into(), timeout: timeout, timer: timer, nameserver: nameserver, } } fn transmit<I>(client: MsgClient<I>, msg: Msg) -> Box<Future<Item = MsgClient<I>, Error = ()>> where I: Clone + Send + Debug { Box::new(client.transmit(msg).map_err(|_| ())) } fn receive<I>(client: MsgClient<I>, timeout: Milliseconds, timer: tokio_timer::Timer) -> Box<Future<Item = (Msg, MsgClient<I>), Error = ()>> where I: Clone + Send + Debug { Box::new(client .receive() .with_hard_timeout(timeout.into(), &timer) .map_err(|_| ())) } fn rename_and_welcome(unnamed_client: MsgClient<SocketAddr>, desired_name: String, grid: GridEnum, timeout: Milliseconds, mut nameserver: ActorRef<String, String, ()>) -> Box<Future<Item = MsgClient<String>, Error = ()>> { let fut = nameserver .call(desired_name) .and_then(move |final_name| { let client = unnamed_client.rename(final_name); let welcome_msg = Msg::Welcome { name: client.id(), grid: grid, timeout_millis: Some(timeout), }; Self::transmit(client, welcome_msg) }); Box::new(fut) } } impl Actor for Handshake { type Request = MsgClient<SocketAddr>; type Response = (MsgClient<String>, ClientKind); type Error = (); type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; fn call(&mut self, unnamed_client: Self::Request) -> Self::Future { let Handshake { grid, timeout, timer, nameserver, } = self.clone(); let version = Self::transmit(unnamed_client, Msg::version()); let registration_fn = move |unnamed_client| { Self::receive(unnamed_client, timeout, timer).and_then(move |(msg, unnamed_client)| -> Box<Future<Item = (MsgClient<String>, ClientKind), Error = ()>> { if let Msg::Register { desired_name, kind } = msg { Box::new(Self::rename_and_welcome(unnamed_client, desired_name, grid, timeout, nameserver) .map(move |client| (client, kind))) } else { Box::new(future::err(())) } }) }; Box::new(version.and_then(registration_fn)) } }