unbound/dnstap/dtstream.c
W.C.A. Wijngaards 3af4e44646 - Fix dnstap wakeup, a running wakeup timer is left to expire and not
increased, a timer is started when the dtio thread is sleeping,
  the timer set disabled when the dtio thread goes to sleep, and
  after sleep the thread checks to see if there are messages to log
  immediately.
2024-07-19 16:16:02 +02:00

2225 lines
59 KiB
C

/*
* dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
*
* Copyright (c) 2020, NLnet Labs. All rights reserved.
*
* This software is open source.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* Neither the name of the NLNET LABS nor the names of its contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/**
* \file
*
* An implementation of the Frame Streams data transport protocol for
* the Unbound DNSTAP message logging facility.
*/
#include "config.h"
#include "dnstap/dtstream.h"
#include "dnstap/dnstap_fstrm.h"
#include "util/config_file.h"
#include "util/ub_event.h"
#include "util/net_help.h"
#include "services/outside_network.h"
#include "sldns/sbuffer.h"
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif
#include <fcntl.h>
#ifdef HAVE_OPENSSL_SSL_H
#include <openssl/ssl.h>
#endif
#ifdef HAVE_OPENSSL_ERR_H
#include <openssl/err.h>
#endif
/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** Reconnect delay in msec used after the first attempt (the first
 * attempt itself is immediate); the exponential backoff of
 * dtio_reconnect_enable starts from this value. */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** Upper bound in msec for the doubling reconnect backoff. */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** Reconnect delay in msec used by dtio_reconnect_slow, e.g. after a
 * failed ACCEPT handshake, to stop busy spinning on reconnect. */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
/** Number of queued messages at which dt_msg_queue_submit wakes the
 * dtio thread immediately instead of waiting for the wakeup timer. */
#define DTIO_MSG_FOR_WAKEUP 32
/** Maximum length in bytes of a frame received from the collector;
 * a longer frame closes the connection. */
#define DTIO_RECV_FRAME_MAX_LEN 1000
struct stop_flush_info;
/** DTIO command channel commands: the byte values written to the
 * command pipe of the dtio thread (dtio_wakeup writes
 * DTIO_COMMAND_WAKEUP).
 * NOTE(review): as written, this declaration also defines a file-scope
 * variable named dtio_channel_command of the anonymous enum type; it
 * is not an enum tag. */
enum {
	/** DTIO command channel stop */
	DTIO_COMMAND_STOP = 0,
	/** DTIO command channel wakeup */
	DTIO_COMMAND_WAKEUP = 1
} dtio_channel_command;
/** open the output channel */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add output event for read and write */
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** start reconnection attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
/** stop from stop_flush event loop */
static void dtio_stop_flush_exit(struct stop_flush_info* info);
/** setup a start control message */
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/** enable briefly waiting for a read event, for SSL negotiation */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
/** enable briefly waiting for a write event, for SSL negotiation */
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
struct dt_msg_queue*
dt_msg_queue_create(struct comm_base* base)
{
	struct dt_msg_queue* queue;

	queue = calloc(1, sizeof(*queue));
	if(queue == NULL)
		return NULL;
	/* Per-worker byte limit for queued messages: about 1 M should
	 * hold 64K messages with some overhead, or many more small ones. */
	queue->maxsize = 1*1024*1024;
	/* the wakeup timer runs in the event base of the given worker */
	queue->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, queue);
	if(queue->wakeup_timer == NULL) {
		free(queue);
		return NULL;
	}
	lock_basic_init(&queue->lock);
	lock_protect(&queue->lock, queue, sizeof(*queue));
	return queue;
}
/**
 * Free every entry (and its buffer) in the message list and reset the
 * queue to empty. The caller must hold the lock.
 */
static void
dt_msg_queue_clear(struct dt_msg_queue* mq)
{
	while(mq->first != NULL) {
		struct dt_msg_entry* victim = mq->first;
		mq->first = victim->next;
		free(victim->buf);
		free(victim);
	}
	mq->last = NULL;
	mq->cursize = 0;
	mq->msgcount = 0;
}
/**
 * Delete a message queue: its lock, all queued messages, its wakeup
 * timer and the queue structure itself. NULL is accepted.
 */
void
dt_msg_queue_delete(struct dt_msg_queue* mq)
{
	if(mq == NULL)
		return;
	/* the lock goes first; the list is then emptied without it */
	lock_basic_destroy(&mq->lock);
	dt_msg_queue_clear(mq);
	comm_timer_delete(mq->wakeup_timer);
	free(mq);
}
/**
 * Wake up the dtio thread by writing a DTIO_COMMAND_WAKEUP byte to its
 * command pipe. Does nothing when dtio is NULL or not started.
 * Interrupted and would-block writes are retried; other errors are
 * logged.
 */
static void dtio_wakeup(struct dt_io_thread* dtio)
{
	uint8_t command = DTIO_COMMAND_WAKEUP;
	ssize_t written;
	if(dtio == NULL || !dtio->started)
		return;
	for(;;) {
		written = write(dtio->commandpipe[1], &command,
			sizeof(command));
		if(written != -1)
			return;
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			continue;
#else
		if(WSAGetLastError() == WSAEINPROGRESS ||
			WSAGetLastError() == WSAEWOULDBLOCK)
			continue;
#endif
		log_err("dnstap io wakeup: write: %s",
			sock_strerror(errno));
		return;
	}
}
void
mq_wakeup_cb(void* arg)
{
struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
lock_basic_lock(&mq->dtio->wakeup_timer_lock);
mq->dtio->wakeup_timer_enabled = 0;
lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
dtio_wakeup(mq->dtio);
}
/**
 * Start the timer that wakes up the dtio thread because there is
 * content in the queue.
 *
 * Only one wakeup is kept pending across all worker queues: the
 * dtio->wakeup_timer_enabled flag, protected by dtio->wakeup_timer_lock,
 * records that one has been started. The flag is cleared again by the
 * timer callback (mq_wakeup_cb) and when the dtio thread goes to sleep
 * (dtio_sleep).
 *
 * @param mq: the message queue; its wakeup_timer runs in the event base
 *	of the calling worker.
 * @param wakeupnow: if true, set the timer of this queue to zero so the
 *	callback fires as soon as this worker's event loop runs it.
 *	If false, start a one second timer, but leave an already running
 *	timer of this queue alone rather than pushing it back.
 */
static void
dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
{
	struct timeval tv = {0};
	/* Start a timer to process messages to be logged.
	 * If we woke up the dtio thread for every message, the wakeup
	 * messages take up too much processing power. If the queue
	 * fills up the wakeup happens immediately. The timer wakes it up
	 * if there are infrequent messages to log. */
	/* We cannot start a timer in the dtio thread, because it is a
	 * different thread and its event base is in use by that thread.
	 * Modifying its event base from here would give race conditions,
	 * and locks would have to wait until it woke up. So the timer runs
	 * in the event base of this worker instead. */
	/* Do not start the timer if a timer already exists, perhaps
	 * in another worker. That is why this variable is protected by a
	 * lock in dtio. */
	/* If we need to wakeupnow, 0 the timer to force the callback. */
	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
	if(mq->dtio->wakeup_timer_enabled) {
		/* A wakeup is already pending, possibly from another
		 * worker's queue. Only hurry it along when asked to, by
		 * zeroing the timer of this queue. */
		if(wakeupnow) {
			tv.tv_sec = 0;
			tv.tv_usec = 0;
			comm_timer_set(mq->wakeup_timer, &tv);
		}
		lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
		return;
	}
	mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
	/* start the timer, in mq, in the event base of our worker */
	if(!wakeupnow) {
		tv.tv_sec = 1;
		tv.tv_usec = 0;
		/* If it is already set, keep it running; a running timer
		 * is left to expire and is not pushed back. */
		if(!comm_timer_is_set(mq->wakeup_timer))
			comm_timer_set(mq->wakeup_timer, &tv);
	} else {
		tv.tv_sec = 0;
		tv.tv_usec = 0;
		comm_timer_set(mq->wakeup_timer, &tv);
	}
	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
}
/**
 * Append a message to the queue; ownership of buf passes to the queue
 * (it is freed here if the message is rejected or dropped).
 * Empty messages, a NULL queue, allocation failure and a full queue all
 * drop the message. After appending, the dtio thread wakeup timer is
 * started when needed, or an immediate wakeup is forced when many
 * messages are pending or the queue crosses 90% of its size.
 * @param mq: the message queue, may be NULL.
 * @param buf: the message bytes, malloced; NULL is ignored.
 * @param len: length of buf in bytes.
 */
void
dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
{
	struct dt_msg_entry* entry;
	int wakeupnow, wakeupstarttimer;
	size_t highwater;

	if(buf == NULL)
		return;
	/* Zero length entries cannot be logged, because the framestream
	 * protocol does not carry them; the protobuf serialization for
	 * dnstap does not create them, so this should not happen.
	 * Without a queue the message is dropped as well. */
	if(len == 0 || mq == NULL) {
		free(buf);
		return;
	}

	entry = malloc(sizeof(*entry));
	if(entry == NULL) {
		log_err("out of memory logging dnstap");
		free(buf);
		return;
	}
	entry->next = NULL;
	entry->buf = buf;
	entry->len = len;

	lock_basic_lock(&mq->lock);
	/* An (eventual) wakeup is needed when the list was empty, or when
	 * the dtio thread is not polling for write right now. */
	wakeupstarttimer = (mq->first == NULL ||
		!mq->dtio->event_added_is_write);
	/* Wake up right away at DTIO_MSG_FOR_WAKEUP queued messages, or
	 * when this message brings the queue to 90% full. */
	highwater = mq->maxsize * 9 / 10;
	wakeupnow = (mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
		(mq->cursize < highwater && mq->cursize+len >= highwater));
	if(mq->cursize + len > mq->maxsize) {
		/* buffer full, or congested: drop the message */
		lock_basic_unlock(&mq->lock);
		free(buf);
		free(entry);
		return;
	}
	mq->cursize += len;
	mq->msgcount++;
	if(mq->last == NULL)
		mq->first = entry;
	else
		mq->last->next = entry;
	mq->last = entry;
	lock_basic_unlock(&mq->lock);

	if(wakeupnow || wakeupstarttimer)
		dt_msg_queue_start_timer(mq, wakeupnow);
}
/**
 * Create a zero-initialised dtio thread structure and set up the lock
 * that protects its wakeup_timer_enabled flag.
 * @return the new structure, or NULL if out of memory.
 */
