mirror of
https://github.com/NLnetLabs/unbound.git
synced 2024-09-21 06:37:08 +00:00
- Fix #305: dnstap logging significantly affects unbound performance
(regression in 1.11).
This commit is contained in:
parent
249d5a706b
commit
48a56751e9
@ -1717,14 +1717,6 @@ worker_create(struct daemon* daemon, int id, int* ports, int n)
|
||||
return NULL;
|
||||
}
|
||||
explicit_bzero(&seed, sizeof(seed));
|
||||
#ifdef USE_DNSTAP
|
||||
if(daemon->cfg->dnstap) {
|
||||
log_assert(daemon->dtenv != NULL);
|
||||
memcpy(&worker->dtenv, daemon->dtenv, sizeof(struct dt_env));
|
||||
if(!dt_init(&worker->dtenv))
|
||||
fatal_exit("dt_init failed");
|
||||
}
|
||||
#endif
|
||||
return worker;
|
||||
}
|
||||
|
||||
@ -1783,6 +1775,14 @@ worker_init(struct worker* worker, struct config_file *cfg,
|
||||
} else { /* !do_sigs */
|
||||
worker->comsig = NULL;
|
||||
}
|
||||
#ifdef USE_DNSTAP
|
||||
if(cfg->dnstap) {
|
||||
log_assert(worker->daemon->dtenv != NULL);
|
||||
memcpy(&worker->dtenv, worker->daemon->dtenv, sizeof(struct dt_env));
|
||||
if(!dt_init(&worker->dtenv, worker->base))
|
||||
fatal_exit("dt_init failed");
|
||||
}
|
||||
#endif
|
||||
worker->front = listen_create(worker->base, ports,
|
||||
cfg->msg_buffer_size, (int)cfg->incoming_num_tcp,
|
||||
cfg->do_tcp_keepalive
|
||||
|
@ -246,9 +246,9 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg)
|
||||
}
|
||||
|
||||
int
|
||||
dt_init(struct dt_env *env)
|
||||
dt_init(struct dt_env *env, struct comm_base* base)
|
||||
{
|
||||
env->msgqueue = dt_msg_queue_create();
|
||||
env->msgqueue = dt_msg_queue_create(base);
|
||||
if(!env->msgqueue) {
|
||||
log_err("malloc failure");
|
||||
return 0;
|
||||
|
@ -101,10 +101,11 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg);
|
||||
/**
|
||||
* Initialize per-worker state in dnstap environment object.
|
||||
* @param env: dnstap environment object to initialize, created with dt_create().
|
||||
* @param base: event base for wakeup timer.
|
||||
* @return: true on success, false on failure.
|
||||
*/
|
||||
int
|
||||
dt_init(struct dt_env *env);
|
||||
dt_init(struct dt_env *env, struct comm_base* base);
|
||||
|
||||
/**
|
||||
* Deletes the per-worker state created by dt_init
|
||||
|
@ -68,6 +68,8 @@
|
||||
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
|
||||
/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
|
||||
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
|
||||
/** number of messages before wakeup of thread */
|
||||
#define DTIO_MSG_FOR_WAKEUP 32
|
||||
|
||||
/** maximum length of received frame */
|
||||
#define DTIO_RECV_FRAME_MAX_LEN 1000
|
||||
@ -99,13 +101,18 @@ static int dtio_enable_brief_write(struct dt_io_thread* dtio);
|
||||
#endif
|
||||
|
||||
struct dt_msg_queue*
|
||||
dt_msg_queue_create(void)
|
||||
dt_msg_queue_create(struct comm_base* base)
|
||||
{
|
||||
struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
|
||||
if(!mq) return NULL;
|
||||
mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
|
||||
about 1 M should contain 64K messages with some overhead,
|
||||
or a whole bunch smaller ones */
|
||||
mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
|
||||
if(!mq->wakeup_timer) {
|
||||
free(mq);
|
||||
return NULL;
|
||||
}
|
||||
lock_basic_init(&mq->lock);
|
||||
lock_protect(&mq->lock, mq, sizeof(*mq));
|
||||
return mq;
|
||||
@ -125,6 +132,7 @@ dt_msg_queue_clear(struct dt_msg_queue* mq)
|
||||
mq->first = NULL;
|
||||
mq->last = NULL;
|
||||
mq->cursize = 0;
|
||||
mq->msgcount = 0;
|
||||
}
|
||||
|
||||
void
|
||||
@ -133,6 +141,7 @@ dt_msg_queue_delete(struct dt_msg_queue* mq)
|
||||
if(!mq) return;
|
||||
lock_basic_destroy(&mq->lock);
|
||||
dt_msg_queue_clear(mq);
|
||||
comm_timer_delete(mq->wakeup_timer);
|
||||
free(mq);
|
||||
}
|
||||
|
||||
@ -163,10 +172,57 @@ static void dtio_wakeup(struct dt_io_thread* dtio)
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
mq_wakeup_cb(void* arg)
|
||||
{
|
||||
struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
|
||||
/* even if the dtio is already active, because perhaps much
|
||||
* traffic suddenly, we leave the timer running to save on
|
||||
* managing it, the once a second timer is less work then
|
||||
* starting and stopping the timer frequently */
|
||||
lock_basic_lock(&mq->dtio->wakeup_timer_lock);
|
||||
mq->dtio->wakeup_timer_enabled = 0;
|
||||
lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
|
||||
dtio_wakeup(mq->dtio);
|
||||
}
|
||||
|
||||
/** start timer to wakeup dtio because there is content in the queue */
|
||||
static void
|
||||
dt_msg_queue_start_timer(struct dt_msg_queue* mq)
|
||||
{
|
||||
struct timeval tv;
|
||||
/* Start a timer to process messages to be logged.
|
||||
* If we woke up the dtio thread for every message, the wakeup
|
||||
* messages take up too much processing power. If the queue
|
||||
* fills up the wakeup happens immediately. The timer wakes it up
|
||||
* if there are infrequent messages to log. */
|
||||
|
||||
/* we cannot start a timer in dtio thread, because it is a different
|
||||
* thread and its event base is in use by the other thread, it would
|
||||
* give race conditions if we tried to modify its event base,
|
||||
* and locks would wait until it woke up, and this is what we do. */
|
||||
|
||||
/* do not start the timer if a timer already exists, perhaps
|
||||
* in another worker. So this variable is protected by a lock in
|
||||
* dtio */
|
||||
lock_basic_lock(&mq->dtio->wakeup_timer_lock);
|
||||
if(mq->dtio->wakeup_timer_enabled) {
|
||||
lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
|
||||
return;
|
||||
}
|
||||
mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
|
||||
lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
|
||||
|
||||
/* start the timer, in mq, in the event base of our worker */
|
||||
tv.tv_sec = 1;
|
||||
tv.tv_usec = 0;
|
||||
comm_timer_set(mq->wakeup_timer, &tv);
|
||||
}
|
||||
|
||||
void
|
||||
dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
|
||||
{
|
||||
int wakeup = 0;
|
||||
int wakeupnow = 0, wakeupstarttimer = 0;
|
||||
struct dt_msg_entry* entry;
|
||||
|
||||
/* check conditions */
|
||||
@ -197,9 +253,14 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
|
||||
|
||||
/* aqcuire lock */
|
||||
lock_basic_lock(&mq->lock);
|
||||
/* list was empty, wakeup dtio */
|
||||
/* if list was empty, start timer for (eventual) wakeup */
|
||||
if(mq->first == NULL)
|
||||
wakeup = 1;
|
||||
wakeupstarttimer = 1;
|
||||
/* if list contains more than wakeupnum elements, wakeup now,
|
||||
* or if list is (going to be) almost full */
|
||||
if(mq->msgcount+1 > DTIO_MSG_FOR_WAKEUP ||
|
||||
mq->cursize+len >= mq->maxsize * 9 / 10)
|
||||
wakeupnow = 1;
|
||||
/* see if it is going to fit */
|
||||
if(mq->cursize + len > mq->maxsize) {
|
||||
/* buffer full, or congested. */
|
||||
@ -210,6 +271,7 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
|
||||
return;
|
||||
}
|
||||
mq->cursize += len;
|
||||
mq->msgcount ++;
|
||||
/* append to list */
|
||||
if(mq->last) {
|
||||
mq->last->next = entry;
|
||||
@ -220,13 +282,19 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
|
||||
/* release lock */
|
||||
lock_basic_unlock(&mq->lock);
|
||||
|
||||
if(wakeup)
|
||||
if(wakeupnow) {
|
||||
dtio_wakeup(mq->dtio);
|
||||
} else if(wakeupstarttimer) {
|
||||
dt_msg_queue_start_timer(mq);
|
||||
}
|
||||
}
|
||||
|
||||
struct dt_io_thread* dt_io_thread_create(void)
|
||||
{
|
||||
struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
|
||||
lock_basic_init(&dtio->wakeup_timer_lock);
|
||||
lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
|
||||
sizeof(dtio->wakeup_timer_enabled));
|
||||
return dtio;
|
||||
}
|
||||
|
||||
@ -234,6 +302,7 @@ void dt_io_thread_delete(struct dt_io_thread* dtio)
|
||||
{
|
||||
struct dt_io_list_item* item, *nextitem;
|
||||
if(!dtio) return;
|
||||
lock_basic_destroy(&dtio->wakeup_timer_lock);
|
||||
item=dtio->io_list;
|
||||
while(item) {
|
||||
nextitem = item->next;
|
||||
@ -416,6 +485,7 @@ static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
|
||||
mq->first = entry->next;
|
||||
if(!entry->next) mq->last = NULL;
|
||||
mq->cursize -= entry->len;
|
||||
mq->msgcount --;
|
||||
lock_basic_unlock(&mq->lock);
|
||||
|
||||
*buf = entry->buf;
|
||||
|
@ -49,6 +49,7 @@ struct dt_msg_entry;
|
||||
struct dt_io_list_item;
|
||||
struct dt_io_thread;
|
||||
struct config_file;
|
||||
struct comm_base;
|
||||
|
||||
/**
|
||||
* A message buffer with dnstap messages queued up. It is per-worker.
|
||||
@ -68,11 +69,15 @@ struct dt_msg_queue {
|
||||
/** current size of the buffer, in bytes. data bytes of messages.
|
||||
* If a new message make it more than maxsize, the buffer is full */
|
||||
size_t cursize;
|
||||
/** number of messages in the queue */
|
||||
int msgcount;
|
||||
/** list of messages. The messages are added to the back and taken
|
||||
* out from the front. */
|
||||
struct dt_msg_entry* first, *last;
|
||||
/** reference to the io thread to wakeup */
|
||||
struct dt_io_thread* dtio;
|
||||
/** the wakeup timer for dtio, on worker event base */
|
||||
struct comm_timer* wakeup_timer;
|
||||
};
|
||||
|
||||
/**
|
||||
@ -166,6 +171,10 @@ struct dt_io_thread {
|
||||
* for the current message length that precedes the frame */
|
||||
size_t cur_msg_len_done;
|
||||
|
||||
/** lock on wakeup_timer_enabled */
|
||||
lock_basic_type wakeup_timer_lock;
|
||||
/** if wakeup timer is enabled in some thread */
|
||||
int wakeup_timer_enabled;
|
||||
/** command pipe that stops the pipe if closed. Used to quit
|
||||
* the program. [0] is read, [1] is written to. */
|
||||
int commandpipe[2];
|
||||
@ -233,9 +242,10 @@ struct dt_io_list_item {
|
||||
|
||||
/**
|
||||
* Create new (empty) worker message queue. Limit set to default on max.
|
||||
* @param base: event base for wakeup timer.
|
||||
* @return NULL on malloc failure or a new queue (not locked).
|
||||
*/
|
||||
struct dt_msg_queue* dt_msg_queue_create(void);
|
||||
struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base);
|
||||
|
||||
/**
|
||||
* Delete a worker message queue. It has to be unlinked from access,
|
||||
@ -258,6 +268,9 @@ void dt_msg_queue_delete(struct dt_msg_queue* mq);
|
||||
*/
|
||||
void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
|
||||
|
||||
/** timer callback to wakeup dtio thread to process messages */
|
||||
void mq_wakeup_cb(void* arg);
|
||||
|
||||
/**
|
||||
* Create IO thread.
|
||||
* @return new io thread object. not yet started. or NULL malloc failure.
|
||||
|
@ -1,3 +1,7 @@
|
||||
23 September 2020: Wouter
|
||||
- Fix #305: dnstap logging significantly affects unbound performance
|
||||
(regression in 1.11).
|
||||
|
||||
21 September 2020: Ralph
|
||||
- Fix #304: dnstap logging not recovering after dnstap process restarts
|
||||
|
||||
|
@ -138,6 +138,7 @@ fptr_whitelist_comm_timer(void (*fptr)(void*))
|
||||
else if(fptr == &auth_xfer_probe_timer_callback) return 1;
|
||||
else if(fptr == &auth_xfer_transfer_timer_callback) return 1;
|
||||
else if(fptr == &mesh_serve_expired_callback) return 1;
|
||||
else if(fptr == &mq_wakeup_cb) return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user