iothread find msg.

This commit is contained in:
W.C.A. Wijngaards 2020-01-21 17:14:47 +01:00
parent efc79beb2d
commit 351e0e6986

View File

@ -196,15 +196,59 @@ void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
}
}
/** pick a message from the queue, lock and unlock, true if a message */
static int pick_msg_from_queue(struct dt_msg_queue* mq, void** buf,
size_t* len)
{
lock_basic_lock(&mq->lock);
if(mq->first) {
struct dt_msg_entry* entry = mq->first;
mq->first = entry->next;
if(!entry->next) mq->last = NULL;
mq->cursize -= entry->len;
lock_basic_unlock(&mq->lock);
*buf = entry->buf;
*len = entry->len;
free(entry);
return 1;
}
lock_basic_unlock(&mq->lock);
return 0;
}
/** find message in queue, false if no message, true if message to send */
static int dtio_find_in_queue(struct dt_io_thread* dtio,
struct dt_msg_queue* mq)
{
void* buf=NULL;
size_t len=0;
if(pick_msg_from_queue(mq, &buf, &len)) {
dtio->cur_msg = buf;
dtio->cur_msg_len = len;
dtio->cur_msg_done = 0;
return 1;
}
return 0;
}
/** find a new message to write, search message queues, false if none */
static int dtio_find_msg(struct dt_io_thread* dtio)
{
struct dt_io_list_item* item = dtio->io_list;
while(item) {
if(dtio_find_in_queue(dtio, item->queue))
return 1;
item = item->next;
}
return 0;
}
/** write more of the current messsage. false if incomplete, true if
* the message is done */
static int dtio_write_more(struct dt_io_thread* dtio)
static int dtio_write_more(int fd, struct dt_io_thread* dtio)
{
return 0;
}
/** callback for the dnstap events, to write to the output */
@ -220,7 +264,7 @@ static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg)
/* write it */
if(dtio->cur_msg_done < dtio->cur_msg_len) {
if(!dtio_write_more(dtio))
if(!dtio_write_more(fd, dtio))
return;
}