struct dt_io_thread* dt_io_thread_create(void)
{
	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
	/* do not touch the lock inside a failed allocation */
	if(!dtio)
		return NULL;
	lock_basic_init(&dtio->wakeup_timer_lock);
	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
		sizeof(dtio->wakeup_timer_enabled));
	return dtio;
}
/**
 * Delete a dtio thread structure: its lock, the registration list items,
 * the configured strings and the SSL context. NULL is accepted.
 * The registered queues themselves are not deleted here.
 */
void dt_io_thread_delete(struct dt_io_thread* dtio)
{
	struct dt_io_list_item* cur;
	if(dtio == NULL)
		return;
	lock_basic_destroy(&dtio->wakeup_timer_lock);
	cur = dtio->io_list;
	while(cur != NULL) {
		struct dt_io_list_item* following = cur->next;
		free(cur);
		cur = following;
	}
	free(dtio->socket_path);
	free(dtio->ip_str);
	free(dtio->tls_server_name);
	free(dtio->client_key_file);
	free(dtio->client_cert_file);
#ifdef HAVE_SSL
	if(dtio->ssl_ctx != NULL)
		SSL_CTX_free(dtio->ssl_ctx);
#endif
	free(dtio);
}
/**
 * Apply the dnstap settings from the config file to the dtio thread.
 * It selects the unix socket, TCP or TLS transport and copies the
 * socket path (with any chroot prefix stripped) or the IP address.
 * For TLS it also sets the server name, the client certificate files
 * and a new SSL context.
 * Strings and the SSL context from an earlier apply are released before
 * they are replaced.
 * @param dtio: the dtio thread structure to configure.
 * @param cfg: the configuration to read.
 * @return 1 on success, 0 on failure.
 */
int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
{
	if(!cfg->dnstap) {
		log_warn("cannot setup dnstap because dnstap-enable is no");
		return 0;
	}

	/* what type of connectivity do we have */
	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
		if(cfg->dnstap_tls)
			dtio->upstream_is_tls = 1;
		else	dtio->upstream_is_tcp = 1;
	} else {
		dtio->upstream_is_unix = 1;
	}
	dtio->is_bidirectional = cfg->dnstap_bidirectional;

	if(dtio->upstream_is_unix) {
		char* nm;
		if(!cfg->dnstap_socket_path ||
			cfg->dnstap_socket_path[0]==0) {
			log_err("dnstap setup: no dnstap-socket-path for "
				"socket connect");
			return 0;
		}
		nm = cfg->dnstap_socket_path;
		/* strip the chroot directory prefix from the path */
		if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
			cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
			nm += strlen(cfg->chrootdir);
		free(dtio->socket_path);
		dtio->socket_path = strdup(nm);
		if(!dtio->socket_path) {
			log_err("dnstap setup: malloc failure");
			return 0;
		}
	}

	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
			log_err("dnstap setup: no dnstap-ip for TCP connect");
			return 0;
		}
		free(dtio->ip_str);
		dtio->ip_str = strdup(cfg->dnstap_ip);
		if(!dtio->ip_str) {
			log_err("dnstap setup: malloc failure");
			return 0;
		}
	}

	if(dtio->upstream_is_tls) {
#ifdef HAVE_SSL
		if(cfg->dnstap_tls_server_name &&
			cfg->dnstap_tls_server_name[0]) {
			free(dtio->tls_server_name);
			dtio->tls_server_name = strdup(
				cfg->dnstap_tls_server_name);
			if(!dtio->tls_server_name) {
				log_err("dnstap setup: malloc failure");
				return 0;
			}
			if(!check_auth_name_for_ssl(dtio->tls_server_name))
				return 0;
		}
		if(cfg->dnstap_tls_client_key_file &&
			cfg->dnstap_tls_client_key_file[0]) {
			dtio->use_client_certs = 1;
			free(dtio->client_key_file);
			dtio->client_key_file = strdup(
				cfg->dnstap_tls_client_key_file);
			if(!dtio->client_key_file) {
				log_err("dnstap setup: malloc failure");
				return 0;
			}
			if(!cfg->dnstap_tls_client_cert_file ||
				cfg->dnstap_tls_client_cert_file[0]==0) {
				log_err("dnstap setup: client key "
					"authentication enabled with "
					"dnstap-tls-client-key-file, but "
					"no dnstap-tls-client-cert-file "
					"is given");
				return 0;
			}
			free(dtio->client_cert_file);
			dtio->client_cert_file = strdup(
				cfg->dnstap_tls_client_cert_file);
			if(!dtio->client_cert_file) {
				log_err("dnstap setup: malloc failure");
				return 0;
			}
		} else {
			/* No client certificate authentication. Release
			 * names left from an earlier apply instead of
			 * leaking them. */
			dtio->use_client_certs = 0;
			free(dtio->client_key_file);
			dtio->client_key_file = NULL;
			free(dtio->client_cert_file);
			dtio->client_cert_file = NULL;
		}
		/* replace, rather than leak, an SSL context from an
		 * earlier apply */
		if(dtio->ssl_ctx) {
			SSL_CTX_free(dtio->ssl_ctx);
			dtio->ssl_ctx = NULL;
		}
		if(cfg->dnstap_tls_cert_bundle) {
			dtio->ssl_ctx = connect_sslctx_create(
				dtio->client_key_file,
				dtio->client_cert_file,
				cfg->dnstap_tls_cert_bundle, 0);
		} else {
			dtio->ssl_ctx = connect_sslctx_create(
				dtio->client_key_file,
				dtio->client_cert_file,
				cfg->tls_cert_bundle, cfg->tls_win_cert);
		}
		if(!dtio->ssl_ctx) {
			log_err("could not setup SSL CTX");
			return 0;
		}
		dtio->tls_use_sni = cfg->tls_use_sni;
#endif /* HAVE_SSL */
	}
	return 1;
}
/**
 * Register a message queue with the dtio thread. The queue's dtio
 * pointer is set under the queue lock, and the queue is pushed onto the
 * front of dtio's list. The round-robin position is reset.
 * @return 1 on success, 0 if out of memory.
 */
int dt_io_thread_register_queue(struct dt_io_thread* dtio,
	struct dt_msg_queue* mq)
{
	struct dt_io_list_item* node;
	node = malloc(sizeof(*node));
	if(node == NULL)
		return 0;

	lock_basic_lock(&mq->lock);
	mq->dtio = dtio;
	lock_basic_unlock(&mq->lock);

	node->queue = mq;
	node->next = dtio->io_list;
	dtio->io_list = node;
	dtio->io_list_iter = NULL;
	return 1;
}
/**
 * Remove a message queue from the dtio thread list and clear its dtio
 * pointer under the queue lock. The queue itself is not deleted.
 * Nothing happens when dtio is NULL or the queue is not registered.
 */
void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
	struct dt_msg_queue* mq)
{
	struct dt_io_list_item** link;
	if(dtio == NULL)
		return;
	for(link = &dtio->io_list; *link != NULL; link = &(*link)->next) {
		struct dt_io_list_item* found = *link;
		if(found->queue != mq)
			continue;
		/* unlink it from the list */
		*link = found->next;
		lock_basic_lock(&found->queue->lock);
		found->queue->dtio = NULL;
		lock_basic_unlock(&found->queue->lock);
		free(found);
		dtio->io_list_iter = NULL;
		return;
	}
}
/**
 * Take the first message off the queue, under the queue lock.
 * On success the message buffer (now owned by the caller) and its
 * length are returned through buf and len.
 * @return 1 if a message was taken, 0 if the queue was empty.
 */
static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
	size_t* len)
{
	struct dt_msg_entry* head;
	lock_basic_lock(&mq->lock);
	head = mq->first;
	if(head == NULL) {
		lock_basic_unlock(&mq->lock);
		return 0;
	}
	mq->first = head->next;
	if(mq->first == NULL)
		mq->last = NULL;
	mq->cursize -= head->len;
	mq->msgcount--;
	lock_basic_unlock(&mq->lock);
	/* the unlinked entry is private now, read it outside the lock */
	*buf = head->buf;
	*len = head->len;
	free(head);
	return 1;
}
/**
 * Pop a message from the queue and make it the current message of dtio,
 * with its progress counters reset.
 * @return 1 if a message was found, 0 if the queue is empty.
 */
