diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index 318e04bfc..db3cac475 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -196,15 +196,59 @@ void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, } } +/** pick a message from the queue, lock and unlock, true if a message */ +static int pick_msg_from_queue(struct dt_msg_queue* mq, void** buf, + size_t* len) +{ + lock_basic_lock(&mq->lock); + if(mq->first) { + struct dt_msg_entry* entry = mq->first; + mq->first = entry->next; + if(!entry->next) mq->last = NULL; + mq->cursize -= entry->len; + lock_basic_unlock(&mq->lock); + + *buf = entry->buf; + *len = entry->len; + free(entry); + return 1; + } + lock_basic_unlock(&mq->lock); + return 0; +} + +/** find message in queue, false if no message, true if message to send */ +static int dtio_find_in_queue(struct dt_io_thread* dtio, + struct dt_msg_queue* mq) +{ + void* buf=NULL; + size_t len=0; + if(pick_msg_from_queue(mq, &buf, &len)) { + dtio->cur_msg = buf; + dtio->cur_msg_len = len; + dtio->cur_msg_done = 0; + return 1; + } + return 0; +} + /** find a new message to write, search message queues, false if none */ static int dtio_find_msg(struct dt_io_thread* dtio) { + struct dt_io_list_item* item = dtio->io_list; + while(item) { + if(dtio_find_in_queue(dtio, item->queue)) + return 1; + item = item->next; + } + return 0; } /** write more of the current messsage. false if incomplete, true if * the message is done */ -static int dtio_write_more(struct dt_io_thread* dtio) +static int dtio_write_more(int fd, struct dt_io_thread* dtio) { + return 0; } /** callback for the dnstap events, to write to the output */ @@ -220,7 +264,7 @@ static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg) /* write it */ if(dtio->cur_msg_done < dtio->cur_msg_len) { - if(!dtio_write_more(dtio)) + if(!dtio_write_more(fd, dtio)) return; }