dnstap io: reconnect attempts use exponential backoff, slowing down to at most once per second.

This commit is contained in:
W.C.A. Wijngaards 2020-01-23 15:11:08 +01:00
parent 86e1948afe
commit 299086d447
2 changed files with 111 additions and 3 deletions

View File

@ -52,6 +52,11 @@
/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** the msec to wait before a reconnect attempt when the attempt is not
 * immediate; the exponential backoff starts from this value */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** the maximum msec to wait between reconnect attempts; the exponential
 * backoff is capped at this value */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** DTIO command channel commands */
enum {
/** DTIO command channel stop */
@ -60,6 +65,13 @@ enum {
DTIO_COMMAND_WAKEUP = 1
} dtio_channel_command;
/** open the output channel; when that fails a reconnect attempt is
 * scheduled with dtio_reconnect_enable */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add output event for read and write */
static void dtio_add_output_event_write(struct dt_io_thread* dtio);
/** start reconnection attempts: schedule the reconnect timer, with
 * exponential backoff of the wait time between attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
void* fstrm_create_control_frame_start(char* contenttype, size_t* len)
{
uint32_t* control;
@ -359,6 +371,73 @@ static int dtio_find_msg(struct dt_io_thread* dtio)
return 0;
}
/**
 * Timer callback for the dnstap reconnect: try to open the output again.
 * If an output event exists afterwards, it is registered for write and
 * the connection attempt continues on that event. Otherwise another
 * timed attempt is requested.
 * @param fd: unused.
 * @param bits: unused.
 * @param arg: the struct dt_io_thread of the io thread.
 */
static void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
	short ATTR_UNUSED(bits), void* arg)
{
	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;

	/* this callback is running, so the timer is not pending any more */
	dtio->reconnect_is_added = 0;
	verbose(VERB_ALGO, "dnstap io: reconnect timer");

	dtio_open_output(dtio);
	if(!dtio->event) {
		/* no output event; exponential backoff and retry on timer */
		dtio_reconnect_enable(dtio);
		return;
	}
	/* nothing wrong so far, wait on the output event */
	dtio_add_output_event_write(dtio);
}
/**
 * Schedule a reconnect attempt to the output, after a timeout.
 * The wait used for this attempt is the currently stored
 * reconnect_timeout (0 means no wait). The value stored for the next
 * attempt starts at DTIO_RECONNECT_TIMEOUT_MIN and doubles every time,
 * up to DTIO_RECONNECT_TIMEOUT_MAX.
 * Does nothing if the reconnect timer is already added.
 * @param dtio: the io thread structure.
 */
static void dtio_reconnect_enable(struct dt_io_thread* dtio)
{
	struct timeval wait;
	int delay, next;

	if(dtio->reconnect_is_added)
		return; /* already done */

	/* exponential backoff, store the value for next timeout */
	delay = dtio->reconnect_timeout;
	if(delay != 0) {
		next = delay*2;
		if(next > DTIO_RECONNECT_TIMEOUT_MAX)
			next = DTIO_RECONNECT_TIMEOUT_MAX;
	} else {
		next = DTIO_RECONNECT_TIMEOUT_MIN;
	}
	dtio->reconnect_timeout = next;
	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
		delay);

	/* setup wait timer, convert msec into sec and usec */
	memset(&wait, 0, sizeof(wait));
	wait.tv_sec = delay/1000;
	wait.tv_usec = (delay%1000)*1000;
	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
		&dtio_reconnect_timeout_cb, dtio, &wait) == 0) {
		dtio->reconnect_is_added = 1;
		return;
	}
	log_err("dnstap io: could not reconnect ev timer add");
}
/**
 * Remove the dtio reconnect timer, if it is currently added.
 * @param dtio: the io thread structure.
 */
static void dtio_reconnect_del(struct dt_io_thread* dtio)
{
	if(dtio->reconnect_is_added) {
		ub_timer_del(dtio->reconnect_timer);
		dtio->reconnect_is_added = 0;
	}
}
/**
 * Reset the reconnect state: remove a pending reconnect timer and set
 * the exponential backoff back to its start.
 * Used once we have successfully connected, so that a later reconnect
 * starts again with short timeouts.
 * @param dtio: the io thread structure.
 */