static int dtio_find_in_queue(struct dt_io_thread* dtio,
	struct dt_msg_queue* mq)
{
	void* msg = NULL;
	size_t msglen = 0;
	if(!dt_msg_queue_pop(mq, &msg, &msglen))
		return 0;
	dtio->cur_msg = msg;
	dtio->cur_msg_len = msglen;
	dtio->cur_msg_done = 0;
	dtio->cur_msg_len_done = 0;
	return 1;
}
/** find a new message to write, search message queues, false if none */
static int dtio_find_msg(struct dt_io_thread* dtio)
{
struct dt_io_list_item *spot, *item;
spot = dtio->io_list_iter;
/* use the next queue for the next message lookup,
* if we hit the end(NULL) the NULL restarts the iter at start. */
if(spot)
dtio->io_list_iter = spot->next;
else if(dtio->io_list)
dtio->io_list_iter = dtio->io_list->next;
/* scan from spot to end-of-io_list */
item = spot;
while(item) {
if(dtio_find_in_queue(dtio, item->queue))
return 1;
item = item->next;
}
/* scan starting at the start-of-list (to wrap around the end) */
item = dtio->io_list;
while(item) {
if(dtio_find_in_queue(dtio, item->queue))
return 1;
item = item->next;
}
return 0;
}
/** callback for the dnstap reconnect, to start reconnecting to output */
void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
short ATTR_UNUSED(bits), void* arg)
{
struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
dtio->reconnect_is_added = 0;
verbose(VERB_ALGO, "dnstap io: reconnect timer");
dtio_open_output(dtio);
if(dtio->event) {
if(!dtio_add_output_event_write(dtio))
return;
/* nothing wrong so far, wait on the output event */
return;
}
/* exponential backoff and retry on timer */
dtio_reconnect_enable(dtio);
}
/**
 * Schedule a reconnect attempt on the reconnect timer, unless dtio
 * wants to exit or a timer is already pending. The wait uses the stored
 * reconnect_timeout (0 on the first attempt, which is immediate). The
 * next timeout becomes DTIO_RECONNECT_TIMEOUT_MIN, then doubles each
 * time, capped at DTIO_RECONNECT_TIMEOUT_MAX.
 */
static void dtio_reconnect_enable(struct dt_io_thread* dtio)
{
	struct timeval tv;
	int wait_msec;
	if(dtio->want_to_exit || dtio->reconnect_is_added)
		return;

	wait_msec = dtio->reconnect_timeout;
	if(wait_msec == 0)
		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
	else if(wait_msec*2 > DTIO_RECONNECT_TIMEOUT_MAX)
		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
	else
		dtio->reconnect_timeout = wait_msec*2;
	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
		wait_msec);

	memset(&tv, 0, sizeof(tv));
	tv.tv_sec = wait_msec/1000;
	tv.tv_usec = (wait_msec%1000)*1000;
	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
		log_err("dnstap io: could not reconnect ev timer add");
		return;
	}
	dtio->reconnect_is_added = 1;
}
/** Cancel a pending reconnect timer, if there is one. */
static void dtio_reconnect_del(struct dt_io_thread* dtio)
{
	if(dtio->reconnect_is_added) {
		ub_timer_del(dtio->reconnect_timer);
		dtio->reconnect_is_added = 0;
	}
}
/**
 * Reset the reconnect backoff and cancel any pending reconnect timer.
 * Used after a successful connect, so later reconnects start with
 * short timeouts again.
 */
static void dtio_reconnect_clear(struct dt_io_thread* dtio)
{
	dtio_reconnect_del(dtio);
	dtio->reconnect_timeout = 0;
}
/**
 * Restart the reconnect timer with a wait of msec milliseconds, for
 * when it is already known that reconnecting must wait a while.
 */
static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
{
	/* drop any pending timer first, else enable would do nothing */
	dtio_reconnect_del(dtio);
	dtio->reconnect_timeout = msec;
	dtio_reconnect_enable(dtio);
}
/** Free the current message of dtio and zero its length and progress
 * counters. */
static void dtio_cur_msg_free(struct dt_io_thread* dtio)
{
	void* old = dtio->cur_msg;
	dtio->cur_msg = NULL;
	dtio->cur_msg_len = 0;
	dtio->cur_msg_done = 0;
	dtio->cur_msg_len_done = 0;
	free(old);
}
/** Free the frame read buffer and reset all frame reading counters and
 * flags. */
static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
{
	free(rb->buf); /* free(NULL) is a no-op */
	rb->buf = NULL;
	rb->buf_count = 0;
	rb->buf_cap = 0;
	rb->frame_len = 0;
	rb->frame_len_done = 0;
	rb->control_frame = 0;
}
/** Stop listening on the output event, if it is registered, and clear
 * the registration flags. */
static void dtio_del_output_event(struct dt_io_thread* dtio)
{
	if(dtio->event_added) {
		ub_event_del(dtio->event);
		dtio->event_added = 0;
		dtio->event_added_is_write = 0;
	}
}
/** Close the dtio output socket and mark it closed with -1. */
static void dtio_close_fd(struct dt_io_thread* dtio)
{
	int fd = dtio->fd;
	sock_close(fd);
	dtio->fd = -1;
}
/**
 * Close the output channel: free the output event, shut down TLS,
 * close the socket, and discard any partial message and read state.
 * Then schedule a reconnect. Does nothing when there is no output event.
 */
static void dtio_close_output(struct dt_io_thread* dtio)
{
	if(dtio->event == NULL)
		return;
	ub_event_free(dtio->event);
	dtio->event = NULL;
#ifdef HAVE_SSL
	if(dtio->ssl != NULL) {
		SSL_shutdown(dtio->ssl);
		SSL_free(dtio->ssl);
		dtio->ssl = NULL;
	}
#endif
	dtio_close_fd(dtio);
	/* The remainder of a (partial) message cannot be sent, and a new
	 * connection has to start with a control frame; discard it. */
	if(dtio->cur_msg != NULL)
		dtio_cur_msg_free(dtio);
	dtio->ready_frame_sent = 0;
	dtio->accept_frame_received = 0;
	dtio_read_frame_free(&dtio->read_frame);
	dtio_reconnect_enable(dtio);
}
/**
 * Check the result of a pending nonblocking connect on dtio->fd.
 * Returns 1 right away when dtio->check_nb_connect is not set.
 * On a successful connect it logs the peer, resets the reconnect
 * backoff and clears check_nb_connect.
 * @return 1 if the connection is okay, 0 if the connect is still in
 *	progress (try again later), -1 on error (the caller closes it).
 */
static int dtio_check_nb_connect(struct dt_io_thread* dtio)
{
	int error = 0;
	socklen_t len = (socklen_t)sizeof(error);
	if(!dtio->check_nb_connect)
		return 1; /* everything okay */
	/* fetch the pending socket error of the connect */
	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
		&len) < 0) {
#ifndef USE_WINSOCK
		error = errno; /* on solaris errno is error */
#else
		error = WSAGetLastError();
#endif
	}
#ifndef USE_WINSOCK
#if defined(EINPROGRESS) && defined(EWOULDBLOCK)
	if(error == EINPROGRESS || error == EWOULDBLOCK)
		return 0; /* try again later */
#endif
#else
	if(error == WSAEINPROGRESS) {
		return 0; /* try again later */
	} else if(error == WSAEWOULDBLOCK) {
		/* tell winsock about the would-block on the active event:
		 * stop_flush_event if set, else the output event */
		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
		return 0; /* try again later */
	}
#endif
	if(error != 0) {
		char* to = dtio->socket_path;
		if(!to) to = dtio->ip_str;
		if(!to) to = "";
		log_err("dnstap io: failed to connect to \"%s\": %s",
			to, sock_strerror(error));
		return -1; /* error, close it */
	}

	if(dtio->ip_str)
		verbose(VERB_DETAIL, "dnstap io: connected to %s",
			dtio->ip_str);
	else if(dtio->socket_path)
		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
			dtio->socket_path);
	/* connected, so later reconnects start with short timeouts */
	dtio_reconnect_clear(dtio);
	dtio->check_nb_connect = 0;
	return 1; /* everything okay */
}
#ifdef HAVE_SSL
/**
 * Write bytes to the TLS output.
 * When TLS wants to read first, a brief read event is enabled.
 * Broken pipe and connection reset are not logged at verbosity below 2.
 * @return number of bytes written, 0 if nothing happened and the
 *	caller should try again later, or -1 if the channel is to be
 *	closed.
 */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int written, err;
	ERR_clear_error();
	written = SSL_write(dtio->ssl, buf, len);
	if(written > 0)
		return written;
	err = SSL_get_error(dtio->ssl, written);
	switch(err) {
	case SSL_ERROR_ZERO_RETURN:
		return -1; /* closed */
	case SSL_ERROR_WANT_READ:
		/* we want a brief read event */
		dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		return 0; /* write again later */
	case SSL_ERROR_SYSCALL:
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1; /* silence reset by peer */
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		break;
	}
	log_crypto_err_io("dnstap io, could not SSL_write", err);
	return -1;
}
#endif /* HAVE_SSL */
/**
 * Write bytes of buf to the output: through TLS when dtio->ssl is set,
 * otherwise with send() on dtio->fd.
 * @param dtio: the dtio thread with the output channel.
 * @param buf: the bytes to write.
 * @param len: number of bytes in buf.
 * @return number of bytes written (may be less than len), 0 if nothing
 *	happened and the caller should try again later, or -1 if the
 *	channel is to be closed (also when dtio->fd is -1).
 */
static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	ssize_t ret;
	if(dtio->fd == -1)
		return -1;
#ifdef HAVE_SSL
	if(dtio->ssl)
		return dtio_write_ssl(dtio, buf, len);
