From 8397d8c3b4beab67bad7a33b28e72f10ea335635 Mon Sep 17 00:00:00 2001 From: Neil Tallim Date: Tue, 5 Sep 2023 10:21:31 -0600 Subject: [PATCH 1/7] Reworked to use cfg_if, rather than just cfg, since nix doesn't export symbols when being built for Windows, even as stubs --- Cargo.toml | 1 + src/stream/tcp.rs | 41 ++++++++++++++++++++++++++++------------- src/stream/udp.rs | 42 ++++++++++++++++++++++++++++-------------- 3 files changed, 57 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9a096fb..b82b4b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ categories = ["network-utilities"] readme = "README.md" [dependencies] +cfg-if = "1.0" chrono = "0.4" clap = "~2.33.3" core_affinity = "0.5" diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index 721ed4a..c1f0bc2 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -18,9 +18,12 @@ * along with rperf. If not, see . */ -extern crate nix; - -use nix::sys::socket::{setsockopt, sockopt::RcvBuf, sockopt::SndBuf}; +cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + extern crate nix; + use nix::sys::socket::{setsockopt, sockopt::RcvBuf, sockopt::SndBuf}; + } +} use crate::protocol::results::{IntervalResult, TcpReceiveResult, TcpSendResult, get_unix_timestamp}; @@ -65,9 +68,13 @@ impl TcpTestDefinition { pub mod receiver { + cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + use std::os::unix::io::AsRawFd; + } + } use std::io::Read; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - use std::os::unix::io::AsRawFd; use std::sync::{Mutex}; use std::time::{Duration, Instant}; @@ -255,10 +262,12 @@ pub mod receiver { Ok(_) => { if buffer == self.test_definition.test_id { log::debug!("validated TCP stream {} connection from {}", self.stream_idx, address); - if !cfg!(windows) { //NOTE: features unsupported on Windows - if self.receive_buffer != 0 { - log::debug!("setting receive-buffer to {}...", self.receive_buffer); - super::setsockopt(stream.as_raw_fd(), super::RcvBuf, &self.receive_buffer)?; + cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + if self.receive_buffer != 0 { + log::debug!("setting receive-buffer to {}...", self.receive_buffer); + super::setsockopt(stream.as_raw_fd(), super::RcvBuf, &self.receive_buffer)?; + } } } @@ -411,9 +420,13 @@ pub mod receiver { pub mod sender { + cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + use std::os::unix::io::AsRawFd; + } + } use std::io::Write; use std::net::{IpAddr, SocketAddr}; - use std::os::unix::io::AsRawFd; use std::time::{Duration, Instant}; use mio::net::TcpStream; @@ -486,10 +499,12 @@ pub mod sender { log::debug!("setting no-delay..."); stream.set_nodelay(true)?; } - if !cfg!(windows) { //NOTE: features unsupported on Windows - if self.send_buffer != 0 { - log::debug!("setting send-buffer to {}...", self.send_buffer); - super::setsockopt(stream.as_raw_fd(), super::SndBuf, &self.send_buffer)?; + cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + if self.send_buffer != 0 { + log::debug!("setting send-buffer to {}...", self.send_buffer); + super::setsockopt(stream.as_raw_fd(), super::SndBuf, &self.send_buffer)?; + } } } Ok(stream) diff --git a/src/stream/udp.rs b/src/stream/udp.rs index 80404bd..c8c144c 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -17,13 +17,15 @@ * You should have received a copy of the GNU General Public License * along with rperf. If not, see . */ - -extern crate log; -extern crate nix; use std::error::Error; -use nix::sys::socket::{setsockopt, sockopt::RcvBuf, sockopt::SndBuf}; +cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + extern crate nix; + use nix::sys::socket::{setsockopt, sockopt::RcvBuf, sockopt::SndBuf}; + } +} use crate::protocol::results::{IntervalResult, UdpReceiveResult, UdpSendResult, get_unix_timestamp}; @@ -67,9 +69,13 @@ impl UdpTestDefinition { } pub mod receiver { + cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + use std::os::unix::io::AsRawFd; + } + } use std::convert::TryInto; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - use std::os::unix::io::AsRawFd; use std::sync::{Mutex}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -203,10 +209,12 @@ pub mod receiver { log::debug!("binding UDP receive socket for stream {}...", stream_idx); let socket:UdpSocket = port_pool.bind(peer_ip).expect(format!("failed to bind UDP socket").as_str()); socket.set_read_timeout(Some(READ_TIMEOUT))?; - if !cfg!(windows) { //NOTE: features unsupported on Windows - if *receive_buffer != 0 { - log::debug!("setting receive-buffer to {}...", receive_buffer); - super::setsockopt(socket.as_raw_fd(), super::RcvBuf, receive_buffer)?; + cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + if *receive_buffer != 0 { + log::debug!("setting receive-buffer to {}...", receive_buffer); + super::setsockopt(socket.as_raw_fd(), super::RcvBuf, receive_buffer)?; + } } } log::debug!("bound UDP receive socket for stream {}: {}", stream_idx, socket.local_addr()?); @@ -435,8 +443,12 @@ pub mod receiver { pub mod sender { + cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + use std::os::unix::io::AsRawFd; + } + } use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - use std::os::unix::io::AsRawFd; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::net::UdpSocket; @@ -469,10 +481,12 @@ pub mod sender { IpAddr::V4(_) => UdpSocket::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port)).expect(format!("failed to bind UDP socket, port {}", port).as_str()), }; socket.set_write_timeout(Some(WRITE_TIMEOUT))?; - if !cfg!(windows) { //NOTE: features unsupported on Windows - if *send_buffer != 0 { - log::debug!("setting send-buffer to {}...", send_buffer); - super::setsockopt(socket.as_raw_fd(), super::SndBuf, send_buffer)?; + cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + if *send_buffer != 0 { + log::debug!("setting send-buffer to {}...", send_buffer); + super::setsockopt(socket.as_raw_fd(), super::SndBuf, send_buffer)?; + } } } socket.connect(socket_addr_receiver)?; From 5e8f86566bc00a5b5e4f60056a277d890f1f68d9 Mon Sep 17 00:00:00 2001 From: Neil Tallim Date: Tue, 5 Sep 2023 16:39:27 -0600 Subject: [PATCH 2/7] Removed mio as a dependency, opting for Socket2 instead --- Cargo.toml | 2 +- src/client.rs | 20 +-- src/protocol/communication.rs | 141 +++++++++------------ src/server.rs | 111 ++++++++--------- src/stream/tcp.rs | 225 ++++++++++++++-------------------- 5 files changed, 206 insertions(+), 293 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b82b4b9..78eba6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,11 +18,11 @@ core_affinity = "0.5" ctrlc = "3.1" env_logger = "0.8" log = {version = "0.4", features = ["std"]} -mio = "0.6" nix = "0.20" serde = {version = "1.0", features = ["derive"]} serde_json = "1.0" simple-error = "0.2" +socket2 = "0.5.3" uuid = {version = "0.8", features = ["v4"]} #configuration for cargo-deb diff --git a/src/client.rs b/src/client.rs index 797c540..45dc171 100644 --- a/src/client.rs +++ b/src/client.rs @@ -27,7 +27,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use clap::ArgMatches; -use mio::net::{TcpStream}; +use std::net::{TcpStream}; +use socket2::{SockRef, TcpKeepalive}; use crate::protocol::communication::{receive, send, KEEPALIVE_DURATION}; @@ -62,18 +63,19 @@ fn connect_to_server(address:&str, port:&u16) -> BoxResult { if server_addr.is_none() { return Err(Box::new(simple_error::simple_error!("unable to resolve {}", address))); } - let raw_stream = match std::net::TcpStream::connect_timeout(&server_addr.unwrap(), CONNECT_TIMEOUT) { + + let stream = match TcpStream::connect_timeout(&server_addr.unwrap(), CONNECT_TIMEOUT) { Ok(s) => s, Err(e) => return Err(Box::new(simple_error::simple_error!("unable to connect: {}", e))), }; - let stream = match TcpStream::from_stream(raw_stream) { - Ok(s) => s, - Err(e) => return Err(Box::new(simple_error::simple_error!("unable to prepare TCP control-channel: {}", e))), - }; - log::info!("connected to server"); - + log::debug!("connected TCP control-channel to {}", destination); stream.set_nodelay(true).expect("cannot disable Nagle's algorithm"); - stream.set_keepalive(Some(KEEPALIVE_DURATION)).expect("unable to set TCP keepalive"); + + let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); + let raw_socket = SockRef::from(&stream); + raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + + log::info!("connected to server"); Ok(stream) } diff --git a/src/protocol/communication.rs b/src/protocol/communication.rs index 5eaae27..472ffe4 100644 --- a/src/protocol/communication.rs +++ b/src/protocol/communication.rs @@ -21,8 +21,7 @@ use std::io::{self, Read, Write}; use std::time::Duration; -use mio::{Events, Ready, Poll, PollOpt, Token}; -use mio::net::{TcpStream}; +use std::net::{TcpStream}; use std::error::Error; type BoxResult = Result>; @@ -48,53 +47,38 @@ pub fn send(stream:&mut TcpStream, message:&serde_json::Value) -> BoxResult<()> /// receives the length-count of a pending message over a client-server communications stream fn receive_length(stream:&mut TcpStream, alive_check:fn() -> bool, results_handler:&mut dyn FnMut() -> BoxResult<()>) -> BoxResult { let mut cloned_stream = stream.try_clone()?; - - let mio_token = Token(0); - let poll = Poll::new()?; - poll.register( - &cloned_stream, - mio_token, - Ready::readable(), - PollOpt::edge(), - )?; - let mut events = Events::with_capacity(1); //only interacting with one stream + cloned_stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout"); let mut length_bytes_read = 0; let mut length_spec:[u8; 2] = [0; 2]; while alive_check() { //waiting to find out how long the next message is results_handler()?; //send any outstanding results between cycles - poll.poll(&mut events, Some(POLL_TIMEOUT))?; - for event in events.iter() { - match event.token() { - _ => loop { - match cloned_stream.read(&mut length_spec[length_bytes_read..]) { - Ok(size) => { - if size == 0 { - if alive_check() { - return Err(Box::new(simple_error::simple_error!("connection lost"))); - } else { //shutting down; a disconnect is expected - return Err(Box::new(simple_error::simple_error!("local shutdown requested"))); - } - } - - length_bytes_read += size; - if length_bytes_read == 2 { - let length = u16::from_be_bytes(length_spec); - log::debug!("received length-spec of {} from {}", length, stream.peer_addr()?); - return Ok(length); - } else { - log::debug!("received partial length-spec from {}", stream.peer_addr()?); - } - }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { //nothing left to process - break; - }, - Err(e) => { - return Err(Box::new(e)); - }, + + match cloned_stream.read(&mut length_spec[length_bytes_read..]) { + Ok(size) => { + if size == 0 { + if alive_check() { + return Err(Box::new(simple_error::simple_error!("connection lost"))); + } else { //shutting down; a disconnect is expected + return Err(Box::new(simple_error::simple_error!("local shutdown requested"))); } - }, - } + } + + length_bytes_read += size; + if length_bytes_read == 2 { + let length = u16::from_be_bytes(length_spec); + log::debug!("received length-spec of {} from {}", length, stream.peer_addr()?); + return Ok(length); + } else { + log::debug!("received partial length-spec from {}", stream.peer_addr()?); + } + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { + //nothing available to process + }, + Err(e) => { + return Err(Box::new(e)); + }, } } Err(Box::new(simple_error::simple_error!("system shutting down"))) @@ -102,59 +86,44 @@ fn receive_length(stream:&mut TcpStream, alive_check:fn() -> bool, results_handl /// receives the data-value of a pending message over a client-server communications stream fn receive_payload(stream:&mut TcpStream, alive_check:fn() -> bool, results_handler:&mut dyn FnMut() -> BoxResult<()>, length:u16) -> BoxResult { let mut cloned_stream = stream.try_clone()?; - - let mio_token = Token(0); - let poll = Poll::new()?; - poll.register( - &cloned_stream, - mio_token, - Ready::readable(), - PollOpt::edge(), - )?; - let mut events = Events::with_capacity(1); //only interacting with one stream + cloned_stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout"); let mut bytes_read = 0; let mut buffer = vec![0_u8; length.into()]; while alive_check() { //waiting to receive the payload results_handler()?; //send any outstanding results between cycles - poll.poll(&mut events, Some(POLL_TIMEOUT))?; - for event in events.iter() { - match event.token() { - _ => loop { - match cloned_stream.read(&mut buffer[bytes_read..]) { - Ok(size) => { - if size == 0 { - if alive_check() { - return Err(Box::new(simple_error::simple_error!("connection lost"))); - } else { //shutting down; a disconnect is expected - return Err(Box::new(simple_error::simple_error!("local shutdown requested"))); - } - } - - bytes_read += size; - if bytes_read == length as usize { - match serde_json::from_slice(&buffer) { - Ok(v) => { - log::debug!("received {:?} from {}", v, stream.peer_addr()?); - return Ok(v); - }, - Err(e) => { - return Err(Box::new(e)); - }, - } - } else { - log::debug!("received partial payload from {}", stream.peer_addr()?); - } - }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { //nothing left to process - break; + + match cloned_stream.read(&mut buffer[bytes_read..]) { + Ok(size) => { + if size == 0 { + if alive_check() { + return Err(Box::new(simple_error::simple_error!("connection lost"))); + } else { //shutting down; a disconnect is expected + return Err(Box::new(simple_error::simple_error!("local shutdown requested"))); + } + } + + bytes_read += size; + if bytes_read == length as usize { + match serde_json::from_slice(&buffer) { + Ok(v) => { + log::debug!("received {:?} from {}", v, stream.peer_addr()?); + return Ok(v); }, Err(e) => { return Err(Box::new(e)); }, } - }, - } + } else { + log::debug!("received partial payload from {}", stream.peer_addr()?); + } + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { + //nothing available to process + }, + Err(e) => { + return Err(Box::new(e)); + }, } } Err(Box::new(simple_error::simple_error!("system shutting down"))) diff --git a/src/server.rs b/src/server.rs index 7256226..bc61da5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -29,8 +29,8 @@ use std::time::{Duration}; use clap::ArgMatches; -use mio::net::{TcpListener, TcpStream}; -use mio::{Events, Ready, Poll, PollOpt, Token}; +use std::net::{TcpListener, TcpStream}; +use socket2::{SockRef, TcpKeepalive}; use crate::protocol::communication::{receive, send, KEEPALIVE_DURATION}; @@ -246,14 +246,14 @@ fn handle_client( log::debug!("[{}] stopping any still-in-progress streams", &peer_addr); for ps in parallel_streams.iter_mut() { - let mut stream = match (*ps).lock() { + let mut test_stream = match (*ps).lock() { Ok(guard) => guard, Err(poisoned) => { log::error!("[{}] a stream-handler was poisoned; this indicates some sort of logic error", &peer_addr); poisoned.into_inner() }, }; - stream.stop(); + test_stream.stop(); } log::debug!("[{}] waiting for all streams to end", &peer_addr); for jh in parallel_streams_joinhandles { @@ -301,72 +301,59 @@ pub fn serve(args:ArgMatches) -> BoxResult<()> { //start listening for connections let port:u16 = args.value_of("port").unwrap().parse()?; - let mut listener:TcpListener; + let listener:TcpListener; if args.is_present("version6") { listener = TcpListener::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port)).expect(format!("failed to bind TCP socket, port {}", port).as_str()); } else { listener = TcpListener::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)).expect(format!("failed to bind TCP socket, port {}", port).as_str()); } + listener.set_nonblocking(true).expect("unable to make TCP server non-blocking"); log::info!("server listening on {}", listener.local_addr()?); - let mio_token = Token(0); - let poll = Poll::new()?; - poll.register( - &mut listener, - mio_token, - Ready::readable(), - PollOpt::edge(), - )?; - let mut events = Events::with_capacity(32); - while is_alive() { - poll.poll(&mut events, Some(POLL_TIMEOUT))?; - for event in events.iter() { - match event.token() { - _ => loop { - match listener.accept() { - Ok((mut stream, address)) => { - log::info!("connection from {}", address); - - stream.set_nodelay(true).expect("cannot disable Nagle's algorithm"); - stream.set_keepalive(Some(KEEPALIVE_DURATION)).expect("unable to set TCP keepalive"); - - let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1; - if client_limit > 0 && client_count > client_limit { - log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string()); - stream.shutdown(Shutdown::Both).unwrap_or_default(); - CLIENTS.fetch_sub(1, Ordering::Relaxed); - } else { - let c_cam = cpu_affinity_manager.clone(); - let c_tcp_port_pool = tcp_port_pool.clone(); - let c_udp_port_pool = udp_port_pool.clone(); - let thread_builder = thread::Builder::new() - .name(address.to_string().into()); - thread_builder.spawn(move || { - //ensure the client is accounted-for even if the handler panics - let _client_thread_monitor = ClientThreadMonitor{ - client_address: address.to_string(), - }; - - match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) { - Ok(_) => (), - Err(e) => log::error!("error in client-handler: {}", e), - } - - //in the event of panic, this will happen when the stream is dropped - stream.shutdown(Shutdown::Both).unwrap_or_default(); - })?; - } - }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { //nothing to do - break; - }, - Err(e) => { - return Err(Box::new(e)); - }, - } - }, - } + match listener.accept() { + Ok((mut stream, address)) => { + log::info!("connection from {}", address); + + stream.set_nodelay(true).expect("cannot disable Nagle's algorithm"); + + let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); + let raw_socket = SockRef::from(&stream); + raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + + let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1; + if client_limit > 0 && client_count > client_limit { + log::warn!("client-limit ({}) reached; disconnecting {}...", client_limit, address.to_string()); + stream.shutdown(Shutdown::Both).unwrap_or_default(); + CLIENTS.fetch_sub(1, Ordering::Relaxed); + } else { + let c_cam = cpu_affinity_manager.clone(); + let c_tcp_port_pool = tcp_port_pool.clone(); + let c_udp_port_pool = udp_port_pool.clone(); + let thread_builder = thread::Builder::new() + .name(address.to_string().into()); + thread_builder.spawn(move || { + //ensure the client is accounted-for even if the handler panics + let _client_thread_monitor = ClientThreadMonitor{ + client_address: address.to_string(), + }; + + match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) { + Ok(_) => (), + Err(e) => log::error!("error in client-handler: {}", e), + } + + //in the event of panic, this will happen when the stream is dropped + stream.shutdown(Shutdown::Both).unwrap_or_default(); + })?; + } + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { //no pending clients + thread::sleep(POLL_TIMEOUT); + }, + Err(e) => { + return Err(Box::new(e)); + }, } } diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index c1f0bc2..94cab76 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -30,8 +30,10 @@ use crate::protocol::results::{IntervalResult, TcpReceiveResult, TcpSendResult, use super::{INTERVAL, TestStream, parse_port_spec}; use std::error::Error; +use std::time::Duration; type BoxResult = Result>; +const KEEPALIVE_DURATION:Duration = Duration::from_secs(5); pub const TEST_HEADER_SIZE:usize = 16; #[derive(Clone)] @@ -68,18 +70,19 @@ impl TcpTestDefinition { pub mod receiver { + use crate::stream::tcp::KEEPALIVE_DURATION; cfg_if::cfg_if! { if #[cfg(unix)] { //NOTE: features unsupported on Windows use std::os::unix::io::AsRawFd; } } use std::io::Read; - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener, TcpStream}; use std::sync::{Mutex}; + use std::thread; use std::time::{Duration, Instant}; - use mio::net::{TcpListener, TcpStream}; - use mio::{Events, Ready, Poll, PollOpt, Token}; + use socket2::{SockRef, TcpKeepalive}; const POLL_TIMEOUT:Duration = Duration::from_millis(250); const RECEIVE_TIMEOUT:Duration = Duration::from_secs(3); @@ -186,8 +189,6 @@ pub mod receiver { listener: Option, stream: Option, - mio_poll_token: Token, - mio_poll: Poll, receive_buffer: usize, } @@ -195,11 +196,9 @@ pub mod receiver { pub fn new(test_definition:super::TcpTestDefinition, stream_idx:&u8, port_pool:&mut TcpPortPool, peer_ip:&IpAddr, receive_buffer:&usize) -> super::BoxResult { log::debug!("binding TCP listener for stream {}...", stream_idx); let listener:TcpListener = port_pool.bind(peer_ip).expect(format!("failed to bind TCP socket").as_str()); + listener.set_nonblocking(true).expect("unable to make TCP socket non-blocking"); log::debug!("bound TCP listener for stream {}: {}", stream_idx, listener.local_addr()?); - let mio_poll_token = Token(0); - let mio_poll = Poll::new()?; - Ok(TcpReceiver{ active: true, test_definition: test_definition, @@ -207,8 +206,6 @@ pub mod receiver { listener: Some(listener), stream: None, - mio_poll_token: mio_poll_token, - mio_poll: mio_poll, receive_buffer: receive_buffer.to_owned(), }) @@ -218,15 +215,6 @@ pub mod receiver { log::debug!("preparing to receive TCP stream {} connection...", self.stream_idx); let listener = self.listener.as_mut().unwrap(); - let mio_token = Token(0); - let poll = Poll::new()?; - poll.register( - listener, - mio_token, - Ready::readable(), - PollOpt::edge(), - )?; - let mut events = Events::with_capacity(1); let start = Instant::now(); @@ -235,74 +223,54 @@ pub mod receiver { return Err(Box::new(simple_error::simple_error!("TCP listening for stream {} timed out", self.stream_idx))); } - poll.poll(&mut events, Some(POLL_TIMEOUT))?; - for event in events.iter() { - match event.token() { - _ => loop { - match listener.accept() { - Ok((stream, address)) => { - log::debug!("received TCP stream {} connection from {}", self.stream_idx, address); - - let mut verification_stream = stream.try_clone()?; - let mio_token2 = Token(0); - let poll2 = Poll::new()?; - poll2.register( - &verification_stream, - mio_token2, - Ready::readable(), - PollOpt::edge(), - )?; - - let mut buffer = [0_u8; 16]; - let mut events2 = Events::with_capacity(1); - poll2.poll(&mut events2, Some(RECEIVE_TIMEOUT))?; - for event2 in events2.iter() { - match event2.token() { - _ => match verification_stream.read(&mut buffer) { - Ok(_) => { - if buffer == self.test_definition.test_id { - log::debug!("validated TCP stream {} connection from {}", self.stream_idx, address); - cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - if self.receive_buffer != 0 { - log::debug!("setting receive-buffer to {}...", self.receive_buffer); - super::setsockopt(stream.as_raw_fd(), super::RcvBuf, &self.receive_buffer)?; - } - } - } - - self.mio_poll.register( - &stream, - self.mio_poll_token, - Ready::readable(), - PollOpt::edge(), - )?; - return Ok(stream); - } - }, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { //client didn't provide anything - break; - }, - Err(e) => { - return Err(Box::new(e)); - }, + match listener.accept() { + Ok((stream, address)) => { + log::debug!("received TCP stream {} connection from {}", self.stream_idx, address); + + stream.set_read_timeout(Some(RECEIVE_TIMEOUT)).expect("unable to set TCP read-timeout"); + + let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); + let raw_socket = SockRef::from(&stream); + raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + + let mut verification_stream = stream.try_clone()?; + let mut buffer = [0_u8; 16]; + + match verification_stream.read(&mut buffer) { + Ok(_) => { + if buffer == self.test_definition.test_id { + log::debug!("validated TCP stream {} connection from {}", self.stream_idx, address); + cfg_if::cfg_if! { + if #[cfg(unix)] { //NOTE: features unsupported on Windows + if self.receive_buffer != 0 { + log::debug!("setting receive-buffer to {}...", self.receive_buffer); + super::setsockopt(stream.as_raw_fd(), super::RcvBuf, &self.receive_buffer)?; } } } - log::warn!("could not validate TCP stream {} connection from {}", self.stream_idx, address); - }, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { //nothing to do - break; - }, - Err(e) => { - return Err(Box::new(e)); - }, - } - }, - } + + return Ok(stream); + } + }, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //client didn't provide anything + let _ = stream.shutdown(std::net::Shutdown::Both); + }, + Err(e) => { + let _ = stream.shutdown(std::net::Shutdown::Both); + return Err(Box::new(e)); + }, + } + log::warn!("could not validate TCP stream {} connection from {}", self.stream_idx, address); + }, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { //no pending connection + thread::sleep(POLL_TIMEOUT); + }, + Err(e) => { + return Err(Box::new(e)); + }, } } - Err(Box::new(simple_error::simple_error!("did not receive a connection"))) + Err(Box::new(simple_error::simple_error!("did not establish a connection"))) } } impl super::TestStream for TcpReceiver { @@ -324,7 +292,6 @@ pub mod receiver { } let stream = self.stream.as_mut().unwrap(); - let mut events = Events::with_capacity(1); //only watching one socket let mut buf = vec![0_u8; self.test_definition.length]; let peer_addr = match stream.peer_addr() { @@ -334,52 +301,37 @@ pub mod receiver { let start = Instant::now(); while self.active { - if start.elapsed() >= RECEIVE_TIMEOUT { - return Some(Err(Box::new(simple_error::simple_error!("TCP reception for stream {} from {} timed out", self.stream_idx, peer_addr)))); - } - log::trace!("awaiting TCP stream {} from {}...", self.stream_idx, peer_addr); - let poll_result = self.mio_poll.poll(&mut events, Some(POLL_TIMEOUT)); - if poll_result.is_err() { - return Some(Err(Box::new(poll_result.unwrap_err()))); - } - for event in events.iter() { - if event.token() == self.mio_poll_token { - loop { - match stream.read(&mut buf) { - Ok(packet_size) => { - log::trace!("received {} bytes in TCP stream {} from {}", packet_size, self.stream_idx, peer_addr); - if packet_size == 0 { //test's over - self.active = false; //HACK: can't call self.stop() because it's a double-borrow due to the unwrapped stream - break; - } - - bytes_received += packet_size as u64; - - let elapsed_time = start.elapsed(); - if elapsed_time >= super::INTERVAL { - return Some(Ok(Box::new(super::TcpReceiveResult{ - timestamp: super::get_unix_timestamp(), - - stream_idx: self.stream_idx, - - duration: elapsed_time.as_secs_f32(), - - bytes_received: bytes_received, - }))) - } - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { //receive timeout - break; - }, - Err(e) => { - return Some(Err(Box::new(e))); - }, - } + + match stream.read(&mut buf) { + Ok(packet_size) => { + log::trace!("received {} bytes in TCP stream {} from {}", packet_size, self.stream_idx, peer_addr); + if packet_size == 0 { //test's over + self.active = false; //HACK: can't call self.stop() because it's a double-borrow due to the unwrapped stream + break; } - } else { - log::warn!("got event for unbound token: {:?}", event); - } + + bytes_received += packet_size as u64; + + let elapsed_time = start.elapsed(); + if elapsed_time >= super::INTERVAL { + return Some(Ok(Box::new(super::TcpReceiveResult{ + timestamp: super::get_unix_timestamp(), + + stream_idx: self.stream_idx, + + duration: elapsed_time.as_secs_f32(), + + bytes_received: bytes_received, + }))) + } + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //receive timeout + return Some(Err(Box::new(simple_error::simple_error!("TCP reception for stream {} from {} timed out", self.stream_idx, peer_addr)))); + }, + Err(e) => { + return Some(Err(Box::new(e))); + }, } } if bytes_received > 0 { @@ -420,16 +372,17 @@ pub mod receiver { pub mod sender { + use crate::stream::tcp::KEEPALIVE_DURATION; cfg_if::cfg_if! { if #[cfg(unix)] { //NOTE: features unsupported on Windows use std::os::unix::io::AsRawFd; } } use std::io::Write; - use std::net::{IpAddr, SocketAddr}; + use std::net::{IpAddr, SocketAddr, TcpStream}; use std::time::{Duration, Instant}; - use mio::net::TcpStream; + use socket2::{SockRef, TcpKeepalive}; use std::thread::{sleep}; @@ -484,15 +437,17 @@ pub mod sender { fn process_connection(&mut self) -> super::BoxResult { log::debug!("preparing to connect TCP stream {}...", self.stream_idx); - let raw_stream = match std::net::TcpStream::connect_timeout(&self.socket_addr, CONNECT_TIMEOUT) { + let stream = match TcpStream::connect_timeout(&self.socket_addr, CONNECT_TIMEOUT) { Ok(s) => s, Err(e) => return Err(Box::new(simple_error::simple_error!("unable to connect stream {}: {}", self.stream_idx, e))), }; - raw_stream.set_write_timeout(Some(WRITE_TIMEOUT))?; - let stream = match TcpStream::from_stream(raw_stream) { - Ok(s) => s, - Err(e) => return Err(Box::new(simple_error::simple_error!("unable to prepare TCP stream {}: {}", self.stream_idx, e))), - }; + stream.set_nonblocking(true).expect("unable to make TCP stream non-blocking"); + stream.set_write_timeout(Some(WRITE_TIMEOUT))?; + + let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); + let raw_socket = SockRef::from(&stream); + raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + log::debug!("connected TCP stream {} to {}", self.stream_idx, stream.peer_addr()?); if self.no_delay { From 27ae644e251a0e3ef8182c495a110aa38c8f005c Mon Sep 17 00:00:00 2001 From: Neil Tallim Date: Tue, 5 Sep 2023 20:21:38 -0600 Subject: [PATCH 3/7] Also dropped nix and made buffers accessible to Windows --- Cargo.toml | 2 -- src/protocol/communication.rs | 10 +++--- src/stream/tcp.rs | 58 ++++++++++------------------------- src/stream/udp.rs | 41 ++++++------------------- 4 files changed, 30 insertions(+), 81 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 78eba6f..bdb180b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,14 +11,12 @@ categories = ["network-utilities"] readme = "README.md" [dependencies] -cfg-if = "1.0" chrono = "0.4" clap = "~2.33.3" core_affinity = "0.5" ctrlc = "3.1" env_logger = "0.8" log = {version = "0.4", features = ["std"]} -nix = "0.20" serde = {version = "1.0", features = ["derive"]} serde_json = "1.0" simple-error = "0.2" diff --git a/src/protocol/communication.rs b/src/protocol/communication.rs index 472ffe4..b1af64a 100644 --- a/src/protocol/communication.rs +++ b/src/protocol/communication.rs @@ -46,15 +46,14 @@ pub fn send(stream:&mut TcpStream, message:&serde_json::Value) -> BoxResult<()> /// receives the length-count of a pending message over a client-server communications stream fn receive_length(stream:&mut TcpStream, alive_check:fn() -> bool, results_handler:&mut dyn FnMut() -> BoxResult<()>) -> BoxResult { - let mut cloned_stream = stream.try_clone()?; - cloned_stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout"); + stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout"); let mut length_bytes_read = 0; let mut length_spec:[u8; 2] = [0; 2]; while alive_check() { //waiting to find out how long the next message is results_handler()?; //send any outstanding results between cycles - match cloned_stream.read(&mut length_spec[length_bytes_read..]) { + match stream.read(&mut length_spec[length_bytes_read..]) { Ok(size) => { if size == 0 { if alive_check() { @@ -85,15 +84,14 @@ fn receive_length(stream:&mut TcpStream, alive_check:fn() -> bool, results_handl } /// receives the data-value of a pending message over a client-server communications stream fn receive_payload(stream:&mut TcpStream, alive_check:fn() -> bool, results_handler:&mut dyn FnMut() -> BoxResult<()>, length:u16) -> BoxResult { - let mut cloned_stream = stream.try_clone()?; - cloned_stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout"); + stream.set_read_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP read-timeout"); let mut bytes_read = 0; let mut buffer = vec![0_u8; length.into()]; while alive_check() { //waiting to receive the payload results_handler()?; //send any outstanding results between cycles - match cloned_stream.read(&mut buffer[bytes_read..]) { + match stream.read(&mut buffer[bytes_read..]) { Ok(size) => { if size == 0 { if alive_check() { diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index 94cab76..1f8a4c8 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -18,13 +18,6 @@ * along with rperf. If not, see . */ -cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - extern crate nix; - use nix::sys::socket::{setsockopt, sockopt::RcvBuf, sockopt::SndBuf}; - } -} - use crate::protocol::results::{IntervalResult, TcpReceiveResult, TcpSendResult, get_unix_timestamp}; use super::{INTERVAL, TestStream, parse_port_spec}; @@ -40,7 +33,7 @@ pub const TEST_HEADER_SIZE:usize = 16; pub struct TcpTestDefinition { //a UUID used to identify packets associated with this test pub test_id: [u8; 16], - //bandwidth target, in bytes/sec + // //bandwidth target, in bytes/sec pub bandwidth: u64, //the length of the buffer to exchange pub length: usize, @@ -71,11 +64,6 @@ impl TcpTestDefinition { pub mod receiver { use crate::stream::tcp::KEEPALIVE_DURATION; - cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - use std::os::unix::io::AsRawFd; - } - } use std::io::Read; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener, TcpStream}; use std::sync::{Mutex}; @@ -196,7 +184,7 @@ pub mod receiver { pub fn new(test_definition:super::TcpTestDefinition, stream_idx:&u8, port_pool:&mut TcpPortPool, peer_ip:&IpAddr, receive_buffer:&usize) -> super::BoxResult { log::debug!("binding TCP listener for stream {}...", stream_idx); let listener:TcpListener = port_pool.bind(peer_ip).expect(format!("failed to bind TCP socket").as_str()); - listener.set_nonblocking(true).expect("unable to make TCP socket non-blocking"); + listener.set_nonblocking(true)?; log::debug!("bound TCP listener for stream {}: {}", stream_idx, listener.local_addr()?); Ok(TcpReceiver{ @@ -224,29 +212,24 @@ pub mod receiver { } match listener.accept() { - Ok((stream, address)) => { + Ok((mut stream, address)) => { log::debug!("received TCP stream {} connection from {}", self.stream_idx, address); - stream.set_read_timeout(Some(RECEIVE_TIMEOUT)).expect("unable to set TCP read-timeout"); + stream.set_read_timeout(Some(RECEIVE_TIMEOUT))?; + let cloned_stream = stream.try_clone()?; let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); - let raw_socket = SockRef::from(&stream); - raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + let raw_socket = SockRef::from(&cloned_stream); + raw_socket.set_tcp_keepalive(&keepalive_parameters)?; - let mut verification_stream = stream.try_clone()?; let mut buffer = [0_u8; 16]; - - match verification_stream.read(&mut buffer) { + match stream.read(&mut buffer) { Ok(_) => { if buffer == self.test_definition.test_id { log::debug!("validated TCP stream {} connection from {}", self.stream_idx, address); - cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - if self.receive_buffer != 0 { - log::debug!("setting receive-buffer to {}...", self.receive_buffer); - super::setsockopt(stream.as_raw_fd(), super::RcvBuf, &self.receive_buffer)?; - } - } + if self.receive_buffer != 0 { + log::debug!("setting receive-buffer to {}...", self.receive_buffer); + raw_socket.set_recv_buffer_size(self.receive_buffer)?; } return Ok(stream); @@ -373,11 +356,6 @@ pub mod receiver { pub mod sender { use crate::stream::tcp::KEEPALIVE_DURATION; - cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - use std::os::unix::io::AsRawFd; - } - } use std::io::Write; use std::net::{IpAddr, SocketAddr, TcpStream}; use std::time::{Duration, Instant}; @@ -441,12 +419,12 @@ pub mod sender { Ok(s) => s, Err(e) => return Err(Box::new(simple_error::simple_error!("unable to connect stream {}: {}", self.stream_idx, e))), }; - stream.set_nonblocking(true).expect("unable to make TCP stream non-blocking"); + stream.set_nonblocking(true)?; stream.set_write_timeout(Some(WRITE_TIMEOUT))?; let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); let raw_socket = SockRef::from(&stream); - raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + raw_socket.set_tcp_keepalive(&keepalive_parameters)?; log::debug!("connected TCP stream {} to {}", self.stream_idx, stream.peer_addr()?); @@ -454,13 +432,9 @@ pub mod sender { log::debug!("setting no-delay..."); stream.set_nodelay(true)?; } - cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - if self.send_buffer != 0 { - log::debug!("setting send-buffer to {}...", self.send_buffer); - super::setsockopt(stream.as_raw_fd(), super::SndBuf, &self.send_buffer)?; - } - } + if self.send_buffer != 0 { + log::debug!("setting send-buffer to {}...", self.send_buffer); + raw_socket.set_send_buffer_size(self.send_buffer)?; } Ok(stream) } diff --git a/src/stream/udp.rs b/src/stream/udp.rs index c8c144c..e87130a 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -20,13 +20,6 @@ use std::error::Error; -cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - extern crate nix; - use nix::sys::socket::{setsockopt, sockopt::RcvBuf, sockopt::SndBuf}; - } -} - use crate::protocol::results::{IntervalResult, UdpReceiveResult, UdpSendResult, get_unix_timestamp}; use super::{INTERVAL, TestStream, parse_port_spec}; @@ -69,11 +62,6 @@ impl UdpTestDefinition { } pub mod receiver { - cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - use std::os::unix::io::AsRawFd; - } - } use std::convert::TryInto; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::{Mutex}; @@ -82,6 +70,7 @@ pub mod receiver { use chrono::{NaiveDateTime}; use std::net::UdpSocket; + use socket2::SockRef; const READ_TIMEOUT:Duration = Duration::from_millis(50); const RECEIVE_TIMEOUT:Duration = Duration::from_secs(3); @@ -209,13 +198,10 @@ pub mod receiver { log::debug!("binding UDP receive socket for stream {}...", stream_idx); let socket:UdpSocket = port_pool.bind(peer_ip).expect(format!("failed to bind UDP socket").as_str()); socket.set_read_timeout(Some(READ_TIMEOUT))?; - cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - if *receive_buffer != 0 { - log::debug!("setting receive-buffer to {}...", receive_buffer); - super::setsockopt(socket.as_raw_fd(), super::RcvBuf, receive_buffer)?; - } - } + if *receive_buffer != 0 { + log::debug!("setting receive-buffer to {}...", receive_buffer); + let raw_socket = SockRef::from(&socket); + raw_socket.set_recv_buffer_size(*receive_buffer)?; } log::debug!("bound UDP receive socket for stream {}: {}", stream_idx, socket.local_addr()?); @@ -443,15 +429,11 @@ pub mod receiver { pub mod sender { - cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - use std::os::unix::io::AsRawFd; - } - } use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::net::UdpSocket; + use socket2::SockRef; use std::thread::{sleep}; @@ -481,13 +463,10 @@ pub mod sender { IpAddr::V4(_) => UdpSocket::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port)).expect(format!("failed to bind UDP socket, port {}", port).as_str()), }; socket.set_write_timeout(Some(WRITE_TIMEOUT))?; - cfg_if::cfg_if! { - if #[cfg(unix)] { //NOTE: features unsupported on Windows - if *send_buffer != 0 { - log::debug!("setting send-buffer to {}...", send_buffer); - super::setsockopt(socket.as_raw_fd(), super::SndBuf, send_buffer)?; - } - } + if *send_buffer != 0 { + log::debug!("setting send-buffer to {}...", send_buffer); + let raw_socket = SockRef::from(&socket); + raw_socket.set_send_buffer_size(*send_buffer)?; } socket.connect(socket_addr_receiver)?; log::debug!("connected UDP stream {} to {}", stream_idx, socket_addr_receiver); From 03de466334ba075ea4d91b4677b3d960e517196b Mon Sep 17 00:00:00 2001 From: Neil Tallim Date: Tue, 5 Sep 2023 23:28:24 -0600 Subject: [PATCH 4/7] Partially restored mio. Things are cleaner overall --- Cargo.toml | 2 + src/client.rs | 17 ++-- src/main.rs | 4 +- src/server.rs | 7 +- src/stream/tcp.rs | 215 +++++++++++++++++++++++++++------------------- src/stream/udp.rs | 32 +++++-- 6 files changed, 174 insertions(+), 103 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bdb180b..f6263a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,12 +11,14 @@ categories = ["network-utilities"] readme = "README.md" [dependencies] +cfg-if = "1.0" chrono = "0.4" clap = "~2.33.3" core_affinity = "0.5" ctrlc = "3.1" env_logger = "0.8" log = {version = "0.4", features = ["std"]} +mio = {version = "0.8", features = ["os-poll", "net"]} serde = {version = "1.0", features = ["derive"]} serde_json = "1.0" simple-error = "0.2" diff --git a/src/client.rs b/src/client.rs index 45dc171..78f109d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -28,7 +28,11 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use clap::ArgMatches; use std::net::{TcpStream}; -use socket2::{SockRef, TcpKeepalive}; +cfg_if::cfg_if! { + if #[cfg(unix)] { + use socket2::{SockRef, TcpKeepalive}; + } +} use crate::protocol::communication::{receive, send, KEEPALIVE_DURATION}; @@ -71,9 +75,13 @@ fn connect_to_server(address:&str, port:&u16) -> BoxResult { log::debug!("connected TCP control-channel to {}", destination); stream.set_nodelay(true).expect("cannot disable Nagle's algorithm"); - let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); - let raw_socket = SockRef::from(&stream); - raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + cfg_if::cfg_if! { + if #[cfg(unix)] { + let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); + let raw_socket = SockRef::from(&stream); + raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + } + } log::info!("connected to server"); @@ -227,7 +235,6 @@ pub fn execute(args:ArgMatches) -> BoxResult<()> { test_definition.clone(), &(stream_idx as u8), &mut tcp_port_pool, &server_addr.ip(), - &(download_config["receive_buffer"].as_i64().unwrap() as usize), )?; stream_ports.push(test.get_port()?); parallel_streams.push(Arc::new(Mutex::new(test))); diff --git a/src/main.rs b/src/main.rs index 6ec2614..7a80f68 100644 --- a/src/main.rs +++ b/src/main.rs @@ -161,7 +161,7 @@ fn main() { ) .arg( Arg::with_name("send_buffer") - .help("send_buffer, in bytes (only supported on some platforms; if set too small, a 'resource unavailable' error may occur; affects TCP window-size)") + .help("send_buffer, in bytes (only supported on some platforms; if set too small, a 'resource unavailable' error may occur; affects UDP and TCP window-size)") .takes_value(true) .long("send-buffer") .required(false) @@ -169,7 +169,7 @@ fn main() { ) .arg( Arg::with_name("receive_buffer") - .help("receive_buffer, in bytes (only supported on some platforms; if set too small, a 'resource unavailable' error may occur; affects TCP window-size)") + .help("receive_buffer, in bytes (only supported on some platforms; if set too small, a 'resource unavailable' error may occur; affects UDP)") .takes_value(true) .long("receive-buffer") .required(false) diff --git a/src/server.rs b/src/server.rs index bc61da5..91d2ccd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -30,7 +30,11 @@ use std::time::{Duration}; use clap::ArgMatches; use std::net::{TcpListener, TcpStream}; -use socket2::{SockRef, TcpKeepalive}; +cfg_if::cfg_if! { + if #[cfg(unix)] { + use socket2::{SockRef, TcpKeepalive}; + } +} use crate::protocol::communication::{receive, send, KEEPALIVE_DURATION}; @@ -126,7 +130,6 @@ fn handle_client( test_definition.clone(), &(stream_idx as u8), &mut c_tcp_port_pool, &peer_addr.ip(), - &(payload["receive_buffer"].as_i64().unwrap() as usize), )?; stream_ports.push(test.get_port()?); parallel_streams.push(Arc::new(Mutex::new(test))); diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index 1f8a4c8..9a47731 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -63,14 +63,12 @@ impl TcpTestDefinition { pub mod receiver { - use crate::stream::tcp::KEEPALIVE_DURATION; use std::io::Read; - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener, TcpStream}; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::{Mutex}; - use std::thread; use std::time::{Duration, Instant}; - use socket2::{SockRef, TcpKeepalive}; + use mio::net::{TcpListener, TcpStream}; const POLL_TIMEOUT:Duration = Duration::from_millis(250); const RECEIVE_TIMEOUT:Duration = Duration::from_secs(3); @@ -114,12 +112,12 @@ pub mod receiver { match peer_ip { IpAddr::V6(_) => { if self.ports_ip6.is_empty() { - return Ok(TcpListener::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).expect(format!("failed to bind OS-assigned IPv6 TCP socket").as_str())); + return Ok(TcpListener::bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).expect(format!("failed to bind OS-assigned IPv6 TCP socket").as_str())); } else { let _guard = self.lock_ip6.lock().unwrap(); for port_idx in (self.pos_ip6 + 1)..self.ports_ip6.len() { //iterate to the end of the pool; this will skip the first element in the pool initially, but that's fine - let listener_result = TcpListener::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), self.ports_ip6[port_idx])); + let listener_result = TcpListener::bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), self.ports_ip6[port_idx])); if listener_result.is_ok() { self.pos_ip6 = port_idx; return Ok(listener_result.unwrap()); @@ -128,7 +126,7 @@ pub mod receiver { } } for port_idx in 0..=self.pos_ip6 { //circle back to where the search started - let listener_result = TcpListener::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), self.ports_ip6[port_idx])); + let listener_result = TcpListener::bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), self.ports_ip6[port_idx])); if listener_result.is_ok() { self.pos_ip6 = port_idx; return Ok(listener_result.unwrap()); @@ -141,12 +139,12 @@ pub mod receiver { }, IpAddr::V4(_) => { if self.ports_ip4.is_empty() { - return Ok(TcpListener::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)).expect(format!("failed to bind OS-assigned IPv4 TCP socket").as_str())); + return Ok(TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)).expect(format!("failed to bind OS-assigned IPv4 TCP socket").as_str())); } else { let _guard = self.lock_ip4.lock().unwrap(); for port_idx in (self.pos_ip4 + 1)..self.ports_ip4.len() { //iterate to the end of the pool; this will skip the first element in the pool initially, but that's fine - let listener_result = TcpListener::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), self.ports_ip4[port_idx])); + let listener_result = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), self.ports_ip4[port_idx])); if listener_result.is_ok() { self.pos_ip4 = port_idx; return Ok(listener_result.unwrap()); @@ -155,7 +153,7 @@ pub mod receiver { } } for port_idx in 0..=self.pos_ip4 { //circle back to where the search started - let listener_result = TcpListener::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), self.ports_ip4[port_idx])); + let listener_result = TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), self.ports_ip4[port_idx])); if listener_result.is_ok() { self.pos_ip4 = port_idx; return Ok(listener_result.unwrap()); @@ -178,15 +176,23 @@ pub mod receiver { listener: Option, stream: Option, - receive_buffer: usize, + mio_events: mio::Events, + mio_poll: mio::Poll, + mio_token: mio::Token, } impl TcpReceiver { - pub fn new(test_definition:super::TcpTestDefinition, stream_idx:&u8, port_pool:&mut TcpPortPool, peer_ip:&IpAddr, receive_buffer:&usize) -> super::BoxResult { + pub fn new(test_definition:super::TcpTestDefinition, stream_idx:&u8, port_pool:&mut TcpPortPool, peer_ip:&IpAddr) -> super::BoxResult { log::debug!("binding TCP listener for stream {}...", stream_idx); - let listener:TcpListener = port_pool.bind(peer_ip).expect(format!("failed to bind TCP socket").as_str()); - listener.set_nonblocking(true)?; + let mut listener:TcpListener = port_pool.bind(peer_ip).expect("failed to bind TCP socket"); log::debug!("bound TCP listener for stream {}: {}", stream_idx, listener.local_addr()?); + let port = listener.local_addr()?.port(); + + let mio_events = mio::Events::with_capacity(8); + let mio_poll = mio::Poll::new()?; + let mio_token = mio::Token(port as usize); + mio_poll.registry().register(&mut listener, mio_token, mio::Interest::READABLE)?; + Ok(TcpReceiver{ active: true, test_definition: test_definition, @@ -195,7 +201,9 @@ pub mod receiver { listener: Some(listener), stream: None, - receive_buffer: receive_buffer.to_owned(), + mio_events: mio_events, + mio_poll: mio_poll, + mio_token: mio_token, }) } @@ -203,6 +211,7 @@ pub mod receiver { log::debug!("preparing to receive TCP stream {} connection...", self.stream_idx); let listener = self.listener.as_mut().unwrap(); + let mio_token = self.mio_token; let start = Instant::now(); @@ -211,46 +220,59 @@ pub mod receiver { return Err(Box::new(simple_error::simple_error!("TCP listening for stream {} timed out", self.stream_idx))); } - match listener.accept() { - Ok((mut stream, address)) => { - log::debug!("received TCP stream {} connection from {}", self.stream_idx, address); - - stream.set_read_timeout(Some(RECEIVE_TIMEOUT))?; - let cloned_stream = stream.try_clone()?; - - let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); - let raw_socket = SockRef::from(&cloned_stream); - raw_socket.set_tcp_keepalive(&keepalive_parameters)?; - - let mut buffer = [0_u8; 16]; - match stream.read(&mut buffer) { - Ok(_) => { - if buffer == self.test_definition.test_id { - log::debug!("validated TCP stream {} connection from {}", self.stream_idx, address); - if self.receive_buffer != 0 { - log::debug!("setting receive-buffer to {}...", self.receive_buffer); - raw_socket.set_recv_buffer_size(self.receive_buffer)?; - } + self.mio_poll.poll(&mut self.mio_events, Some(POLL_TIMEOUT))?; + for event in self.mio_events.iter() { + if event.token() == mio_token { + loop { + match listener.accept() { + Ok((mut stream, address)) => { + log::debug!("received TCP stream {} connection from {}", self.stream_idx, address); + + let mio_token2 = mio::Token(0); + let mut mio_poll2 = mio::Poll::new()?; + mio_poll2.registry().register(&mut stream, mio_token2, mio::Interest::READABLE)?; - return Ok(stream); - } - }, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //client didn't provide anything - let _ = stream.shutdown(std::net::Shutdown::Both); - }, - Err(e) => { - let _ = stream.shutdown(std::net::Shutdown::Both); - return Err(Box::new(e)); - }, + let mut buffer = [0_u8; 16]; + let mut events2 = mio::Events::with_capacity(1); + mio_poll2.poll(&mut events2, Some(RECEIVE_TIMEOUT))?; + for event2 in events2.iter() { + match event2.token() { + _ => match stream.read(&mut buffer) { + Ok(_) => { + if buffer == self.test_definition.test_id { + log::debug!("validated TCP stream {} connection from {}", self.stream_idx, address); + + mio_poll2.registry().deregister(&mut stream)?; + self.mio_poll.registry().deregister(listener)?; + self.mio_poll.registry().register(&mut stream, mio_token, mio::Interest::READABLE)?; + + return Ok(stream); + } + }, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //client didn't provide anything + let _ = stream.shutdown(std::net::Shutdown::Both); + }, + Err(e) => { + let _ = stream.shutdown(std::net::Shutdown::Both); + return Err(Box::new(e)); + }, + }, + } + } + log::warn!("could not validate TCP stream {} connection from {}", self.stream_idx, address); + mio_poll2.registry().deregister(&mut stream)?; + }, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { //no pending connections available + break; + }, + Err(e) => { + return Err(Box::new(e)); + }, + } } - log::warn!("could not validate TCP stream {} connection from {}", self.stream_idx, address); - }, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { //no pending connection - thread::sleep(POLL_TIMEOUT); - }, - Err(e) => { - return Err(Box::new(e)); - }, + } else { + log::warn!("got event for unbound token: {:?}", event); + } } } Err(Box::new(simple_error::simple_error!("did not establish a connection"))) @@ -275,6 +297,7 @@ pub mod receiver { } let stream = self.stream.as_mut().unwrap(); + let mio_token = self.mio_token; let mut buf = vec![0_u8; self.test_definition.length]; let peer_addr = match stream.peer_addr() { @@ -284,37 +307,49 @@ pub mod receiver { let start = Instant::now(); while self.active { - log::trace!("awaiting TCP stream {} from {}...", self.stream_idx, peer_addr); + if start.elapsed() >= RECEIVE_TIMEOUT { + return Some(Err(Box::new(simple_error::simple_error!("TCP reception for stream {} from {} timed out", self.stream_idx, peer_addr)))); + } - match stream.read(&mut buf) { - Ok(packet_size) => { - log::trace!("received {} bytes in TCP stream {} from {}", packet_size, self.stream_idx, peer_addr); - if packet_size == 0 { //test's over - self.active = false; //HACK: can't call self.stop() because it's a double-borrow due to the unwrapped stream - break; - } - - bytes_received += packet_size as u64; - - let elapsed_time = start.elapsed(); - if elapsed_time >= super::INTERVAL { - return Some(Ok(Box::new(super::TcpReceiveResult{ - timestamp: super::get_unix_timestamp(), - - stream_idx: self.stream_idx, - - duration: elapsed_time.as_secs_f32(), - - bytes_received: bytes_received, - }))) + log::trace!("awaiting TCP stream {} from {}...", self.stream_idx, peer_addr); + self.mio_poll.poll(&mut self.mio_events, Some(POLL_TIMEOUT)).ok()?; + for event in self.mio_events.iter() { + if event.token() == mio_token { + loop { + match stream.read(&mut buf) { + Ok(packet_size) => { + log::trace!("received {} bytes in TCP stream {} from {}", packet_size, self.stream_idx, peer_addr); + if packet_size == 0 { //test's over + self.active = false; //HACK: can't call self.stop() because it's a double-borrow due to the unwrapped stream + break; + } + + bytes_received += packet_size as u64; + + let elapsed_time = start.elapsed(); + if elapsed_time >= super::INTERVAL { + return Some(Ok(Box::new(super::TcpReceiveResult{ + timestamp: super::get_unix_timestamp(), + + stream_idx: self.stream_idx, + + duration: elapsed_time.as_secs_f32(), + + bytes_received: bytes_received, + }))) + } + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { //receive timeout + break; + }, + Err(e) => { + return Some(Err(Box::new(e))); + }, + } } - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //receive timeout - return Some(Err(Box::new(simple_error::simple_error!("TCP reception for stream {} from {} timed out", self.stream_idx, peer_addr)))); - }, - Err(e) => { - return Some(Err(Box::new(e))); - }, + } else { + log::warn!("got event for unbound token: {:?}", event); + } } } if bytes_received > 0 { @@ -360,7 +395,11 @@ pub mod sender { use std::net::{IpAddr, SocketAddr, TcpStream}; use std::time::{Duration, Instant}; - use socket2::{SockRef, TcpKeepalive}; + cfg_if::cfg_if! { + if #[cfg(unix)] { + use socket2::{SockRef, TcpKeepalive}; + } + } use std::thread::{sleep}; @@ -422,9 +461,13 @@ pub mod sender { stream.set_nonblocking(true)?; stream.set_write_timeout(Some(WRITE_TIMEOUT))?; - let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); - let raw_socket = SockRef::from(&stream); - raw_socket.set_tcp_keepalive(&keepalive_parameters)?; + cfg_if::cfg_if! { + if #[cfg(unix)] { + let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); + let raw_socket = SockRef::from(&stream); + raw_socket.set_tcp_keepalive(&keepalive_parameters)?; + } + } log::debug!("connected TCP stream {} to {}", self.stream_idx, stream.peer_addr()?); diff --git a/src/stream/udp.rs b/src/stream/udp.rs index e87130a..bbe461c 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -70,7 +70,11 @@ pub mod receiver { use chrono::{NaiveDateTime}; use std::net::UdpSocket; - use socket2::SockRef; + cfg_if::cfg_if! { + if #[cfg(unix)] { + use socket2::SockRef; + } + } const READ_TIMEOUT:Duration = Duration::from_millis(50); const RECEIVE_TIMEOUT:Duration = Duration::from_secs(3); @@ -199,9 +203,13 @@ pub mod receiver { let socket:UdpSocket = port_pool.bind(peer_ip).expect(format!("failed to bind UDP socket").as_str()); socket.set_read_timeout(Some(READ_TIMEOUT))?; if *receive_buffer != 0 { - log::debug!("setting receive-buffer to {}...", receive_buffer); - let raw_socket = SockRef::from(&socket); - raw_socket.set_recv_buffer_size(*receive_buffer)?; + cfg_if::cfg_if! { + if #[cfg(unix)] { + log::debug!("setting receive-buffer to {}...", receive_buffer); + let raw_socket = SockRef::from(&socket); + raw_socket.set_recv_buffer_size(*receive_buffer)?; + } + } } log::debug!("bound UDP receive socket for stream {}: {}", stream_idx, socket.local_addr()?); @@ -433,7 +441,11 @@ pub mod sender { use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::net::UdpSocket; - use socket2::SockRef; + cfg_if::cfg_if! { + if #[cfg(unix)] { + use socket2::SockRef; + } + } use std::thread::{sleep}; @@ -464,9 +476,13 @@ pub mod sender { }; socket.set_write_timeout(Some(WRITE_TIMEOUT))?; if *send_buffer != 0 { - log::debug!("setting send-buffer to {}...", send_buffer); - let raw_socket = SockRef::from(&socket); - raw_socket.set_send_buffer_size(*send_buffer)?; + cfg_if::cfg_if! { + if #[cfg(unix)] { + log::debug!("setting send-buffer to {}...", send_buffer); + let raw_socket = SockRef::from(&socket); + raw_socket.set_send_buffer_size(*send_buffer)?; + } + } } socket.connect(socket_addr_receiver)?; log::debug!("connected UDP stream {} to {}", stream_idx, socket_addr_receiver); From 1664450677a3dbd819aa2643371db59874c55653 Mon Sep 17 00:00:00 2001 From: Neil Tallim Date: Tue, 5 Sep 2023 23:41:25 -0600 Subject: [PATCH 5/7] Fixed lingering Windows build issues --- src/client.rs | 3 ++- src/protocol/communication.rs | 10 +++++++--- src/server.rs | 13 +++++++++---- src/stream/tcp.rs | 18 +++++++++++++----- 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/client.rs b/src/client.rs index 78f109d..e020534 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,11 +30,12 @@ use clap::ArgMatches; use std::net::{TcpStream}; cfg_if::cfg_if! { if #[cfg(unix)] { + use crate::protocol::communication::{KEEPALIVE_DURATION}; use socket2::{SockRef, TcpKeepalive}; } } -use crate::protocol::communication::{receive, send, KEEPALIVE_DURATION}; +use crate::protocol::communication::{receive, send}; use crate::protocol::messaging::{ prepare_begin, prepare_end, diff --git a/src/protocol/communication.rs b/src/protocol/communication.rs index b1af64a..732a528 100644 --- a/src/protocol/communication.rs +++ b/src/protocol/communication.rs @@ -26,9 +26,13 @@ use std::net::{TcpStream}; use std::error::Error; type BoxResult = Result>; -/// how long to wait for keepalive events -// the communications channels typically exchange data every second, so 2s is reasonable to avoid excess noise -pub const KEEPALIVE_DURATION:Duration = Duration::from_secs(2); +cfg_if::cfg_if! { + if #[cfg(unix)] { + /// how long to wait for keepalive events + // the communications channels typically exchange data every second, so 3s is reasonable to avoid excess noise + pub const KEEPALIVE_DURATION:Duration = Duration::from_secs(3); + } +} /// how long to block on polling operations const POLL_TIMEOUT:Duration = Duration::from_millis(50); diff --git a/src/server.rs b/src/server.rs index 91d2ccd..97757d9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -32,11 +32,12 @@ use clap::ArgMatches; use std::net::{TcpListener, TcpStream}; cfg_if::cfg_if! { if #[cfg(unix)] { + use crate::protocol::communication::{KEEPALIVE_DURATION}; use socket2::{SockRef, TcpKeepalive}; } } -use crate::protocol::communication::{receive, send, KEEPALIVE_DURATION}; +use crate::protocol::communication::{receive, send}; use crate::protocol::messaging::{ prepare_connect, prepare_connect_ready, @@ -320,9 +321,13 @@ pub fn serve(args:ArgMatches) -> BoxResult<()> { stream.set_nodelay(true).expect("cannot disable Nagle's algorithm"); - let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); - let raw_socket = SockRef::from(&stream); - raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + cfg_if::cfg_if! { + if #[cfg(unix)] { + let keepalive_parameters = TcpKeepalive::new().with_time(KEEPALIVE_DURATION); + let raw_socket = SockRef::from(&stream); + raw_socket.set_tcp_keepalive(&keepalive_parameters).expect("unable to set TCP keepalive"); + } + } let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1; if client_limit > 0 && client_count > client_limit { diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index 9a47731..f43c44e 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -23,10 +23,14 @@ use crate::protocol::results::{IntervalResult, TcpReceiveResult, TcpSendResult, use super::{INTERVAL, TestStream, parse_port_spec}; use std::error::Error; -use std::time::Duration; type BoxResult = Result>; -const KEEPALIVE_DURATION:Duration = Duration::from_secs(5); +cfg_if::cfg_if! { + if #[cfg(unix)] { + use std::time::Duration; + const KEEPALIVE_DURATION:Duration = Duration::from_secs(5); + } +} pub const TEST_HEADER_SIZE:usize = 16; #[derive(Clone)] @@ -390,13 +394,13 @@ pub mod receiver { pub mod sender { - use crate::stream::tcp::KEEPALIVE_DURATION; use std::io::Write; use std::net::{IpAddr, SocketAddr, TcpStream}; use std::time::{Duration, Instant}; cfg_if::cfg_if! { if #[cfg(unix)] { + use crate::stream::tcp::KEEPALIVE_DURATION; use socket2::{SockRef, TcpKeepalive}; } } @@ -476,8 +480,12 @@ pub mod sender { stream.set_nodelay(true)?; } if self.send_buffer != 0 { - log::debug!("setting send-buffer to {}...", self.send_buffer); - raw_socket.set_send_buffer_size(self.send_buffer)?; + cfg_if::cfg_if! { + if #[cfg(unix)] { + log::debug!("setting send-buffer to {}...", self.send_buffer); + raw_socket.set_send_buffer_size(self.send_buffer)?; + } + } } Ok(stream) } From 7b42ed8b13c056375cb6b82a166bc9534958be26 Mon Sep 17 00:00:00 2001 From: Neil Tallim Date: Thu, 7 Sep 2023 23:39:08 -0600 Subject: [PATCH 6/7] Windows UDP reception now works Might have working handling for operation-would-block --- src/protocol/communication.rs | 29 ++++++++++- src/stream/tcp.rs | 98 +++++++++++++++++++---------------- src/stream/udp.rs | 11 ++-- 3 files changed, 89 insertions(+), 49 deletions(-) diff --git a/src/protocol/communication.rs b/src/protocol/communication.rs index 732a528..4eb01f2 100644 --- a/src/protocol/communication.rs +++ b/src/protocol/communication.rs @@ -19,7 +19,7 @@ */ use std::io::{self, Read, Write}; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::net::{TcpStream}; @@ -37,15 +37,40 @@ cfg_if::cfg_if! { /// how long to block on polling operations const POLL_TIMEOUT:Duration = Duration::from_millis(50); +/// how long to allow for send-operations to complete +const SEND_TIMEOUT:Duration = Duration::from_secs(5); + /// sends JSON data over a client-server communications stream pub fn send(stream:&mut TcpStream, message:&serde_json::Value) -> BoxResult<()> { + stream.set_write_timeout(Some(POLL_TIMEOUT)).expect("unable to set TCP write-timeout"); + let serialised_message = serde_json::to_vec(message)?; log::debug!("sending message of length {}, {:?}, to {}...", serialised_message.len(), message, stream.peer_addr()?); let mut output_buffer = vec![0_u8; (serialised_message.len() + 2).into()]; output_buffer[..2].copy_from_slice(&(serialised_message.len() as u16).to_be_bytes()); output_buffer[2..].copy_from_slice(serialised_message.as_slice()); - Ok(stream.write_all(&output_buffer)?) + + let start = Instant::now(); + let mut total_bytes_written:usize = 0; + + while start.elapsed() < SEND_TIMEOUT { + match stream.write(&output_buffer[total_bytes_written..]) { + Ok(bytes_written) => { + total_bytes_written += bytes_written; + if total_bytes_written == output_buffer.len() { + return Ok(()) + } + }, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { + //unable to write at the moment; keep trying until the full timeout is reached + }, + Err(e) => { + return Err(Box::new(e)); + }, + } + } + return Err(Box::new(simple_error::simple_error!("timed out while attempting to send status-message to {}", stream.peer_addr()?))); } /// receives the length-count of a pending message over a client-server communications stream diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index f43c44e..af38401 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -75,6 +75,7 @@ pub mod receiver { use mio::net::{TcpListener, TcpStream}; const POLL_TIMEOUT:Duration = Duration::from_millis(250); + const CONNECTION_TIMEOUT:Duration = Duration::from_secs(1); const RECEIVE_TIMEOUT:Duration = Duration::from_secs(3); pub struct TcpPortPool { @@ -192,7 +193,7 @@ pub mod receiver { let port = listener.local_addr()?.port(); - let mio_events = mio::Events::with_capacity(8); + let mio_events = mio::Events::with_capacity(1); let mio_poll = mio::Poll::new()?; let mio_token = mio::Token(port as usize); mio_poll.registry().register(&mut listener, mio_token, mio::Interest::READABLE)?; @@ -224,59 +225,62 @@ pub mod receiver { return Err(Box::new(simple_error::simple_error!("TCP listening for stream {} timed out", self.stream_idx))); } + let mut stream:Option = None; //assigned upon establishing a connection + self.mio_poll.poll(&mut self.mio_events, Some(POLL_TIMEOUT))?; for event in self.mio_events.iter() { if event.token() == mio_token { - loop { - match listener.accept() { - Ok((mut stream, address)) => { - log::debug!("received TCP stream {} connection from {}", self.stream_idx, address); - - let mio_token2 = mio::Token(0); - let mut mio_poll2 = mio::Poll::new()?; - mio_poll2.registry().register(&mut stream, mio_token2, mio::Interest::READABLE)?; - - let mut buffer = [0_u8; 16]; - let mut events2 = mio::Events::with_capacity(1); - mio_poll2.poll(&mut events2, Some(RECEIVE_TIMEOUT))?; - for event2 in events2.iter() { - match event2.token() { - _ => match stream.read(&mut buffer) { - Ok(_) => { - if buffer == self.test_definition.test_id { - log::debug!("validated TCP stream {} connection from {}", self.stream_idx, address); - - mio_poll2.registry().deregister(&mut stream)?; - self.mio_poll.registry().deregister(listener)?; - self.mio_poll.registry().register(&mut stream, mio_token, mio::Interest::READABLE)?; - - return Ok(stream); - } - }, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //client didn't provide anything - let _ = stream.shutdown(std::net::Shutdown::Both); - }, - Err(e) => { - let _ = stream.shutdown(std::net::Shutdown::Both); - return Err(Box::new(e)); - }, - }, - } + match listener.accept() { + Ok((mut new_stream, address)) => { + log::debug!("received TCP stream {} connection from {}", self.stream_idx, address); + + //hand over flow to the new connection + self.mio_poll.registry().deregister(listener)?; + self.mio_poll.registry().register(&mut new_stream, mio_token, mio::Interest::READABLE)?; + stream = Some(new_stream); + break; + }, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { //no pending connections available + break; + }, + Err(e) => { + return Err(Box::new(e)); + }, + } + } else { + log::warn!("got event for unbound token: {:?}", event); + } + } + + if stream.is_some() { + let mut unwrapped_stream = stream.unwrap(); + + //process the stream + let mut buffer = [0_u8; 16]; + self.mio_poll.poll(&mut self.mio_events, Some(CONNECTION_TIMEOUT))?; + for event in self.mio_events.iter() { + match event.token() { + _ => match unwrapped_stream.read(&mut buffer) { + Ok(_) => { + if buffer == self.test_definition.test_id { + log::debug!("validated TCP stream {} connection from {}", self.stream_idx, unwrapped_stream.peer_addr()?); + + return Ok(unwrapped_stream); } - log::warn!("could not validate TCP stream {} connection from {}", self.stream_idx, address); - mio_poll2.registry().deregister(&mut stream)?; }, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { //no pending connections available - break; + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //client didn't provide anything + let _ = unwrapped_stream.shutdown(std::net::Shutdown::Both); }, Err(e) => { + let _ = unwrapped_stream.shutdown(std::net::Shutdown::Both); + self.mio_poll.registry().deregister(&mut unwrapped_stream)?; return Err(Box::new(e)); }, - } + }, } - } else { - log::warn!("got event for unbound token: {:?}", event); } + self.mio_poll.registry().deregister(&mut unwrapped_stream)?; + log::warn!("could not validate TCP stream {} connection from {}", self.stream_idx, unwrapped_stream.peer_addr()?); } } Err(Box::new(simple_error::simple_error!("did not establish a connection"))) @@ -332,6 +336,7 @@ pub mod receiver { let elapsed_time = start.elapsed(); if elapsed_time >= super::INTERVAL { + log::debug!("{} bytes received via TCP stream {} from {} in this interval; reporting...", bytes_received, self.stream_idx, peer_addr); return Some(Ok(Box::new(super::TcpReceiveResult{ timestamp: super::get_unix_timestamp(), @@ -343,7 +348,7 @@ pub mod receiver { }))) } }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { //receive timeout + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //receive timeout break; }, Err(e) => { @@ -357,6 +362,7 @@ pub mod receiver { } } if bytes_received > 0 { + log::debug!("{} bytes received via TCP stream {} from {} in this interval; reporting...", bytes_received, self.stream_idx, peer_addr); Some(Ok(Box::new(super::TcpReceiveResult{ timestamp: super::get_unix_timestamp(), @@ -367,6 +373,7 @@ pub mod receiver { bytes_received: bytes_received, }))) } else { + log::debug!("no bytes received via TCP stream {} from {} in this interval", self.stream_idx, peer_addr); None } } @@ -538,6 +545,7 @@ pub mod sender { if elapsed_time >= super::INTERVAL { self.remaining_duration -= packet_start.elapsed().as_secs_f32(); + log::debug!("{} bytes sent via TCP stream {} to {} in this interval; reporting...", bytes_sent, self.stream_idx, peer_addr); return Some(Ok(Box::new(super::TcpSendResult{ timestamp: super::get_unix_timestamp(), @@ -577,6 +585,7 @@ pub mod sender { self.remaining_duration -= packet_start.elapsed().as_secs_f32(); } if bytes_sent > 0 { + log::debug!("{} bytes sent via TCP stream {} to {} in this interval; reporting...", bytes_sent, self.stream_idx, peer_addr); Some(Ok(Box::new(super::TcpSendResult{ timestamp: super::get_unix_timestamp(), @@ -588,6 +597,7 @@ pub mod sender { sends_blocked: sends_blocked, }))) } else { + log::debug!("no bytes sent via TCP stream {} to {} in this interval; shutting down...", self.stream_idx, peer_addr); //indicate that the test is over by dropping the stream self.stream = None; None diff --git a/src/stream/udp.rs b/src/stream/udp.rs index bbe461c..83aa08f 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -63,13 +63,12 @@ impl UdpTestDefinition { pub mod receiver { use std::convert::TryInto; - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::sync::{Mutex}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use chrono::{NaiveDateTime}; - use std::net::UdpSocket; cfg_if::cfg_if! { if #[cfg(unix)] { use socket2::SockRef; @@ -366,6 +365,7 @@ pub mod receiver { let elapsed_time = start.elapsed(); if elapsed_time >= super::INTERVAL { + log::debug!("{} bytes received via UDP stream {} from {} in this interval; reporting...", bytes_received, self.stream_idx, peer_addr); return Some(Ok(Box::new(super::UdpReceiveResult{ timestamp: super::get_unix_timestamp(), @@ -388,7 +388,7 @@ pub mod receiver { continue; } }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { //receive timeout + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //receive timeout break; }, Err(e) => { @@ -398,6 +398,7 @@ pub mod receiver { } } if bytes_received > 0 { + log::debug!("{} bytes received via UDP stream {} in this interval; reporting...", bytes_received, self.stream_idx); Some(Ok(Box::new(super::UdpReceiveResult{ timestamp: super::get_unix_timestamp(), @@ -415,6 +416,7 @@ pub mod receiver { jitter_seconds: history.longest_jitter_seconds, }))) } else { + log::debug!("no bytes received via UDP stream {} in this interval", self.stream_idx); None } } @@ -557,6 +559,7 @@ pub mod sender { if elapsed_time >= super::INTERVAL { self.remaining_duration -= packet_start.elapsed().as_secs_f32(); + log::debug!("{} bytes sent via UDP stream {} in this interval; reporting...", bytes_sent, self.stream_idx); return Some(Ok(Box::new(super::UdpSendResult{ timestamp: super::get_unix_timestamp(), @@ -597,6 +600,7 @@ pub mod sender { self.remaining_duration -= packet_start.elapsed().as_secs_f32(); } if bytes_sent > 0 { + log::debug!("{} bytes sent via UDP stream {} in this interval; reporting...", bytes_sent, self.stream_idx); Some(Ok(Box::new(super::UdpSendResult{ timestamp: super::get_unix_timestamp(), @@ -609,6 +613,7 @@ pub mod sender { sends_blocked: sends_blocked, }))) } else { + log::debug!("no bytes sent via UDP stream {} in this interval; shutting down...", self.stream_idx); //indicate that the test is over by sending the test ID by itself let mut remaining_announcements = 5; while remaining_announcements > 0 { //do it a few times in case of loss From cf01ff4e783cb4686ba543b203b879beb5834a69 Mon Sep 17 00:00:00 2001 From: Neil Tallim Date: Fri, 8 Sep 2023 00:46:37 -0600 Subject: [PATCH 7/7] Windows should now be able to handle TCP reception --- src/stream/tcp.rs | 90 +++++++++++++++++++++++++---------------------- src/stream/udp.rs | 28 +-------------- 2 files changed, 48 insertions(+), 70 deletions(-) diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index af38401..54a7328 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -212,7 +212,7 @@ pub mod receiver { }) } - fn process_connection(&mut self) -> super::BoxResult { + fn process_connection(&mut self) -> super::BoxResult<(TcpStream, u64, f32)> { log::debug!("preparing to receive TCP stream {} connection...", self.stream_idx); let listener = self.listener.as_mut().unwrap(); @@ -256,29 +256,49 @@ pub mod receiver { let mut unwrapped_stream = stream.unwrap(); //process the stream - let mut buffer = [0_u8; 16]; + let mut buf = vec![0_u8; self.test_definition.length]; + let mut validated:bool = false; + let mut bytes_received:u64 = 0; + + let start_validation = Instant::now(); + self.mio_poll.poll(&mut self.mio_events, Some(CONNECTION_TIMEOUT))?; for event in self.mio_events.iter() { - match event.token() { - _ => match unwrapped_stream.read(&mut buffer) { - Ok(_) => { - if buffer == self.test_definition.test_id { - log::debug!("validated TCP stream {} connection from {}", self.stream_idx, unwrapped_stream.peer_addr()?); + if event.token() == mio_token { + loop { + match unwrapped_stream.read(&mut buf) { + Ok(packet_size) => { + if !validated { + if buf[..16] == self.test_definition.test_id { + log::debug!("validated TCP stream {} connection from {}", self.stream_idx, unwrapped_stream.peer_addr()?); + validated = true; + } else { + log::warn!("unexpected ID in stream {} connection from {}", self.stream_idx, unwrapped_stream.peer_addr()?); + break; + } + } - return Ok(unwrapped_stream); - } - }, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //client didn't provide anything - let _ = unwrapped_stream.shutdown(std::net::Shutdown::Both); - }, - Err(e) => { - let _ = unwrapped_stream.shutdown(std::net::Shutdown::Both); - self.mio_poll.registry().deregister(&mut unwrapped_stream)?; - return Err(Box::new(e)); - }, - }, + if validated { + bytes_received += packet_size as u64; + } + }, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //end of input reached + break; + }, + Err(e) => { + let _ = unwrapped_stream.shutdown(std::net::Shutdown::Both); + self.mio_poll.registry().deregister(&mut unwrapped_stream)?; + return Err(Box::new(e)); + }, + } + } } } + if validated { + return Ok((unwrapped_stream, bytes_received, start_validation.elapsed().as_secs_f32())); + } + + let _ = unwrapped_stream.shutdown(std::net::Shutdown::Both); self.mio_poll.registry().deregister(&mut unwrapped_stream)?; log::warn!("could not validate TCP stream {} connection from {}", self.stream_idx, unwrapped_stream.peer_addr()?); } @@ -290,12 +310,14 @@ pub mod receiver { fn run_interval(&mut self) -> Option>> { let mut bytes_received:u64 = 0; + let mut additional_time_elapsed:f32 = 0.0; if self.stream.is_none() { //if still in the setup phase, receive the sender match self.process_connection() { - Ok(stream) => { + Ok((stream, bytes_received_in_validation, time_spent_in_validation)) => { self.stream = Some(stream); - //NOTE: the connection process consumes the test-header; account for those bytes - bytes_received += super::TEST_HEADER_SIZE as u64; + //NOTE: the connection process consumes packets; account for those bytes + bytes_received += bytes_received_in_validation; + additional_time_elapsed += time_spent_in_validation; }, Err(e) => { return Some(Err(e)); @@ -314,16 +336,12 @@ pub mod receiver { }; let start = Instant::now(); - while self.active { - if start.elapsed() >= RECEIVE_TIMEOUT { - return Some(Err(Box::new(simple_error::simple_error!("TCP reception for stream {} from {} timed out", self.stream_idx, peer_addr)))); - } - + while self.active && start.elapsed() < super::INTERVAL { log::trace!("awaiting TCP stream {} from {}...", self.stream_idx, peer_addr); self.mio_poll.poll(&mut self.mio_events, Some(POLL_TIMEOUT)).ok()?; for event in self.mio_events.iter() { if event.token() == mio_token { - loop { + loop { match stream.read(&mut buf) { Ok(packet_size) => { log::trace!("received {} bytes in TCP stream {} from {}", packet_size, self.stream_idx, peer_addr); @@ -333,20 +351,6 @@ pub mod receiver { } bytes_received += packet_size as u64; - - let elapsed_time = start.elapsed(); - if elapsed_time >= super::INTERVAL { - log::debug!("{} bytes received via TCP stream {} from {} in this interval; reporting...", bytes_received, self.stream_idx, peer_addr); - return Some(Ok(Box::new(super::TcpReceiveResult{ - timestamp: super::get_unix_timestamp(), - - stream_idx: self.stream_idx, - - duration: elapsed_time.as_secs_f32(), - - bytes_received: bytes_received, - }))) - } }, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => { //receive timeout break; @@ -368,7 +372,7 @@ pub mod receiver { stream_idx: self.stream_idx, - duration: start.elapsed().as_secs_f32(), + duration: start.elapsed().as_secs_f32() + additional_time_elapsed, bytes_received: bytes_received, }))) diff --git a/src/stream/udp.rs b/src/stream/udp.rs index 83aa08f..b4cfc5c 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -76,7 +76,6 @@ pub mod receiver { } const READ_TIMEOUT:Duration = Duration::from_millis(50); - const RECEIVE_TIMEOUT:Duration = Duration::from_secs(3); pub struct UdpPortPool { pub ports_ip4: Vec, @@ -338,11 +337,7 @@ pub mod receiver { let start = Instant::now(); - while self.active { - if start.elapsed() >= RECEIVE_TIMEOUT { - return Some(Err(Box::new(simple_error::simple_error!("UDP reception for stream {} timed out, likely because the end-signal was lost", self.stream_idx)))); - } - + while self.active && start.elapsed() < super::INTERVAL { log::trace!("awaiting UDP packets on stream {}...", self.stream_idx); loop { match self.socket.recv_from(&mut buf) { @@ -362,27 +357,6 @@ pub mod receiver { if self.process_packet(&buf, &mut history) { //NOTE: duplicate packets increase this count; this is intentional because the stack still processed data bytes_received += packet_size as u64 + super::UDP_HEADER_SIZE as u64; - - let elapsed_time = start.elapsed(); - if elapsed_time >= super::INTERVAL { - log::debug!("{} bytes received via UDP stream {} from {} in this interval; reporting...", bytes_received, self.stream_idx, peer_addr); - return Some(Ok(Box::new(super::UdpReceiveResult{ - timestamp: super::get_unix_timestamp(), - - stream_idx: self.stream_idx, - - duration: elapsed_time.as_secs_f32(), - - bytes_received: bytes_received, - packets_received: history.packets_received, - packets_lost: history.packets_lost, - packets_out_of_order: history.packets_out_of_order, - packets_duplicated: history.packets_duplicated, - - unbroken_sequence: history.longest_unbroken_sequence, - jitter_seconds: history.longest_jitter_seconds, - }))) - } } else { log::warn!("received packet unrelated to UDP stream {} from {}", self.stream_idx, peer_addr); continue;