Merge pull request #292 from matt-kimball/cygwin-signals

Rework Cygwin mtr-packet to respond to signals (such as SIGTERM)
This commit is contained in:
Roger Wolff 2019-03-03 17:12:50 +01:00 committed by GitHub
commit 60e5c5c8c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 560 additions and 428 deletions

View File

@ -113,7 +113,6 @@ endif
if CYGWIN
mtr_packet_SOURCES += \
packet/command_cygwin.c packet/command_cygwin.h \
packet/probe_cygwin.c packet/probe_cygwin.h \
packet/wait_cygwin.c
mtr_packet_LDADD += -lcygwin -liphlpapi -lws2_32
@ -148,7 +147,6 @@ else # if CYGWIN
check_PROGRAMS = mtr-packet-listen
mtr_packet_SOURCES += \
packet/command_unix.c packet/command_unix.h \
packet/construct_unix.c packet/construct_unix.h \
packet/deconstruct_unix.c packet/deconstruct_unix.h \
packet/probe_unix.c packet/probe_unix.h \

View File

@ -20,11 +20,18 @@
#include <assert.h>
#include <errno.h>
#ifdef HAVE_ERROR_H
#include <error.h>
#else
#include "portability/error.h"
#endif
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include "cmdparse.h"
#include "platform.h"
@ -424,3 +431,63 @@ void dispatch_buffer_commands(
buffer->incoming_read_position = 0;
}
}
/*
Initialize the command buffer and put the command stream in
non-blocking mode.
*/
void init_command_buffer(
struct command_buffer_t *command_buffer,
int command_stream)
{
int flags;
memset(command_buffer, 0, sizeof(struct command_buffer_t));
command_buffer->command_stream = command_stream;
/* Get the current command stream flags */
flags = fcntl(command_stream, F_GETFL, 0);
if (flags == -1) {
error(EXIT_FAILURE, errno, "Unexpected command stream error");
}
/* Set the O_NONBLOCK bit */
if (fcntl(command_stream, F_SETFL, flags | O_NONBLOCK)) {
error(EXIT_FAILURE, errno, "Unexpected command stream error");
}
}
/* Read currently available data from the command stream */
int read_commands(
struct command_buffer_t *buffer)
{
int space_remaining =
COMMAND_BUFFER_SIZE - buffer->incoming_read_position - 1;
char *read_position =
&buffer->incoming_buffer[buffer->incoming_read_position];
int read_count;
int command_stream = buffer->command_stream;
read_count = read(command_stream, read_position, space_remaining);
/* If the command stream has been closed, read will return zero. */
if (read_count == 0) {
errno = EPIPE;
return -1;
}
if (read_count > 0) {
/* Account for the newly read data */
buffer->incoming_read_position += read_count;
}
if (read_count < 0) {
/* EAGAIN simply means there is no available data to read */
/* EINTR indicates we received a signal during read */
if (errno != EINTR && errno != EAGAIN) {
error(EXIT_FAILURE, errno, "Unexpected command buffer read error");
}
}
return 0;
}

View File