#endif
	ret = send(dtio->fd, (void*)buf, len, 0);
	if(ret == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return 0;
#else
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			/* tell winsock about the would-block on the
			 * active event before trying again later */
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed send: %s", sock_strerror(errno));
		return -1;
	}
	return ret;
}
#ifdef HAVE_WRITEV
/**
 * Write the unsent part of the 4 byte big-endian length prefix together
 * with the data of dtio->cur_msg in one writev() call, if possible.
 * Progress is recorded in cur_msg_len_done and cur_msg_done. On a write
 * error the output channel is closed.
 * NOTE(review): iov[1] always starts at the beginning of cur_msg, so
 * this relies on cur_msg_done being 0. That holds because it is only
 * called (from dtio_write_more_of_len) while the length prefix is
 * still incomplete.
 * @return true if the message is done, false if incomplete or the
 *	channel was closed.
 */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	uint32_t sendlen = htonl(dtio->cur_msg_len);
	struct iovec iov[2];
	ssize_t r;
	iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
	iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
	iov[1].iov_base = dtio->cur_msg;
	iov[1].iov_len = dtio->cur_msg_len;
	log_assert(iov[0].iov_len > 0);
	r = writev(dtio->fd, iov, 2);
	if(r == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return 0;
#else
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}
	/* written r bytes, counted over both iovecs */
	dtio->cur_msg_len_done += r;
	if(dtio->cur_msg_len_done < 4)
		return 0;
	/* bytes beyond the 4 length bytes were message data */
	if(dtio->cur_msg_len_done > 4) {
		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
		dtio->cur_msg_len_done = 4;
	}
	if(dtio->cur_msg_done < dtio->cur_msg_len)
		return 0;
	return 1;
}
#endif /* HAVE_WRITEV */
/**
 * Write more of the 4 byte length prefix of the current data frame.
 * Without TLS (and with writev available) the length and the data are
 * sent together by dtio_write_with_writev. On a write error the
 * channel is closed.
 * @return true if done (for writev: the whole message), false if
 *	incomplete or closed.
 */
static int dtio_write_more_of_len(struct dt_io_thread* dtio)
{
	uint32_t sendlen;
	int n;
	if(dtio->cur_msg_len_done >= 4)
		return 1; /* the length prefix is already out */
#ifdef HAVE_WRITEV
	if(!dtio->ssl)
		return dtio_write_with_writev(dtio);
#endif /* HAVE_WRITEV */
	sendlen = htonl(dtio->cur_msg_len);
	n = dtio_write_buf(dtio,
		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
		sizeof(sendlen)-dtio->cur_msg_len_done);
	switch(n) {
	case -1:
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	case 0:
		return 0; /* try again later */
	default:
		break;
	}
	dtio->cur_msg_len_done += n;
	return dtio->cur_msg_len_done >= 4;
}
/**
 * Write more of the data of the current message. On a write error the
 * channel is closed.
 * @return true if the data is completely written, false if incomplete
 *	or closed.
 */
static int dtio_write_more_of_data(struct dt_io_thread* dtio)
{
	int n;
	if(dtio->cur_msg_done >= dtio->cur_msg_len)
		return 1;
	n = dtio_write_buf(dtio,
		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
		dtio->cur_msg_len - dtio->cur_msg_done);
	switch(n) {
	case -1:
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	case 0:
		return 0; /* try again later */
	default:
		break;
	}
	dtio->cur_msg_done += n;
	return dtio->cur_msg_done >= dtio->cur_msg_len;
}
/**
 * Write more of the current message: first the length prefix, then the
 * data.
 * @return true if the message is done, false if incomplete.
 */
static int dtio_write_more(struct dt_io_thread* dtio)
{
	if(dtio->cur_msg_len_done < 4 && !dtio_write_more_of_len(dtio))
		return 0;
	if(dtio->cur_msg_done < dtio->cur_msg_len &&
		!dtio_write_more_of_data(dtio))
		return 0;
	return 1;
}
/**
 * Receive bytes from dtio->fd (plain socket, no TLS) with a nonblocking
 * recv and store them in buf.
 * Close reasons are logged, except when the reconnect backoff is past
 * DTIO_RECONNECT_TIMEOUT_MIN and verbosity is below 4.
 * @param dtio: the dtio thread with the output socket.
 * @param buf: buffer to store the bytes in.
 * @param len: size of buf.
 * @return 0: closed (or error, close it), -1: nothing available now,
 *	continue later, >0: number of bytes read into buffer.
 */
static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
	ssize_t r;
	r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT);
	if(r == -1) {
		char* to = dtio->socket_path;
		if(!to) to = dtio->ip_str;
		if(!to) to = "";
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return -1; /* try later */
#else
		if(WSAGetLastError() == WSAEINPROGRESS) {
			return -1; /* try later */
		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock(
				(dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_READ);
			return -1; /* try later */
		}
#endif
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		/* Use sock_strerror like the send and writev paths: on
		 * winsock the socket error is reported through
		 * WSAGetLastError, not errno, so strerror(errno) would
		 * print an unrelated message there. */
		log_err("dnstap io: output closed, recv %s: %s", to,
			sock_strerror(errno));
		/* and close below */
		return 0;
	}
	if(r == 0) {
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
		/* and close below */
		return 0;
	}
	/* something was received */
	return r;
}
#ifdef HAVE_SSL
/**
 * Read up to len bytes over TLS into buf.
 * When TLS wants to write first, a brief write event is enabled and the
 * caller continues later. Close reasons are logged, except that an
 * orderly close or connection reset stays quiet when the reconnect
 * backoff is past DTIO_RECONNECT_TIMEOUT_MIN and verbosity is below 4.
 * @return 0: closed (or error, close it), -1: continue later,
 *	>0: number of bytes read into buffer.
 */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int r;
	ERR_clear_error();
	r = SSL_read(dtio->ssl, buf, len);
	if(r <= 0) {
		int want = SSL_get_error(dtio->ssl, r);
		if(want == SSL_ERROR_ZERO_RETURN) {
			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
				verbosity < 4)
				return 0; /* no log retries on low verbosity */
			verbose(VERB_DETAIL, "dnstap io: output closed by the "
				"other side");
			return 0;
		} else if(want == SSL_ERROR_WANT_READ) {
			/* continue later */
			return -1;
		} else if(want == SSL_ERROR_WANT_WRITE) {
			/* TLS must write before it can read on */
			(void)dtio_enable_brief_write(dtio);
			return -1;
		} else if(want == SSL_ERROR_SYSCALL) {
#ifdef ECONNRESET
			if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
				errno == ECONNRESET && verbosity < 4)
				return 0; /* silence reset by peer */
#endif
			if(errno != 0)
				log_err("SSL_read syscall: %s",
					strerror(errno));
			verbose(VERB_DETAIL, "dnstap io: output closed by the "
				"other side");
			return 0;
		}
		log_crypto_err_io("could not SSL_read", want);
		verbose(VERB_DETAIL, "dnstap io: output closed by the "
				"other side");
		return 0;
	}
	return r;
}
#endif /* HAVE_SSL */
/**
 * Check whether the output has been closed by the other side. Any input
 * that arrives is read and discarded; ignoring unknown (control) frames
 * is okay for the framestream protocol. If the stream turns out to be
 * closed, the output channel is closed too.
 * @return false if the stream is closed, true if it is still open.
 */
static int dtio_check_close(struct dt_io_thread* dtio)
{
	uint8_t discard[1024];
	if(dtio->fd == -1)
		return 0;
	for(;;) {
		/* the buffer content is not used, just overwritten */
		ssize_t got = receive_bytes(dtio, (void*)discard,
			sizeof(discard));
		if(got == -1)
			return 1; /* nothing more to read now, still open */
		if(got == 0)
			break; /* the other end has been closed */
	}
	/* close the channel */
	dtio_del_output_event(dtio);
	dtio_close_output(dtio);
	return 0;
}
/**
 * Read the ACCEPT control frame from the collector (bidirectional Frame
 * Streams). First the frame length is read, then the frame payload.
 * This may take several calls; the partial state is kept in
 * dtio->read_frame.
 * When a complete ACCEPT frame with the dnstap content type arrives,
 * a START control frame is set up and the write event is enabled.
 * A control frame of another type is ignored: the handshake state is
 * reset and reading continues.
 * On a closed connection, an oversize or invalid frame, a malformed
 * control field or an unknown content type, the output is closed and
 * a slow reconnect is scheduled.
 * @return -1: continue reading, 0: closed, 1: valid accept received.
 */
static int dtio_read_accept_frame(struct dt_io_thread* dtio)
{
	int r;
	size_t read_frame_done;
	while(dtio->read_frame.frame_len_done < 4) {
#ifdef HAVE_SSL
		if(dtio->ssl) {
			r = ssl_read_bytes(dtio,
				(uint8_t*)&dtio->read_frame.frame_len+
				dtio->read_frame.frame_len_done,
				4-dtio->read_frame.frame_len_done);
		} else {
#endif
			r = receive_bytes(dtio,
				(uint8_t*)&dtio->read_frame.frame_len+
				dtio->read_frame.frame_len_done,
				4-dtio->read_frame.frame_len_done);
#ifdef HAVE_SSL
		}
#endif
		if(r == -1)
			return -1; /* continue reading */
		if(r == 0) {
			/* connection closed */
			goto close_connection;
		}
		dtio->read_frame.frame_len_done += r;
		if(dtio->read_frame.frame_len_done < 4)
			return -1; /* continue reading */

		/* a zero length announces a control frame; its real
		 * length follows */
		if(dtio->read_frame.frame_len == 0) {
			dtio->read_frame.frame_len_done = 0;
			dtio->read_frame.control_frame = 1;
			continue;
		}
		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
			verbose(VERB_OPS, "dnstap: received frame exceeds max "
				"length of %d bytes, closing connection",
				DTIO_RECV_FRAME_MAX_LEN);
			goto close_connection;
		}
		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
		if(!dtio->read_frame.buf) {
			log_err("dnstap io: out of memory (creating read "
				"buffer)");
			goto close_connection;
		}
	}
	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