static void dtio_reconnect_clear(struct dt_io_thread* dtio)
{
	dtio_reconnect_del(dtio);
	dtio->reconnect_timeout = 0;
}
/** delete the current message in the dtio, and reset counters */
static void dtio_cur_msg_free(struct dt_io_thread* dtio)
{
@ -399,6 +478,7 @@ static void dtio_close_output(struct dt_io_thread* dtio)
if(dtio->cur_msg) {
dtio_cur_msg_free(dtio);
}
dtio_reconnect_enable(dtio);
}
/** check for pending nonblocking connect errors,
@ -443,6 +523,7 @@ static int dtio_check_nb_connect(struct dt_io_thread* dtio)
}
verbose(VERB_ALGO, "dnstap io: connected to \"%s\"", dtio->socket_path);
dtio_reconnect_clear(dtio);
dtio->check_nb_connect = 0;
return 1; /* everything okay */
}
@ -822,6 +903,17 @@ static void dtio_setup_cmd(struct dt_io_thread* dtio)
}
}
/**
 * Set up the reconnect timer event for the dnstap io thread.
 * Clears the reconnect state and creates the timeout event on the
 * thread's event base. Exits fatally if the event cannot be created.
 * @param dtio: the io thread structure, its event_base must exist.
 */
static void dtio_setup_reconnect(struct dt_io_thread* dtio)
{
	void* timer;

	dtio_reconnect_clear(dtio);
	/* fd is -1: this event is only used as a timer */
	timer = ub_event_new(dtio->event_base, -1, UB_EV_TIMEOUT,
		&dtio_reconnect_timeout_cb, dtio);
	dtio->reconnect_timer = timer;
	if(!timer)
		fatal_exit("dnstap io: out of memory");
}
/**
* structure to keep track of information during stop flush
*/
@ -1032,6 +1124,8 @@ static void dtio_desetup(struct dt_io_thread* dtio)
_close(dtio->commandpipe[0]);
#endif
dtio->commandpipe[0] = -1;
dtio_reconnect_del(dtio);
ub_event_free(dtio->reconnect_timer);
dtio_cur_msg_free(dtio);
ub_event_base_free(dtio->event_base);
}
@ -1096,6 +1190,7 @@ static void dtio_open_output(struct dt_io_thread* dtio)
closesocket(dtio->fd);
#endif
dtio->fd = -1;
dtio_reconnect_enable(dtio);
return;
}
dtio->check_nb_connect = 1;
@ -1105,19 +1200,21 @@ static void dtio_open_output(struct dt_io_thread* dtio)
UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
dtio);
if(!ev) {
log_err("dnstap io: out of memory");
#ifndef USE_WINSOCK
close(dtio->fd);
#else
closesocket(dtio->fd);
#endif
dtio->fd = -1;
log_err("dnstap io: out of memory");
dtio_reconnect_enable(dtio);
return;
}
dtio->event = ev;
/* setup protocol control message to start */
if(!dtio_control_start_send(dtio)) {
log_err("dnstap io: out of memory");
ub_event_free(dtio->event);
dtio->event = NULL;
#ifndef USE_WINSOCK
@ -1126,7 +1223,7 @@ static void dtio_open_output(struct dt_io_thread* dtio)
closesocket(dtio->fd);
#endif
dtio->fd = -1;
log_err("dnstap io: out of memory");
dtio_reconnect_enable(dtio);
return;
}
}
@ -1139,11 +1236,12 @@ static void* dnstap_io(void* arg)
struct timeval now;
/* setup */
verbose(VERB_ALGO, "start dnstap io thread");
dtio_setup_base(dtio, &secs, &now);
dtio_setup_cmd(dtio);
dtio_setup_reconnect(dtio);
dtio_open_output(dtio);
dtio_add_output_event_write(dtio);
verbose(VERB_ALGO, "start dnstap io thread");
/* run */
if(ub_event_base_dispatch(dtio->event_base) < 0) {

View File

@ -102,6 +102,7 @@ struct dt_io_thread {
struct dt_io_list_item* io_list_iter;
/** thread id, of the io thread */
ub_thread_type tid;
/** file descriptor that the thread writes to */
int fd;
/** event structure that the thread uses */
@ -112,6 +113,7 @@ struct dt_io_thread {
int event_added_is_write;
/** check for nonblocking connect errors on fd */
int check_nb_connect;
/** the buffer that currently getting written, or NULL if no
* (partial) message written now */
void* cur_msg;
@ -131,6 +133,14 @@ struct dt_io_thread {
/** the io thread wants to exit */
int want_to_exit;
/** the timer event for connection retries */
void* reconnect_timer;
/** if the reconnect timer is added to the event base */
int reconnect_is_added;
/** the current reconnection timeout, it is increased with
* exponential backoff, in msec */
int reconnect_timeout;
/** If the log server is connected to over unix domain sockets,
* eg. a file is named that is created to log onto. */
int upstream_is_unix;