@ -19,17 +19,10 @@
#ifndef COMMAND_H
#define COMMAND_H
#include "platform.h"
#include "probe.h"
#define COMMAND_BUFFER_SIZE 4096
#ifdef PLATFORM_CYGWIN
#include "command_cygwin.h"
#else
#include "command_unix.h"
#endif
/* Storage for incoming commands, prior to command parsing */
struct command_buffer_t {
/* The file descriptor of the incoming command stream */
@ -40,9 +33,6 @@ struct command_buffer_t {
/* The number of bytes read so far in incoming_buffer */
int incoming_read_position;
/* Platform specific */
struct command_buffer_platform_t platform;
};
void init_command_buffer(

View File

@ -1,153 +0,0 @@
/*
mtr -- a network diagnostic tool
Copyright (C) 2016 Matt Kimball
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "command.h"
#include <errno.h>
#include <io.h>
#include <stdio.h>
/*
A completion routine to be called by Windows when a read from
the command stream has completed.
*/
static
void CALLBACK finish_read_command(
DWORD status,
DWORD size_read,
OVERLAPPED * overlapped)
{
struct command_buffer_t *buffer;
char *read_position;
/*
hEvent is unusuaed by ReadFileEx, so we use it to pass
our command_buffer structure.
*/
buffer = (struct command_buffer_t *) overlapped->hEvent;
if (status) {
/* When the stream is closed ERROR_BROKEN_PIPE will be the result */
if (status == ERROR_BROKEN_PIPE) {
buffer->platform.pipe_open = false;
return;
}
fprintf(stderr, "ReadFileEx completion failure %d\n", status);
exit(EXIT_FAILURE);
}
/* Copy from the overlapped I/O buffer to the incoming command buffer */
read_position =
&buffer->incoming_buffer[buffer->incoming_read_position];
memcpy(read_position, buffer->platform.overlapped_buffer, size_read);
/* Account for the newly read data */
buffer->incoming_read_position += size_read;
buffer->platform.read_active = false;
}
/*
An APC which does nothing, to be used only to wake from the current
alertable wait.
*/
static
void CALLBACK empty_apc(
ULONG * param)
{
}
/* Wake from the next alertable wait without waiting for newly read data */
static
void queue_empty_apc(
void)
{
if (QueueUserAPC((PAPCFUNC) empty_apc, GetCurrentThread(), 0) == 0) {
fprintf(stderr, "Unexpected QueueUserAPC failure %d\n",
GetLastError());
exit(EXIT_FAILURE);
}
}
/* Start a new overlapped I/O read from the command stream */
void start_read_command(
struct command_buffer_t *buffer)
{
HANDLE command_stream = (HANDLE) get_osfhandle(buffer->command_stream);
int space_remaining =
COMMAND_BUFFER_SIZE - buffer->incoming_read_position - 1;
int err;
/* If a read is already active, or the pipe is closed, do nothing */
if (!buffer->platform.pipe_open || buffer->platform.read_active) {
return;
}
memset(&buffer->platform.overlapped, 0, sizeof(OVERLAPPED));
buffer->platform.overlapped.hEvent = (HANDLE) buffer;
if (!ReadFileEx
(command_stream, buffer->platform.overlapped_buffer,
space_remaining, &buffer->platform.overlapped,
finish_read_command)) {
err = GetLastError();
if (err == ERROR_BROKEN_PIPE) {
/* If the command stream has been closed, we need to wake from
the next altertable wait to exit the main loop */
buffer->platform.pipe_open = false;
queue_empty_apc();
return;
} else if (err != WAIT_IO_COMPLETION) {
fprintf(stderr, "Unexpected ReadFileEx failure %d\n",
GetLastError());
exit(EXIT_FAILURE);
}
}
/* Remember that we have started an overlapped read already */
buffer->platform.read_active = true;
}
/* Initialize the command buffer, and start the first overlapped read */
void init_command_buffer(
struct command_buffer_t *command_buffer,
int command_stream)
{
memset(command_buffer, 0, sizeof(struct command_buffer_t));
command_buffer->command_stream = command_stream;
command_buffer->platform.pipe_open = true;
}
/*
Return with errno EPIPE if the command stream has been closed.
Otherwise, not much to do for Cygwin, since we are using Overlapped I/O
to read commands.
*/
int read_commands(
struct command_buffer_t *buffer)
{
if (!buffer->platform.pipe_open) {
errno = EPIPE;
return -1;
}
return 0;
}

View File

@ -1,52 +0,0 @@
/*
mtr -- a network diagnostic tool
Copyright (C) 2016 Matt Kimball
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef COMMAND_CYGWIN_H
#define COMMAND_CYGWIN_H
/*
Though Cygwin supports the usual Unix non-blocking reads on
the command stream, we've got to use Overlapped I/O instead because
ICMP.DLL's support for sending probes requires Overlapped I/O
and alertable waits for notification of replies. Since we need
alertable waits, we can't use Cygwin's select to determine when
command stream data is available, but Overlapped I/O completion
will work.
*/
/* Overlapped I/O manament for Windows command buffer reads */
struct command_buffer_platform_t {
/* true if an overlapped I/O read is active */
bool read_active;
/* true if the command pipe is still open */
bool pipe_open;
/* Windows OVERLAPPED I/O data */
OVERLAPPED overlapped;
/* The buffer which active OVERLAPPED reads read into */
char overlapped_buffer[COMMAND_BUFFER_SIZE];
};
struct command_buffer_t;
void start_read_command(
struct command_buffer_t *buffer);
#endif

View File

@ -1,91 +0,0 @@
/*
mtr -- a network diagnostic tool
Copyright (C) 2016 Matt Kimball
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "command.h"
#include <errno.h>
#ifdef HAVE_ERROR_H
#include <error.h>
#else
#include "portability/error.h"
#endif
#include <fcntl.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
/*
Initialize the command buffer and put the command stream in
non-blocking mode.
*/
void init_command_buffer(
struct command_buffer_t *command_buffer,
int command_stream)
{
int flags;
memset(command_buffer, 0, sizeof(struct command_buffer_t));
command_buffer->command_stream = command_stream;
/* Get the current command stream flags */
flags = fcntl(command_stream, F_GETFL, 0);
if (flags == -1) {
error(EXIT_FAILURE, errno, "Unexpected command stream error");
}
/* Set the O_NONBLOCK bit */
if (fcntl(command_stream, F_SETFL, flags | O_NONBLOCK)) {
error(EXIT_FAILURE, errno, "Unexpected command stream error");
}
}
/* Read currently available data from the command stream */
int read_commands(
struct command_buffer_t *buffer)
{
int space_remaining =
COMMAND_BUFFER_SIZE - buffer->incoming_read_position - 1;
char *read_position =
&buffer->incoming_buffer[buffer->incoming_read_position];
int read_count;
int command_stream = buffer->command_stream;
read_count = read(command_stream, read_position, space_remaining);
/* If the command stream has been closed, read will return zero. */
if (read_count == 0) {
errno = EPIPE;
return -1;
}
if (read_count > 0) {
/* Account for the newly read data */
buffer->incoming_read_position += read_count;
}
if (read_count < 0) {
/* EAGAIN simply means there is no available data to read */
/* EINTR indicates we received a signal during read */
if (errno != EINTR && errno != EAGAIN) {
error(EXIT_FAILURE, errno, "Unexpected command buffer read error");
}
}
return 0;
}

View File

@ -1,26 +0,0 @@
/*
mtr -- a network diagnostic tool
Copyright (C) 2016 Matt Kimball
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef COMMAND_UNIX_H
#define COMMAND_UNIX_H
/* No platform specific data is required for Unix command streams */
struct command_buffer_platform_t {
};
#endif

View File

@ -18,22 +18,98 @@
#include "probe.h"
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <io.h>
#include <stdio.h>
#include <unistd.h>
#include <winternl.h>
#include "protocols.h"
/*
Implementation notes (or "Why this uses a worker thread")
Having done my time debugging various race conditions over the
last twenty-plus years as a software developer, both of my own
creation and discovered in the code of others, I almost always
try to structure my code to be single-threaded. However,
I think in this case, the ICMP service thread is unavoidable.
I would have liked to avoid multithreading entirely, but here are
the constraints:
a) mtr was originally a Unix program which used "raw sockets".
b) In order to port mtr to Windows, Cygwin is used to get a
Unix-like environment.
c) You can't use a raw socket to receive an ICMP reply on Windows.
However, Windows provides a separate API in the form of
ICMP.DLL for sending and receiving ICMP messages.
d) The ICMP API works asynchronously, and requires completion
through an asynchronous procedure call ("APC")
e) APCs are only delivered during blocking Win32 operations
which are flagged as "alertable." This prevents apps from
having APCs execute unexpectedly during an I/O operation.
f) Cygwin's implementation of POSIX functions does all I/O
through non-alertable I/O operations. This is reasonable
because APCs don't exist in the POSIX API.
g) Cygwin implements Unix-style signals at the application level,
since the Windows kernel doesn't have them. We want our
program to respond to SIGTERM and SIGKILL, at least.
h) Cygwin's signal implementation will deliver signals during
blocking I/O functions in the Cygwin library, but won't
respond to signals if the signal is sent while the application
is in a blocking Windows API call which Cygwin is not aware of.
i) Since we want to both send/receive ICMP probes and also respond
to Unix-style signals, we require two threads: one which
uses Cygwin's POSIX style blocking I/O and can respond to
signals, and one which uses alertable waits using Win32
blocking APIs.
The solution is to have the main thread using select() as the
blocking operation in its loop, and also to have an ICMP service
thread using WaitForSingleObjectEx() as its blocking operation.
The main thread will respond to signals. The ICMP service thread
will run the APCs completing ICMP.DLL requests.
These two threads communicate through a pair of pipes. One pipe
sends requests from the main thread to the ICMP service thread,
and another pipe sends the requests back as they complete.
We use the Cygwin pipe() to create the pipes, but in the ICMP
service thread we use the Win32 HANDLE that corresponds to the
recieving end of the input pipe to wait for ICMP requests.
*/
static DWORD WINAPI icmp_service_thread(LPVOID param);
/* Windows doesn't require any initialization at a privileged level */
void init_net_state_privileged(
struct net_state_t *net_state)
{
}
/* Open the ICMP.DLL interface */
/*
Convienience similar to error(), but for reporting Windows
error codes instead of errno codes.
*/
void error_win(int exit_code, int win_error, const char *str) {
fprintf(stderr, "%s (code %d)\n", str, win_error);
exit(exit_code);
}
/* Open the ICMP.DLL interface and start the ICMP service thread */
void init_net_state(
struct net_state_t *net_state)
{
HANDLE thread;
int in_pipe[2], out_pipe[2];
int err;
memset(net_state, 0, sizeof(struct net_state_t));
net_state->platform.icmp4 = IcmpCreateFile();
@ -41,11 +117,48 @@ void init_net_state(
if (net_state->platform.icmp4 == INVALID_HANDLE_VALUE
&& net_state->platform.icmp6 == INVALID_HANDLE_VALUE) {
fprintf(stderr, "Failure opening ICMP %d\n", GetLastError());
exit(EXIT_FAILURE);
error_win(EXIT_FAILURE, GetLastError(), "Failure opening ICMP");
}
net_state->platform.ip4_socket_raw = false;
net_state->platform.ip6_socket_raw = false;
/*
We need a pipe for communication with the ICMP thread
in each direction.
*/
if (pipe(in_pipe) == -1 || pipe(out_pipe) == -1) {
error(EXIT_FAILURE, errno, "Failure creating thread pipe");
}
net_state->platform.thread_in_pipe_read = in_pipe[0];
net_state->platform.thread_in_pipe_write = in_pipe[1];
net_state->platform.thread_out_pipe_read = out_pipe[0];
net_state->platform.thread_out_pipe_write = out_pipe[1];
net_state->platform.thread_in_pipe_read_handle =
(HANDLE)get_osfhandle(in_pipe[0]);
/*
The read on the out pipe needs to be nonblocking because
it will be occasionally checked in the main thread.
*/
err = fcntl(out_pipe[0], F_SETFL, O_NONBLOCK);
if (err == -1) {
error(
EXIT_FAILURE, errno,
"Failure setting pipe to non-blocking");
}
/* Spin up the ICMP service thread */
thread = CreateThread(
NULL, 0, icmp_service_thread, net_state, 0, NULL);
if (thread == NULL) {
error_win(
EXIT_FAILURE, GetLastError(),
"Failure creating ICMP service thread");
}
}
/*
@ -89,10 +202,6 @@ void platform_alloc_probe(
void platform_free_probe(
struct probe_t *probe)
{
if (probe->platform.reply4) {
free(probe->platform.reply4);
probe->platform.reply4 = NULL;
}
}
/* Report a windows error code using a platform-independent error string */
@ -113,6 +222,28 @@ void report_win_error(
}
}
/*
After we have the result of an ICMP probe on the ICMP service
thread, this is used to send the result back to the main thread
for probe result reporting.
*/
static
void queue_thread_result(struct icmp_thread_request_t *request)
{
int byte_count;
/* Pass ownership of the request back through the result pipe */
byte_count = write(
request->net_state->platform.thread_out_pipe_write,
&request,
sizeof(struct icmp_thread_request_t *));
if (byte_count == -1) {
error(
EXIT_FAILURE, errno,
"failure writing to probe result queue");
}
}
/*
The overlapped I/O style completion routine to be called by
Windows during an altertable wait when an ICMP probe has
@ -124,8 +255,8 @@ void WINAPI on_icmp_reply(
PIO_STATUS_BLOCK status,
ULONG reserved)
{
struct probe_t *probe = (struct probe_t *) context;
struct net_state_t *net_state = probe->platform.net_state;
struct icmp_thread_request_t *request =
(struct icmp_thread_request_t *) context;
int icmp_type;
int round_trip_us = 0;
int reply_count;
@ -136,8 +267,8 @@ void WINAPI on_icmp_reply(
ICMP_ECHO_REPLY *reply4;
ICMPV6_ECHO_REPLY *reply6;
if (probe->platform.ip_version == 6) {
reply6 = probe->platform.reply6;
if (request->ip_version == 6) {
reply6 = request->reply6;
reply_count = Icmp6ParseReplies(reply6, sizeof(ICMPV6_ECHO_REPLY));
if (reply_count > 0) {
@ -155,7 +286,7 @@ void WINAPI on_icmp_reply(
remote_addr6->sin6_scope_id = 0;
}
} else {
reply4 = probe->platform.reply4;
reply4 = request->reply4;
reply_count = IcmpParseReplies(reply4, sizeof(ICMP_ECHO_REPLY));
if (reply_count > 0) {
@ -194,24 +325,17 @@ void WINAPI on_icmp_reply(
icmp_type = ICMP_DEST_UNREACH;
}
if (icmp_type != -1) {
/* Record probe result */
respond_to_probe(net_state, probe, icmp_type,
&remote_addr, round_trip_us, 0, NULL);
} else {
report_win_error(probe->token, reply_status);
free_probe(net_state, probe);
}
request->icmp_type = icmp_type;
request->reply_status = reply_status;
request->remote_addr = remote_addr;
request->round_trip_us = round_trip_us;
queue_thread_result(request);
}
/* Use ICMP.DLL's send echo support to send a probe */
static
void icmp_send_probe(
struct net_state_t *net_state,
struct probe_t *probe,
const struct probe_param_t *param,
struct sockaddr_storage *src_sockaddr,
struct sockaddr_storage *dest_sockaddr,
struct icmp_thread_request_t *request,
char *payload,
int payload_size)
{
@ -224,8 +348,8 @@ void icmp_send_probe(
struct sockaddr_in6 *src_sockaddr6;
struct sockaddr_in6 *dest_sockaddr6;
if (param->timeout > 0) {
timeout = 1000 * param->timeout;
if (request->timeout > 0) {
timeout = 1000 * request->timeout;
} else {
/*
IcmpSendEcho2 will return invalid argument on a timeout of
@ -236,51 +360,56 @@ void icmp_send_probe(
}
memset(&option, 0, sizeof(IP_OPTION_INFORMATION));
option.Ttl = param->ttl;
option.Ttl = request->ttl;
if (param->ip_version == 6) {
if (request->ip_version == 6) {
reply_size = sizeof(ICMPV6_ECHO_REPLY) + payload_size;
} else {
reply_size = sizeof(ICMP_ECHO_REPLY) + payload_size;
}
probe->platform.reply4 = malloc(reply_size);
if (probe->platform.reply4 == NULL) {
perror("failure to allocate reply buffer");
exit(EXIT_FAILURE);
request->reply4 = malloc(reply_size);
if (request->reply4 == NULL) {
error(EXIT_FAILURE, errno, "failure to allocate reply buffer");
}
if (param->ip_version == 6) {
src_sockaddr6 = (struct sockaddr_in6 *) src_sockaddr;
dest_sockaddr6 = (struct sockaddr_in6 *) dest_sockaddr;
if (request->ip_version == 6) {
src_sockaddr6 = (struct sockaddr_in6 *) &request->src_sockaddr;
dest_sockaddr6 = (struct sockaddr_in6 *) &request->dest_sockaddr;
send_result = Icmp6SendEcho2(net_state->platform.icmp6, NULL,
(FARPROC) on_icmp_reply, probe,
send_result = Icmp6SendEcho2(request->net_state->platform.icmp6,
NULL,
(FARPROC) on_icmp_reply,
request,
src_sockaddr6, dest_sockaddr6,
payload, payload_size, &option,
probe->platform.reply6, reply_size,
timeout);
request->reply6,
reply_size, timeout);
} else {
dest_sockaddr4 = (struct sockaddr_in *) dest_sockaddr;
dest_sockaddr4 = (struct sockaddr_in *) &request->dest_sockaddr;
send_result = IcmpSendEcho2(net_state->platform.icmp4, NULL,
(FARPROC) on_icmp_reply, probe,
send_result = IcmpSendEcho2(request->net_state->platform.icmp4,
NULL,
(FARPROC) on_icmp_reply,
request,
dest_sockaddr4->sin_addr.s_addr,
payload, payload_size, &option,
probe->platform.reply4, reply_size,
timeout);
request->reply4,
reply_size, timeout);
}
if (send_result == 0) {
err = GetLastError();
/*
ERROR_IO_PENDING is expected for asynchronous probes,
but any other error is unexpected.
ERROR_IO_PENDING is expected when the probe is sent.
Other errors indicate the probe wasn't sent, and should
be reported in the main thread.
*/
if (err != ERROR_IO_PENDING) {
report_win_error(probe->token, err);
free_probe(net_state, probe);
request->icmp_type = -1;
request->reply_status = err;
queue_thread_result(request);
}
}
}
@ -288,24 +417,24 @@ void icmp_send_probe(
/* Fill the payload of the packet as specified by the probe parameters */
static
int fill_payload(
const struct probe_param_t *param,
const struct icmp_thread_request_t *request,
char *payload,
int payload_buffer_size)
{
int ip_icmp_size;
int payload_size;
if (param->ip_version == 6) {
if (request->ip_version == 6) {
ip_icmp_size =
sizeof(struct IP6Header) + sizeof(struct ICMPHeader);
} else if (param->ip_version == 4) {
} else if (request->ip_version == 4) {
ip_icmp_size = sizeof(struct IPHeader) + sizeof(struct ICMPHeader);
} else {
errno = EINVAL;
return -1;
}
payload_size = param->packet_size - ip_icmp_size;
payload_size = request->packet_size - ip_icmp_size;
if (payload_size < 0) {
payload_size = 0;
}
@ -315,11 +444,184 @@ int fill_payload(
return -1;
}
memset(payload, param->bit_pattern, payload_size);
memset(payload, request->bit_pattern, payload_size);
return payload_size;
}
/*
We've received a probe request from the main thread, so
fill out a payload buffer and then send the probe.
*/
static
void icmp_handle_probe_request(struct icmp_thread_request_t *request)
{
char payload[PACKET_BUFFER_SIZE];
int payload_size;
payload_size = fill_payload(request, payload, PACKET_BUFFER_SIZE);
if (payload_size < 0) {
error(EXIT_FAILURE, errno, "Error constructing packet");
}
icmp_send_probe(request, payload, payload_size);
}
/*
The main loop of the ICMP service thread. The loop starts
an overlapped read on the incoming request pipe, then waits
in an alertable wait for that read to complete. Because
the wait is alertable, ICMP probes can complete through
APCs in that wait.
*/
static
DWORD WINAPI icmp_service_thread(LPVOID param) {
struct net_state_t *net_state;
struct icmp_thread_request_t *request;
DWORD wait_status;
OVERLAPPED overlapped;
HANDLE event;
BOOL success;
bool read_pending;
int read_count;
int err;
/*
We need an event to signal completion of reads from the request
pipe.
*/
event = CreateEvent(NULL, TRUE, FALSE, NULL);
if (event == NULL) {
error_win(
EXIT_FAILURE, GetLastError(),
"failure creating ICMP thread event");
}
net_state = (struct net_state_t *)param;
read_pending = false;
while (true) {
/*
Start a new read on the request pipe if none is
currently pending.
*/
if (!read_pending) {
request = NULL;
ResetEvent(event);
memset(&overlapped, 0, sizeof(OVERLAPPED));
overlapped.hEvent = event;
success = ReadFile(
net_state->platform.thread_in_pipe_read_handle,
&request,
sizeof(struct icmp_thread_request_t *),
NULL,
&overlapped);
if (!success) {
err = GetLastError();
if (err != ERROR_IO_PENDING) {
error_win(
EXIT_FAILURE, err,
"failure starting overlapped thread pipe read");
}
}
read_pending = true;
}
/*
Wait for either the request read to complete, or
an APC which completes an ICMP probe.
*/
wait_status = WaitForSingleObjectEx(
event,
INFINITE,
TRUE);
/*
If the event we waited on has been signalled, read
the request from the pipe.
*/
if (wait_status == WAIT_OBJECT_0) {
read_pending = false;
success = GetOverlappedResult(
net_state->platform.thread_in_pipe_read_handle,
&overlapped,
&read_count,
FALSE);
if (!success) {
error_win(
EXIT_FAILURE, GetLastError(),
"failure completing overlapped thread pipe read");
}
if (read_count == 0) {
continue;
}
assert(
read_count == sizeof(struct icmp_thread_request_t *));
/* Start the new probe from the request */
icmp_handle_probe_request(request);
}
}
}
/*
When we are on the main thread and need the ICMP service thread
to start a new probe, this is used to pass the request for the
new probe to the service thread.
*/
static
void queue_thread_request(
struct net_state_t *net_state,
struct probe_t *probe,
const struct probe_param_t *param,
struct sockaddr_storage *dest_sockaddr,
struct sockaddr_storage *src_sockaddr)
{
struct icmp_thread_request_t *request;
int byte_count;
request = malloc(sizeof(struct icmp_thread_request_t));
if (request == NULL) {
error(EXIT_FAILURE, errno, "failure to allocate request");
}
memset(request, 0, sizeof(struct icmp_thread_request_t));
request->ip_version = param->ip_version;
request->ttl = param->ttl;
request->timeout = param->timeout;
request->packet_size = param->packet_size;
request->bit_pattern = param->bit_pattern;
request->net_state = net_state;
request->probe = probe;
request->dest_sockaddr = *dest_sockaddr;
request->src_sockaddr = *src_sockaddr;
/*
The ownership of the request is passed to the ICMP thread
through the pipe.
*/
byte_count = write(
net_state->platform.thread_in_pipe_write,
&request,
sizeof(struct icmp_thread_request_t *));
if (byte_count == -1) {
error(
EXIT_FAILURE, errno,
"failure writing to probe request queue");
}
}
/* Decode the probe parameters and send a probe */
void send_probe(
struct net_state_t *net_state,
@ -328,8 +630,6 @@ void send_probe(
struct probe_t *probe;
struct sockaddr_storage dest_sockaddr;
struct sockaddr_storage src_sockaddr;
char payload[PACKET_BUFFER_SIZE];
int payload_size;
if (resolve_probe_addresses(net_state, param, &dest_sockaddr,
&src_sockaddr)) {
@ -345,24 +645,75 @@ void send_probe(
probe->platform.ip_version = param->ip_version;
payload_size = fill_payload(param, payload, PACKET_BUFFER_SIZE);
if (payload_size < 0) {
perror("Error construction packet");
exit(EXIT_FAILURE);
}
icmp_send_probe(net_state, probe, param,
&src_sockaddr, &dest_sockaddr, payload, payload_size);
queue_thread_request(
net_state, probe, param, &dest_sockaddr, &src_sockaddr);
}
/*
On Windows, an implementation of receive_replies is unnecessary, because,
unlike Unix, replies are completed using Overlapped I/O during an
alertable wait, and don't require explicit reads.
After we've receive the result from the ICMP service thread,
report either the probe status, or any Windows error we
encountered while attempting to send the probe.
*/
static
void complete_icmp_result(struct icmp_thread_request_t *request)
{
struct net_state_t *net_state;
struct probe_t *probe;
/*
We can de-const the net_state and probe, since we are back
on the main thread.
*/
net_state = (struct net_state_t *)request->net_state;
probe = (struct probe_t *)request->probe;
if (request->icmp_type != -1) {
/* Record probe result */
respond_to_probe(net_state, probe,
request->icmp_type, &request->remote_addr,
request->round_trip_us, 0, NULL);
} else {
report_win_error(probe->token, request->reply_status);
free_probe(net_state, probe);
}
}
/*
Read the status of completed probes from the ICMP service
if any has completed.
*/
void receive_replies(
struct net_state_t *net_state)
{
int read_count;
struct icmp_thread_request_t *request;
read_count = read(
net_state->platform.thread_out_pipe_read,
&request,
sizeof(struct icmp_thread_request_t *));
if (read_count == -1) {
/*
EINTR and EAGAIN can occur under normal conditions, and
should be retried. We will retry the next iteration
of the main loop.
*/
if (errno == EINTR || errno == EAGAIN) {
return;
}
error(EXIT_FAILURE, errno, "thread result pipe read error");
}
assert(read_count == sizeof(struct icmp_thread_request_t *));
complete_icmp_result(request);
if (request->reply4) {
free(request->reply4);
request->reply4 = NULL;
}
free(request);
}
/*

View File

@ -58,11 +58,6 @@ struct probe_platform_t {
/* IP version (4 or 6) used for the probe */
int ip_version;
union {
ICMP_ECHO_REPLY *reply4;
ICMPV6_ECHO_REPLY *reply6;
};
};
/* A Windows HANDLE for the ICMP session */
@ -71,6 +66,49 @@ struct net_state_platform_t {
HANDLE icmp6;
bool ip4_socket_raw;
bool ip6_socket_raw;
HANDLE thread_in_pipe_read_handle;
int thread_in_pipe_read, thread_in_pipe_write;
int thread_out_pipe_read, thread_out_pipe_write;
};
/*
A request object passed between the main thread and the ICMP
service thread representing an outstanding probe.
*/
struct icmp_thread_request_t {
/*
net_state and probe are const to avoid race conditions between
the main thread and the ICMP service thread. They are to be
considered read-only on the ICMP service thread.
*/
const struct net_state_t *net_state;
const struct probe_t *probe;
/* Parameters for the probe request */
int ip_version;
int ttl;
int timeout;
int packet_size;
int bit_pattern;
/* Source and destination for the probe */
struct sockaddr_storage dest_sockaddr;
struct sockaddr_storage src_sockaddr;
/* Scratch space used by the ICMP.DLL API */
union {
ICMP_ECHO_REPLY *reply4;
ICMPV6_ECHO_REPLY *reply6;
};
/* Probe results */
int icmp_type;
int reply_status;
int round_trip_us;
/* The remote address responding to the probe */
struct sockaddr_storage remote_addr;
};
#endif

View File

@ -18,38 +18,48 @@
#include "wait.h"
#include <io.h>
#include <stdio.h>
#include <windows.h>
#include <error.h>
#include <sys/select.h>
#include "command.h"
/*
Sleep until we receive a new probe response, a new command on the
command stream, or a probe timeout. On Windows, this means that
we will sleep with an alertable wait, as all of these conditions
use I/O completion routines as notifications of these events.
Wait for either a request from the command stream or
for the probe results to be passed from the ICMP service
thread.
*/
void wait_for_activity(
struct command_buffer_t *command_buffer,
struct net_state_t *net_state)
{
DWORD wait_result;
int nfds;
fd_set read_set;
int ready_count;
FD_ZERO(&read_set);
FD_SET(command_buffer->command_stream, &read_set);
nfds = command_buffer->command_stream + 1;
FD_SET(net_state->platform.thread_out_pipe_read, &read_set);
if (net_state->platform.thread_out_pipe_read >= nfds) {
nfds = net_state->platform.thread_out_pipe_read + 1;
}
while (true) {
ready_count =
select(nfds, &read_set, NULL, NULL, NULL);
if (ready_count != -1) {
return;
}
/*
Start the command read overlapped I/O just prior to sleeping.
During development of the Cygwin port, there was a bug where the
overlapped I/O was started earlier in the mtr-packet loop, and
an intermediate alertable wait could leave us in this Sleep
without an active command read. So now we do this here, instead.
EINTR and EAGAIN simply mean that the select should
be retried.
*/
start_read_command(command_buffer);
/* Sleep until an I/O completion routine runs */
wait_result = SleepEx(INFINITE, TRUE);
if (wait_result == WAIT_FAILED) {
fprintf(stderr, "SleepEx failure %d\n", GetLastError());
exit(EXIT_FAILURE);
if (errno != EINTR && errno != EAGAIN) {
error(EXIT_FAILURE, errno, "unexpected select error");
}
}
}