#ifdef HAVE_SSL
		if(dtio->ssl) {
			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
				dtio->read_frame.buf_count,
				dtio->read_frame.buf_cap-
				dtio->read_frame.buf_count);
		} else {
#endif
			r = receive_bytes(dtio, dtio->read_frame.buf+
				dtio->read_frame.buf_count,
				dtio->read_frame.buf_cap-
				dtio->read_frame.buf_count);
#ifdef HAVE_SSL
		}
#endif
		if(r == -1)
			return -1; /* continue reading */
		if(r == 0) {
			/* connection closed */
			goto close_connection;
		}
		dtio->read_frame.buf_count += r;
		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
			return -1; /* continue reading */
	}

	/* Complete frame received, check if this is a valid ACCEPT control
	 * frame. */
	if(dtio->read_frame.frame_len < 4) {
		verbose(VERB_OPS, "dnstap: invalid data received");
		goto close_connection;
	}
	if(sldns_read_uint32(dtio->read_frame.buf) !=
		FSTRM_CONTROL_FRAME_ACCEPT) {
		verbose(VERB_ALGO, "dnstap: invalid control type received, "
			"ignored");
		dtio->ready_frame_sent = 0;
		dtio->accept_frame_received = 0;
		dtio_read_frame_free(&dtio->read_frame);
		return -1;
	}
	read_frame_done = 4; /* control frame type */

	/* Iterate over control fields, ignore unknown types.
	 * Need to be able to read at least 8 bytes (control field type +
	 * length). */
	while(read_frame_done+8 < dtio->read_frame.frame_len) {
		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
			read_frame_done);
		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
			read_frame_done + 4);
		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
				read_frame_done+8+len <=
				dtio->read_frame.frame_len &&
				memcmp(dtio->read_frame.buf + read_frame_done +
				8, DNSTAP_CONTENT_TYPE, len) == 0) {
				if(!dtio_control_start_send(dtio)) {
					verbose(VERB_OPS, "dnstap io: out of "
					 "memory while sending START frame");
					goto close_connection;
				}
				dtio->accept_frame_received = 1;
				if(!dtio_add_output_event_write(dtio))
					goto close_connection;
				return 1;
			} else {
				/* unknown content type */
				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
					"contains unknown content type, "
					"closing connection");
				goto close_connection;
			}
		}
		/* Unknown option, try next. The field length comes from
		 * the peer: stop when the field runs past the end of the
		 * frame, so that read_frame_done cannot wrap around (on
		 * 32 bit size_t) and re-scan the frame. */
		if(len > dtio->read_frame.frame_len - read_frame_done - 8)
			break;
		read_frame_done += 8+len;
	}

close_connection:
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
	return 0;
}
/**
 * Register the output event for reading only. An event that is
 * registered for write as well is removed and registered again without
 * the write bit. If registering fails, the output is closed (which
 * starts reconnect attempts).
 * @return 1 if the event is registered for read, 0 if there is no
 *	output event or registering failed.
 */
static int dtio_add_output_event_read(struct dt_io_thread* dtio)
{
	if(dtio->event == NULL)
		return 0;
	if(dtio->event_added) {
		if(!dtio->event_added_is_write)
			return 1; /* already registered for read only */
		ub_event_del(dtio->event);
	}
	ub_event_del_bits(dtio->event, UB_EV_WRITE);
	if(ub_event_add(dtio->event, NULL) == 0) {
		dtio->event_added = 1;
		dtio->event_added_is_write = 0;
		return 1;
	}
	log_err("dnstap io: out of memory (adding event)");
	dtio->event_added = 0;
	dtio->event_added_is_write = 0;
	/* close output and start reattempts to open it */
	dtio_close_output(dtio);
	return 0;
}
/**
 * Register the output fd event so that it polls for read and write.
 * The UB_EV_WRITE bit is added to the event. If the event is already
 * registered for write, nothing is changed.
 * @param dtio: the io thread.
 * @return 1 on success (or when nothing needed to change), 0 when there is
 *	no event, or when adding it failed; in that case the output has
 *	been closed so that reconnect attempts can start.
 */
static int dtio_add_output_event_write(struct dt_io_thread* dtio)
{
	if(dtio->event == NULL)
		return 0;
	if(dtio->event_added) {
		if(dtio->event_added_is_write)
			return 1; /* already polling for read and write */
		/* registered with other bits, take it out first */
		ub_event_del(dtio->event);
	}
	ub_event_add_bits(dtio->event, UB_EV_WRITE);
	if(ub_event_add(dtio->event, NULL) == 0) {
		dtio->event_added = 1;
		dtio->event_added_is_write = 1;
		return 1;
	}
	log_err("dnstap io: out of memory (adding event)");
	dtio->event_added = 0;
	dtio->event_added_is_write = 0;
	/* close output and start reattempts to open it */
	dtio_close_output(dtio);
	return 0;
}
/**
 * Put the dtio thread to sleep.
 * The output event stops polling for write, because there is nothing
 * to be written. The wakeup timer is then marked disabled under
 * wakeup_timer_lock. That way the next worker thread that wants to log
 * starts a timer if needed, since the writer thread has gone to sleep.
 * @param dtio: the io thread.
 */
static void dtio_sleep(struct dt_io_thread* dtio)
{
	/* Result ignored: on failure dtio_add_output_event_read has
	 * already closed the output itself. */
	(void)dtio_add_output_event_read(dtio);

	lock_basic_lock(&dtio->wakeup_timer_lock);
	dtio->wakeup_timer_enabled = 0;
	lock_basic_unlock(&dtio->wakeup_timer_lock);
}
#ifdef HAVE_SSL
/**
 * Enable the brief read condition: the SSL layer wants to read before
 * it can continue, so the write bit is dropped from the polled event.
 * During stop flush the stop_flush_event is re-registered without
 * UB_EV_WRITE. Otherwise the regular output event is switched to read
 * only.
 * @param dtio: the io thread.
 * @return 0 if the event could not be (re-)registered, else 1.
 */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flushev = dtio->stop_flush_event;
	dtio->ssl_brief_read = 1;
	if(!flushev)
		return dtio_add_output_event_read(dtio);
	ub_event_del(flushev);
	ub_event_del_bits(flushev, UB_EV_WRITE);
	if(ub_event_add(flushev, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
#ifdef HAVE_SSL
/**
 * Disable the brief read condition: the write bit is added back to the
 * polled event.
 * During stop flush the stop_flush_event is re-registered with
 * UB_EV_WRITE. Otherwise the regular output event is registered for
 * read and write.
 * @param dtio: the io thread.
 * @return 0 if the event could not be (re-)registered, else 1.
 */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flushev = dtio->stop_flush_event;
	dtio->ssl_brief_read = 0;
	if(!flushev)
		return dtio_add_output_event_write(dtio);
	ub_event_del(flushev);
	ub_event_add_bits(flushev, UB_EV_WRITE);
	if(ub_event_add(flushev, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
#ifdef HAVE_SSL
/**
 * Enable the brief write condition: remember that the SSL layer wants
 * the socket to be writable, and poll the output for write as well.
 * @param dtio: the io thread.
 * @return the result of registering the output event for write.
 */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_write = 1;
	return dtio_add_output_event_write(dtio);
}
#endif /* HAVE_SSL */
#ifdef HAVE_SSL
/**
 * Disable the brief write condition: clear the flag and go back to
 * polling the output for read only.
 * @param dtio: the io thread.
 * @return the result of registering the output event for read only.
 */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_write = 0;
	return dtio_add_output_event_read(dtio);
}
#endif /* HAVE_SSL */
#ifdef HAVE_SSL
/**
 * Check peer verification after the SSL handshake has completed.
 * If the SSL object does not have SSL_VERIFY_PEER set, the connection
 * is accepted unauthenticated. Otherwise the verify result must be
 * X509_V_OK and a peer certificate must be present. The certificate,
 * when there is one, is logged at VERB_ALGO.
 * @param dtio: the io thread with the SSL connection.
 * @return 1 if the connection may be used, 0 if it failed (the caller
 *	closes it).
 */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	int verify_ok;
	X509* cert;

	if(!(SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
		/* unauthenticated, the verify peer flag was not set
		 * in ssl when the ssl object was created from ssl_ctx */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	verify_ok = (SSL_get_verify_result(dtio->ssl) == X509_V_OK);
#ifdef HAVE_SSL_GET1_PEER_CERTIFICATE
	cert = SSL_get1_peer_certificate(dtio->ssl);
#else
	cert = SSL_get_peer_certificate(dtio->ssl);
#endif

	if(!verify_ok) {
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer "
				"certificate", cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed: failed to authenticate",
			dtio->ip_str);
		return 0;
	}
	if(!cert) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection failed no certificate",
			dtio->ip_str);
		return 0;
	}

	log_cert(VERB_ALGO, "dnstap io, peer certificate", cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	if(SSL_get0_peername(dtio->ssl)) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection to %s authenticated",
			dtio->ip_str,
			SSL_get0_peername(dtio->ssl));
	} else
#endif
	{
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection authenticated",
			dtio->ip_str);
	}
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
#ifdef HAVE_SSL
/**
 * Give up on the SSL connection after a failed or closed handshake.
 * It exits the stop flush loop when called from there (info != NULL),
 * removes the output event, schedules a slow reconnect and closes the
 * output.
 */
static void dtio_ssl_handshake_fail(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	if(info) dtio_stop_flush_exit(info);
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
}

/**
 * Perform (or continue) the SSL handshake on the output connection.
 * A pending brief read condition is cleared first, on the assumption
 * that it is now satisfied; it is set again if SSL wants to read more.
 * @param dtio: the io thread.
 * @param info: the stop flush state when called during stop flush, or
 *	NULL from the regular output callback.
 * @return 1 if the handshake is done and the peer checks out, 0 to stop
 *	processing this event (wait for more I/O, or the connection
 *	was closed).
 */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int r, want;
	if(dtio->ssl_brief_read && !dtio_disable_brief_read(dtio)) {
		if(info) dtio_stop_flush_exit(info);
		return 0;
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	r = SSL_do_handshake(dtio->ssl);
	if(r == 1) {
		/* handshake complete, check peer verification */
		dtio->ssl_handshake_done = 1;
		if(!dtio_ssl_check_peer(dtio)) {
			dtio_ssl_handshake_fail(dtio, info);
			return 0;
		}
		return 1;
	}

	want = SSL_get_error(dtio->ssl, r);
	if(want == SSL_ERROR_WANT_READ) {
		/* we want to read on the connection */
		if(!dtio_enable_brief_read(dtio) && info)
			dtio_stop_flush_exit(info);
		return 0;
	}
	if(want == SSL_ERROR_WANT_WRITE)
		return 0; /* we want to write on the connection */

	if(r == 0) {
		/* closed */
	} else if(want == SSL_ERROR_SYSCALL) {
		/* SYSCALL and errno==0 means closed uncleanly */
		int silent = (errno == 0);
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			silent = 1; /* silence reset by peer */
#endif
		if(!silent)
			log_err("dnstap io, SSL_handshake syscall: %s",
				strerror(errno));
	} else {
		unsigned long err = ERR_get_error();
		if(!squelch_err_ssl_handshake(err)) {
			log_crypto_err_io_code("dnstap io, ssl handshake failed",
				want, err);
			verbose(VERB_OPS, "dnstap io, ssl handshake failed "
				"from %s", dtio->ip_str);
		}
	}
	dtio_ssl_handshake_fail(dtio, info);
	return 0;
}
#endif /* HAVE_SSL */
/**
 * Callback for the dnstap output fd events (read and write).
 *
 * Processing order:
 * 1. Finish a pending nonblocking connect.
 * 2. For TLS, drive the SSL handshake.
 * 3. On a read event (or after SSL briefly wanted a write):
 *    - for a bidirectional stream waiting for it, read the ACCEPT
 *      control frame;
 *    - otherwise check whether the peer closed the channel.
 * 4. Write up to DTIO_MESSAGES_PER_CALLBACK queued messages. If the
 *    queues are empty on the first iteration, the thread is put to sleep.
 *
 * @param fd: unused.
 * @param bits: the event bits that fired (UB_EV_READ / UB_EV_WRITE).
 * @param arg: the struct dt_io_thread.
 */
void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
	int i;

	if(dtio->check_nb_connect) {
		int connect_err = dtio_check_nb_connect(dtio);
		if(connect_err == -1) {
			/* close the channel */
			dtio_del_output_event(dtio);
			dtio_close_output(dtio);
			return;
		} else if(connect_err == 0) {
			/* try again later */
			return;
		}
		/* nonblocking connect check passed, continue */
	}

#ifdef HAVE_SSL
	if(dtio->ssl &&
		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
		if(!dtio_ssl_handshake(dtio, NULL))
			return;
	}
#endif

	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
#ifdef HAVE_SSL
		/* dtio_disable_brief_write is only compiled with HAVE_SSL,
		 * so the call must be guarded to build without SSL. */
		if(dtio->ssl_brief_write)
			(void)dtio_disable_brief_write(dtio);
#endif
		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
			if(dtio_read_accept_frame(dtio) <= 0)
				return;
		} else if(!dtio_check_close(dtio))
			return;
	}

	/* Process several messages per callback to improve throughput.
	 * For busy dnstap log generation there is no need to go back to
	 * the event loop for a write event between messages; the write
	 * side is usually not blocked. The count is bounded so the event
	 * loop stays responsive for other events. The loop returns early
	 * when there are no (more) messages, or when the output buffers
	 * are full. */
	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
		/* see if there are messages that need writing */
		if(!dtio->cur_msg) {
			if(!dtio_find_msg(dtio)) {
				if(i == 0) {
					/* no messages on the first iteration,
					 * the queues are all empty */
					dtio_sleep(dtio);
					/* After going to sleep, look again.
					 * A message queued between the
					 * dtio_find_msg and dtio_sleep calls
					 * would otherwise wait while the
					 * dtio sleeps; if one is found,
					 * resume service. */
					if(dtio_find_msg(dtio)) {
						if(!dtio_add_output_event_write(dtio))
							return;
					}
				}
				if(!dtio->cur_msg)
					return; /* nothing to do */
			}
		}

		/* write it */
		if(dtio->cur_msg_done < dtio->cur_msg_len) {
			if(!dtio_write_more(dtio))
				return;
		}

		/* done with the current message */
		dtio_cur_msg_free(dtio);

		/* In a bidirectional stream the first message is the READY
		 * control frame. Writing may only continue after an ACCEPT
		 * control frame has been received. */
		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
			dtio->ready_frame_sent = 1;
			(void)dtio_add_output_event_read(dtio);
			break;
		}
	}
}
/**
 * Callback for the dnstap command pipe. It reads one command byte.
 *
 * - DTIO_COMMAND_WAKEUP registers the output event for writing again,
 *   unless a bidirectional stream is still waiting for the ACCEPT
 *   control frame.
 * - A transient read error (EINTR/EAGAIN, or the Winsock equivalents)
 *   is ignored.
 * - Every other outcome makes the io thread exit its event loop: a stop
 *   command, an unknown command, a closed pipe, or a read error.
 */
void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
{
	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
	uint8_t cmd;
	ssize_t r;
	if(dtio->want_to_exit)
		return;
	r = read(fd, &cmd, sizeof(cmd));

	if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
				"waiting for ACCEPT control frame");
			return;
		}
		/* reregister event; failure is handled inside the call */
		(void)dtio_add_output_event_write(dtio);
		return;
	}

	if(r == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return; /* ignore this */
#else
		if(WSAGetLastError() == WSAEINPROGRESS ||
			WSAGetLastError() == WSAEWOULDBLOCK)
			return;
#endif
		log_err("dnstap io: failed to read: %s", sock_strerror(errno));
		/* and then fall through to quit the thread */
	} else if(r == 0) {
		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
	} else if(r == 1) {
		if(cmd == DTIO_COMMAND_STOP)
			verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
		else
			verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
	}

	dtio->want_to_exit = 1;
	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
		!= 0) {
		log_err("dnstap io: could not loopexit");
	}
}
#ifndef THREADS_DISABLED
/**
 * Create the event base for the dnstap io thread; exits fatally on failure.
 * @param dtio: the io thread; its event_base is set.
 * @param secs: passed to ub_default_event_base.
 * @param now: zeroed here, then passed to ub_default_event_base.
 */
static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
	struct timeval* now)
{
	memset(now, 0, sizeof(*now));
	dtio->event_base = ub_default_event_base(0, secs, now);
	if(dtio->event_base == NULL)
		fatal_exit("dnstap io: could not create event_base");
}
#endif /* THREADS_DISABLED */
/**
 * Set up the persistent read event on the command pipe.
 * The read end of the pipe is made nonblocking and dtio_cmd_cb is
 * registered on it. Exits fatally on failure.
 * @param dtio: the io thread; its command_event is set.
 */
static void dtio_setup_cmd(struct dt_io_thread* dtio)
{
	int readfd = dtio->commandpipe[0];
	struct ub_event* ev;

	fd_set_nonblock(readfd);
	ev = ub_event_new(dtio->event_base, readfd,
		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
	if(ev == NULL)
		fatal_exit("dnstap io: out of memory");
	dtio->command_event = ev;
	if(ub_event_add(ev, NULL) != 0)
		fatal_exit("dnstap io: out of memory (adding event)");
}
/** setup the reconnect event for dnstap io */
static void dtio_setup_reconnect(struct dt_io_thread* dtio)
{
dtio_reconnect_clear(dtio);
dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
if(!dtio->reconnect_timer) {
fatal_exit("dnstap io: out of memory");
}
}
/**
 * structure to keep track of information during stop flush.
 * dtio_control_stop_flush keeps it as a local variable. Its address is
 * the callback argument of the stop flush timer (dtio_stop_timer_cb) and
 * of the stop flush fd event (dtio_stop_ev_cb).
 */
struct stop_flush_info {
	/** the event base during stop flush; a separate base created by
	 * dtio_control_stop_flush for the duration of the flush */
	struct ub_event_base* base;
	/** did we already want to exit this stop-flush event base; set by
	 * dtio_stop_flush_exit so that loopexit is only requested once */
	int want_to_exit_flush;
	/** has the timer fired; set by dtio_stop_timer_cb */
	int timer_done;
	/** the dtio */
	struct dt_io_thread* dtio;
	/** the stop control frame, created with
	 * fstrm_create_control_frame_stop and freed after the flush */
	void* stop_frame;
	/** length of the stop frame */
	size_t stop_frame_len;
	/** how much we have done of the stop frame (bytes written so far) */
	size_t stop_frame_done;
};
/**
 * Ask the stop flush event base to exit its loop.
 * This only happens once; later calls do nothing.
 * @param info: the stop flush state.
 */
static void dtio_stop_flush_exit(struct stop_flush_info* info)
{
	if(!info->want_to_exit_flush) {
		info->want_to_exit_flush = 1;
		if(ub_event_base_loopexit(info->base) != 0)
			log_err("dnstap io: could not loopexit");
	}
}
/**
 * Write (more of) the STOP control frame to the output.
 * If the output turns out to be closed, the stop flush is exited.
 * @param info: the stop flush state; stop_frame_done is advanced.
 * @return 1 when the whole frame has been written, 0 if more writing is
 *	needed later (or the output was closed).
 */
static int dtio_control_stop_send(struct stop_flush_info* info)
{
	size_t remain;
	int written;
	if(info->stop_frame_done >= info->stop_frame_len)
		return 1;

	remain = info->stop_frame_len - info->stop_frame_done;
	written = dtio_write_buf(info->dtio,
		(uint8_t*)info->stop_frame + info->stop_frame_done, remain);
	if(written == -1) {
		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
		dtio_stop_flush_exit(info);
		return 0;
	}
	if(written == 0)
		return 0; /* try again later, or timeout */

	info->stop_frame_done += written;
	return info->stop_frame_done >= info->stop_frame_len;
}
/**
 * Stop flush timer callback: the time allowed for the flush has run out.
 * It marks the timer as done and exits the stop flush loop, unless an
 * exit was already requested.
 * @param arg: the struct stop_flush_info.
 */
void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
	void* arg)
{
	struct stop_flush_info* info = arg;
	if(!info->want_to_exit_flush) {
		verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
		info->timer_done = 1;
		dtio_stop_flush_exit(info);
	}
}
/**
 * During stop flush, after a read check or a write step failed: if that
 * closed the output (fd is -1), log it and exit the stop flush loop.
 * @param info: the stop flush state.
 */
static void dtio_stop_flush_exit_if_closed(struct stop_flush_info* info)
{
	if(info->dtio->fd != -1)
		return;
	verbose(VERB_ALGO, "dnstap io: "
		"stop flush: output closed");
	dtio_stop_flush_exit(info);
}

/**
 * Stop flush fd event callback.
 * It first completes a pending connect or SSL handshake and, on read
 * events, checks for a closed channel. It then writes the rest of the
 * current message, then the STOP control frame. When both are done, the
 * stop flush loop is exited.
 * @param bits: the event bits that fired.
 * @param arg: the struct stop_flush_info.
 */
void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
	struct stop_flush_info* info = (struct stop_flush_info*)arg;
	struct dt_io_thread* dtio = info->dtio;
	if(info->want_to_exit_flush)
		return;

	if(dtio->check_nb_connect) {
		/* we don't start the stop_flush if connect still
		 * in progress, but the check code is here, just in case */
		switch(dtio_check_nb_connect(dtio)) {
		case -1:
			/* close the channel, exit the stop flush */
			dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_close_output(dtio);
			return;
		case 0:
			/* try again later */
			return;
		default:
			/* nonblocking connect check passed, continue */
			break;
		}
	}
#ifdef HAVE_SSL
	if(dtio->ssl &&
		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
		if(!dtio_ssl_handshake(dtio, info))
			return;
	}
#endif

	if((bits&UB_EV_READ) && !dtio_check_close(dtio)) {
		dtio_stop_flush_exit_if_closed(info);
		return;
	}

	/* write remainder of last frame */
	if(dtio->cur_msg) {
		if(dtio->cur_msg_done < dtio->cur_msg_len &&
			!dtio_write_more(dtio)) {
			dtio_stop_flush_exit_if_closed(info);
			return;
		}
		verbose(VERB_ALGO, "dnstap io: stop flush completed "
			"last frame");
		dtio_cur_msg_free(dtio);
	}

	/* write stop frame */
	if(info->stop_frame_done < info->stop_frame_len) {
		if(!dtio_control_stop_send(info))
			return;
		verbose(VERB_ALGO, "dnstap io: stop flush completed "
			"stop control frame");
	}

	/* when last frame and stop frame are sent, exit */
	dtio_stop_flush_exit(info);
}
/**
 * Flush at end: briefly (a 2 second timer) try to write the remainder of
 * the current message, then the STOP control frame. The remainder could
 * be a partial packet, or even the start control frame.
 *
 * A private event base is run for this, with a timer and an fd event
 * that share one stop_flush_info.
 *
 * Nothing is done when there is no connection, when the nonblocking
 * connect has not been checked yet, or when the SSL handshake has not
 * completed.
 * @param dtio: the io thread.
 */
static void dtio_control_stop_flush(struct dt_io_thread* dtio)
{
	time_t secs = 0;
	struct timeval now;
	struct stop_flush_info info;
	struct timeval tv;
	struct ub_event* timer, *stopev;

	if(dtio->fd == -1 || dtio->check_nb_connect) {
		/* no connection or we have just connected, so nothing is
		 * sent yet, so nothing to stop or flush */
		return;
	}
	if(dtio->ssl && !dtio->ssl_handshake_done) {
		/* no SSL connection has been established yet */
		return;
	}

	memset(&info, 0, sizeof(info));
	memset(&now, 0, sizeof(now));
	info.dtio = dtio;
	info.base = ub_default_event_base(0, &secs, &now);
	if(!info.base) {
		log_err("dnstap io: malloc failure");
		return;
	}

	/* the timer bounds how long the flush may take */
	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
		&dtio_stop_timer_cb, &info);
	if(!timer) {
		log_err("dnstap io: malloc failure");
		goto free_base;
	}
	memset(&tv, 0, sizeof(tv));
	tv.tv_sec = 2;
	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
		&tv) != 0) {
		log_err("dnstap io: cannot event_timer_add");
		goto free_timer;
	}

	/* the fd event writes the remaining data */
	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
	if(!stopev) {
		log_err("dnstap io: malloc failure");
		goto del_timer;
	}
	if(ub_event_add(stopev, NULL) != 0) {
		log_err("dnstap io: cannot event_add");
		goto free_stopev;
	}

	info.stop_frame = fstrm_create_control_frame_stop(
		&info.stop_frame_len);
	if(!info.stop_frame) {
		log_err("dnstap io: malloc failure");
		goto del_stopev;
	}
	dtio->stop_flush_event = stopev;

	/* wait briefly, or until finished */
	verbose(VERB_ALGO, "dnstap io: stop flush started");
	if(ub_event_base_dispatch(info.base) < 0) {
		log_err("dnstap io: dispatch flush failed, errno is %s",
			strerror(errno));
	}
	verbose(VERB_ALGO, "dnstap io: stop flush ended");
	free(info.stop_frame);
	dtio->stop_flush_event = NULL;

	/* Release in reverse order of setup. The error paths above jump
	 * in at the step that matches how far setup got. */
del_stopev:
	ub_event_del(stopev);
free_stopev:
	ub_event_free(stopev);
del_timer:
	ub_timer_del(timer);
free_timer:
	ub_event_free(timer);
free_base:
	ub_event_base_free(info.base);
}
/**
 * perform desetup and free stuff when the dnstap io thread exits.
 *
 * The stop flush runs first, because it needs the output still open (it
 * returns early when fd is -1). Teardown then proceeds in this order:
 * the output, the command pipe event and read end, the reconnect timer,
 * and any pending message.
 *
 * The event base is only freed when threads are enabled. With
 * THREADS_DISABLED it is the base that was passed in to
 * dt_io_thread_start, so it is not freed here.
 * @param dtio: the io thread.
 */
static void dtio_desetup(struct dt_io_thread* dtio)
{
	/* try to send the last frame and the STOP control frame */
	dtio_control_stop_flush(dtio);
	dtio_del_output_event(dtio);
	dtio_close_output(dtio);
	ub_event_del(dtio->command_event);
	ub_event_free(dtio->command_event);
#ifndef USE_WINSOCK
	close(dtio->commandpipe[0]);
#else
	_close(dtio->commandpipe[0]);
#endif
	dtio->commandpipe[0] = -1;
	dtio_reconnect_del(dtio);
	ub_event_free(dtio->reconnect_timer);
	dtio_cur_msg_free(dtio);
#ifndef THREADS_DISABLED
	ub_event_base_free(dtio->event_base);
#endif
}
/**
 * Set up the START control frame as the current message to send.
 * The frame buffer carries its own framing: a zero length, then the
 * length of the control frame. Therefore the regular 4-byte length
 * prefix is marked as already written, and the buffer is sent from its
 * start.
 * @param dtio: the io thread; cur_msg must be empty.
 * @return 0 if the frame could not be created, else 1.
 */
static int dtio_control_start_send(struct dt_io_thread* dtio)
{
	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
		&dtio->cur_msg_len);
	if(dtio->cur_msg == NULL)
		return 0;
	dtio->cur_msg_len_done = 4;
	dtio->cur_msg_done = 0;
	return 1;
}
/**
 * Set up the READY control frame (used by bidirectional streams) as the
 * current message to send.
 * The frame buffer carries its own framing: a zero length, then the
 * length of the control frame. Therefore the regular 4-byte length
 * prefix is marked as already written, and the buffer is sent from its
 * start.
 * @param dtio: the io thread; cur_msg must be empty.
 * @return 0 if the frame could not be created, else 1.
 */
static int dtio_control_ready_send(struct dt_io_thread* dtio)
{
	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
		&dtio->cur_msg_len);
	if(dtio->cur_msg == NULL)
		return 0;
	dtio->cur_msg_len_done = 4;
	dtio->cur_msg_done = 0;
	return 1;
}
/**
 * Open a nonblocking stream connection to the AF_LOCAL socket at
 * dtio->socket_path.
 *
 * Failures are not logged when reconnect_timeout has grown past
 * DTIO_RECONNECT_TIMEOUT_MIN and verbosity is below 4; this avoids
 * repeating the same error on every retry.
 * @param dtio: the io thread; dtio->fd is set on success.
 * @return 1 on success, 0 on failure (the fd is closed). Also returns 0
 *	when the platform has no sys/un.h, or when the path does not fit
 *	in sun_path.
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un s;
	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}
	memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	s.sun_len = (unsigned)sizeof(s);
#endif
	s.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD */
	if(strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)) >=
		sizeof(s.sun_path)) {
		/* A truncated path would address a different socket, so do
		 * not attempt to connect with it. */
		if(dtio->reconnect_timeout <= DTIO_RECONNECT_TIMEOUT_MIN ||
			verbosity >= 4)
			log_err("dnstap io: socket path too long: \"%s\"",
				dtio->socket_path);
		dtio_close_fd(dtio);
		return 0;
	}
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
		== -1) {
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4) {
			dtio_close_fd(dtio);
			return 0; /* no log retries on low verbosity */
		}
		log_err("dnstap io: failed to connect to \"%s\": %s",
			dtio->socket_path, sock_strerror(errno));
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}
/**
 * Open a nonblocking TCP connection to dtio->ip_str.
 * UNBOUND_DNS_PORT is passed to extstrtoaddr as the port argument.
 *
 * Failures are not logged when reconnect_timeout has grown past
 * DTIO_RECONNECT_TIMEOUT_MIN and verbosity is below 4.
 * @param dtio: the io thread; dtio->fd is set on success.
 * @return 1 if connected, or if the nonblocking connect is in progress.
 *	0 on failure; the fd, if one was created, is closed.
 */
static int dtio_open_output_tcp(struct dt_io_thread* dtio)
{
	struct sockaddr_storage addr;
	socklen_t addrlen;
	memset(&addr, 0, sizeof(addr));
	addrlen = (socklen_t)sizeof(addr);

	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
		log_err("could not parse IP '%s'", dtio->ip_str);
		return 0;
	}
	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("can't create socket: %s", sock_strerror(errno));
		return 0;
	}
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
		/* A nonblocking connect that is still in progress is not a
		 * failure. Check this on all platforms before the quiet-retry
		 * shortcut below. Otherwise, on Winsock (WSAEWOULDBLOCK), every
		 * attempt made once reconnect_timeout exceeds the minimum
		 * would be closed as failed. */
		if(errno == EINPROGRESS)
			return 1; /* wait until connect done*/
#ifdef USE_WINSOCK
		if(WSAGetLastError() == WSAEINPROGRESS ||
			WSAGetLastError() == WSAEWOULDBLOCK)
			return 1; /* wait until connect done*/
#endif
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4) {
			dtio_close_fd(dtio);
			return 0; /* no log retries on low verbosity */
		}
#ifndef USE_WINSOCK
		if(tcp_connect_errno_needs_log(
			(struct sockaddr *)&addr, addrlen)) {
			log_err("dnstap io: failed to connect to %s: %s",
				dtio->ip_str, strerror(errno));
		}
#else
		if(tcp_connect_errno_needs_log(
			(struct sockaddr *)&addr, addrlen)) {
			log_err("dnstap io: failed to connect to %s: %s",
				dtio->ip_str, wsa_strerror(WSAGetLastError()));
		}
#endif
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
}
/**
 * Create the SSL object for a new TLS connection on dtio->fd.
 * The handshake state is reset, and the authentication name (and SNI,
 * if enabled) is set from the configuration.
 * @param dtio: the io thread; dtio->ssl is set on success.
 * @return 1 on success, 0 on failure. On failure any SSL object created
 *	here has been freed and dtio->ssl is NULL; the fd is left open
 *	for the caller to close.
 */
static int dtio_setup_ssl(struct dt_io_thread* dtio)
{
	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
	if(!dtio->ssl) return 0;
	dtio->ssl_handshake_done = 0;
	dtio->ssl_brief_read = 0;

	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
		dtio->tls_use_sni)) {
		/* The caller only closes the fd on failure, and no output
		 * event exists yet, so free the SSL object here. Otherwise it
		 * leaks and is overwritten by the next connection attempt. */
#ifdef HAVE_SSL
		SSL_free(dtio->ssl);
		dtio->ssl = NULL;
#endif
		return 0;
	}
	return 1;
}
/**
 * Open the output connection (unix socket, TCP or TLS) and set up its
 * persistent fd event.
 *
 * The event uses UB_EV_READ to read ACCEPT control messages and to
 * catch a channel close, and UB_EV_WRITE to write packets. The first
 * message is set up as the START control frame, or as the READY control
 * frame for bidirectional streams.
 *
 * On any failure, what was acquired is released and a reconnect is
 * scheduled.
 * @param dtio: the io thread.
 */
static void dtio_open_output(struct dt_io_thread* dtio)
{
	struct ub_event* ev;

	if(dtio->upstream_is_unix) {
		if(!dtio_open_output_local(dtio))
			goto retry_later;
	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
		if(!dtio_open_output_tcp(dtio))
			goto retry_later;
		if(dtio->upstream_is_tls && !dtio_setup_ssl(dtio))
			goto close_fd;
	}
	dtio->check_nb_connect = 1;

	ev = ub_event_new(dtio->event_base, dtio->fd,
		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
		dtio);
	if(!ev) {
		log_err("dnstap io: out of memory");
		goto free_ssl;
	}
	dtio->event = ev;

	/* setup protocol control message to start */
	if(dtio->is_bidirectional ? !dtio_control_ready_send(dtio)
		: !dtio_control_start_send(dtio)) {
		log_err("dnstap io: out of memory");
		ub_event_free(dtio->event);
		dtio->event = NULL;
		goto free_ssl;
	}
	return;

free_ssl:
	if(dtio->ssl) {
#ifdef HAVE_SSL
		SSL_free(dtio->ssl);
		dtio->ssl = NULL;
#endif
	}
close_fd:
	dtio_close_fd(dtio);
retry_later:
	dtio_reconnect_enable(dtio);
}
/**
 * Perform the setup of the writer thread on an established event base.
 * It sets up, in order: the command pipe event, the reconnect timer and
 * the output connection. It then registers the output event for
 * writing; that call handles its own failure, so its result is not used
 * here.
 * @param dtio: the io thread.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	(void)dtio_add_output_event_write(dtio);
}
#ifndef THREADS_DISABLED
/**
 * Thread entry point of the dnstap io thread.
 * It creates its own event base, sets up the events and the output
 * connection, and runs the event loop until loopexit is requested. It
 * then tears everything down.
 * @param arg: the struct dt_io_thread.
 * @return NULL.
 */
static void* dnstap_io(void* arg)
{
	struct dt_io_thread* dtio = arg;
	struct timeval now;
	time_t secs = 0;

	log_thread_set(&dtio->threadnum);
	verbose(VERB_ALGO, "start dnstap io thread");
	dtio_setup_base(dtio, &secs, &now);
	dtio_setup_on_base(dtio);

	if(ub_event_base_dispatch(dtio->event_base) < 0)
		log_err("dnstap io: dispatch failed, errno is %s",
			strerror(errno));

	verbose(VERB_ALGO, "stop dnstap io thread");
	dtio_desetup(dtio);
	return NULL;
}
#endif /* THREADS_DISABLED */
/**
 * Start the dnstap io thread.
 * A command pipe is created first; this is the step that can fail. With
 * threads enabled, a new thread runs dnstap_io. With THREADS_DISABLED,
 * the io is set up on the given event base instead.
 * @param dtio: the io thread.
 * @param event_base_nothr: event base to use when threads are disabled;
 *	unused otherwise.
 * @param numworkers: number of worker threads; the io thread number is
 *	numworkers+1.
 * @return 0 if the command pipe could not be created, else 1.
 */
int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
	int numworkers)
{
#ifndef USE_WINSOCK
	if(pipe(dtio->commandpipe) == -1) {
		log_err("failed to create pipe: %s", strerror(errno));
		return 0;
	}
#else
	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
		log_err("failed to create _pipe: %s",
			wsa_strerror(WSAGetLastError()));
		return 0;
	}
#endif

	dtio->threadnum = numworkers+1;
	dtio->started = 1;
#ifdef THREADS_DISABLED
	dtio->event_base = event_base_nothr;
	dtio_setup_on_base(dtio);
#else
	(void)event_base_nothr;
	ub_thread_create(&dtio->tid, dnstap_io, dtio);
#endif
	return 1;
}
/**
 * Stop the dnstap io thread.
 * It does nothing if dtio is NULL or not started.
 *
 * With threads enabled, the STOP command is written to the command pipe
 * (retrying on transient errors), the write end is closed, and the
 * thread is joined.
 *
 * With THREADS_DISABLED, the write end is closed and the teardown runs
 * directly.
 * @param dtio: the io thread.
 */
void dt_io_thread_stop(struct dt_io_thread* dtio)
{
#ifndef THREADS_DISABLED
	uint8_t cmd = DTIO_COMMAND_STOP;
	ssize_t written;
#endif
	if(!dtio || !dtio->started)
		return;
	verbose(VERB_ALGO, "dnstap io: send stop cmd");

#ifndef THREADS_DISABLED
	for(;;) {
		written = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
		if(written != -1)
			break;
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			continue;
#else
		if(WSAGetLastError() == WSAEINPROGRESS ||
			WSAGetLastError() == WSAEWOULDBLOCK)
			continue;
#endif
		log_err("dnstap io stop: write: %s",
			sock_strerror(errno));
		break;
	}
	dtio->started = 0;
#endif /* THREADS_DISABLED */

#ifndef USE_WINSOCK
	close(dtio->commandpipe[1]);
#else
	_close(dtio->commandpipe[1]);
#endif
	dtio->commandpipe[1] = -1;

#ifndef THREADS_DISABLED
	ub_thread_join(dtio->tid);
#else
	dtio->want_to_exit = 1;
	dtio_desetup(dtio);
#endif
}