- fixup race problems from opensll in rand init from library, with

a mutex around the rand init.
       - fix pass async_id=NULL to _async resolve().
       - rewrote _wait() routine, so that it is threadsafe.
       - cancelation is threadsafe.


git-svn-id: file:///svn/unbound/trunk@902 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2008-01-25 15:13:39 +00:00
parent bfe329e34d
commit ed57c4de4c
4 changed files with 260 additions and 54 deletions

View File

@ -6,6 +6,11 @@
- please doxygen, put doxygen comment in one place.
- asynclook -b blocking mode and test.
- refactor asynclook, nicer code.
- fixup race problems from opensll in rand init from library, with
a mutex around the rand init.
- fix pass async_id=NULL to _async resolve().
- rewrote _wait() routine, so that it is threadsafe.
- cancelation is threadsafe.
24 January 2008: Wouter
- tested the cancel() function.

View File

@ -322,51 +322,24 @@ int
ub_val_poll(struct ub_val_ctx* ctx)
{
struct timeval t;
int r;
memset(&t, 0, sizeof(t));
lock_basic_lock(&ctx->rrpipe_lock);
r = pollit(ctx, &t);
lock_basic_unlock(&ctx->rrpipe_lock);
return r;
}
int
ub_val_wait(struct ub_val_ctx* ctx)
{
int r;
lock_basic_lock(&ctx->cfglock);
while(ctx->num_async > 0) {
lock_basic_unlock(&ctx->cfglock);
lock_basic_lock(&ctx->rrpipe_lock);
r = pollit(ctx, NULL);
lock_basic_unlock(&ctx->rrpipe_lock);
if(r)
ub_val_process(ctx);
lock_basic_lock(&ctx->cfglock);
}
lock_basic_unlock(&ctx->cfglock);
return UB_NOERROR;
/* no need to hold lock while testing for readability. */
return pollit(ctx, &t);
}
int
ub_val_fd(struct ub_val_ctx* ctx)
{
int fd;
lock_basic_lock(&ctx->rrpipe_lock);
fd = ctx->rrpipe[0];
lock_basic_unlock(&ctx->rrpipe_lock);
return fd;
return ctx->rrpipe[0];
}
/** process answer from bg worker */
static int
process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
process_answer_detail(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len,
ub_val_callback_t* cb, void** cbarg, int* err,
struct ub_val_result** res)
{
int err;
struct ctx_query* q;
ub_val_callback_t cb;
void* cbarg;
struct ub_val_result* res;
if(context_serial_getcmd(msg, len) != UB_LIBCMD_ANSWER) {
log_err("error: bad data from bg worker %d",
(int)context_serial_getcmd(msg, len));
@ -374,7 +347,7 @@ process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
}
lock_basic_lock(&ctx->cfglock);
q = context_deserialize_answer(ctx, msg, len, &err);
q = context_deserialize_answer(ctx, msg, len, err);
if(!q) {
lock_basic_unlock(&ctx->cfglock);
/* probably simply the lookup that failed, i.e.
@ -384,22 +357,27 @@ process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
log_assert(q->async);
/* grab cb while locked */
cb = q->cb;
cbarg = q->cb_arg;
if(err) {
res = NULL;
if(q->cancelled) {
*cb = NULL;
*cbarg = NULL;
} else {
*cb = q->cb;
*cbarg = q->cb_arg;
}
if(*err) {
*res = NULL;
ub_val_result_free(q->res);
} else {
/* parse the message, extract rcode, fill result */
ldns_buffer* buf = ldns_buffer_new(q->msg_len);
struct regional* region = regional_create();
res = q->res;
res->rcode = LDNS_RCODE_SERVFAIL;
*res = q->res;
(*res)->rcode = LDNS_RCODE_SERVFAIL;
if(region && buf) {
ldns_buffer_clear(buf);
ldns_buffer_write(buf, q->msg, q->msg_len);
ldns_buffer_flip(buf);
libworker_enter_result(res, buf, region,
libworker_enter_result(*res, buf, region,
q->msg_security);
}
ldns_buffer_free(buf);
@ -412,20 +390,38 @@ process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
context_query_delete(q);
lock_basic_unlock(&ctx->cfglock);
if(*cb) return 2;
return 1;
}
/** process answer from bg worker */
static int
process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
{
int err;
ub_val_callback_t cb;
void* cbarg;
struct ub_val_result* res;
int r;
r = process_answer_detail(ctx, msg, len, &cb, &cbarg, &err, &res);
/* no locks held while calling callback, so that library is
* re-entrant. */
(*cb)(cbarg, err, res);
if(r == 2)
(*cb)(cbarg, err, res);
return 1;
return r;
}
int
ub_val_process(struct ub_val_ctx* ctx)
{
int r;
uint8_t* msg = NULL;
uint32_t len = 0;
uint8_t* msg;
uint32_t len;
while(1) {
msg = NULL;
lock_basic_lock(&ctx->rrpipe_lock);
r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
lock_basic_unlock(&ctx->rrpipe_lock);
@ -442,6 +438,59 @@ ub_val_process(struct ub_val_ctx* ctx)
return UB_NOERROR;
}
int
ub_val_wait(struct ub_val_ctx* ctx)
{
int err;
ub_val_callback_t cb;
void* cbarg;
struct ub_val_result* res;
int r;
uint8_t* msg;
uint32_t len;
/* this is basically the same loop as _process(), but with changes.
* holds the rrpipe lock and waits with pollit */
while(1) {
lock_basic_lock(&ctx->rrpipe_lock);
lock_basic_lock(&ctx->cfglock);
if(ctx->num_async == 0) {
lock_basic_unlock(&ctx->cfglock);
lock_basic_unlock(&ctx->rrpipe_lock);
break;
}
lock_basic_unlock(&ctx->cfglock);
/* keep rrpipe locked, while
* o waiting for pipe readable
* o parsing message
* o possibly decrementing num_async
* do callback without lock
*/
r = pollit(ctx, NULL);
if(r) {
r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
if(r == 0) {
lock_basic_unlock(&ctx->rrpipe_lock);
return UB_PIPE;
}
if(r == -1) {
lock_basic_unlock(&ctx->rrpipe_lock);
continue;
}
r = process_answer_detail(ctx, msg, len,
&cb, &cbarg, &err, &res);
lock_basic_unlock(&ctx->rrpipe_lock);
if(r == 0)
return UB_PIPE;
if(r == 2)
(*cb)(cbarg, err, res);
} else {
lock_basic_unlock(&ctx->rrpipe_lock);
}
}
return UB_NOERROR;
}
int
ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype,
int rrclass, struct ub_val_result** result)
@ -491,7 +540,8 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype,
uint8_t* msg = NULL;
uint32_t len = 0;
*async_id = 0;
if(async_id)
*async_id = 0;
lock_basic_lock(&ctx->cfglock);
if(!ctx->finalized) {
int r = context_finalize(ctx);
@ -528,7 +578,8 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype,
lock_basic_unlock(&ctx->cfglock);
return UB_NOMEM;
}
*async_id = q->querynum;
if(async_id)
*async_id = q->querynum;
lock_basic_unlock(&ctx->cfglock);
lock_basic_lock(&ctx->qqpipe_lock);
@ -557,6 +608,7 @@ ub_val_cancel(struct ub_val_ctx* ctx, int async_id)
lock_basic_unlock(&ctx->cfglock);
return UB_NOMEM;
}
q->cancelled = 1;
/* delete it */
if(!ctx->dothread) { /* if forked */
@ -722,7 +774,6 @@ ub_val_ctx_resolvconf(struct ub_val_ctx* ctx, char* fname)
fclose(in);
if(numserv == 0) {
/* from resolv.conf(5) if none given, use localhost */
log_info("resconf: no nameservers, using localhost");
return ub_val_ctx_set_fwd(ctx, "127.0.0.1");
}
return UB_NOERROR;

View File

@ -125,11 +125,16 @@ libworker_setup(struct ub_val_ctx* ctx)
seed = (unsigned int)time(NULL) ^ (unsigned int)getpid() ^
(((unsigned int)w->thread_num)<<17);
seed ^= (unsigned int)w->env->alloc->next_id;
lock_basic_lock(&ctx->cfglock);
/* Openssl RAND_... functions are not as threadsafe as documented,
* put a lock around them. */
if(!ub_initstate(seed, w->env->rnd, RND_STATE_SIZE)) {
lock_basic_unlock(&ctx->cfglock);
seed = 0;
libworker_delete(w);
return NULL;
}
lock_basic_unlock(&ctx->cfglock);
seed = 0;
w->base = comm_base_create();
@ -308,6 +313,9 @@ int libworker_bg(struct ub_val_ctx* ctx)
lock_basic_unlock(&ctx->cfglock);
w = libworker_setup(ctx);
w->is_bg_thread = 1;
#ifdef ENABLE_LOCK_CHECKS
w->thread_num = 1; /* for nicer DEBUG checklocks */
#endif
if(!w) return UB_NOMEM;
ub_thread_create(&ctx->bg_tid, libworker_dobg, w);
} else {
@ -762,11 +770,13 @@ libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock)
if(r != (ssize_t)sizeof(len)) {
if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
log_err("msg write failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
}
if(write(fd, buf, len) == -1) {
log_err("msg write failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
if(!fd_set_nonblock(fd))
@ -798,22 +808,29 @@ libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock)
if(r != (ssize_t)sizeof(*len)) {
if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
log_err("msg read failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
if(r == 0) /* EOF */
if(r == 0) /* EOF */ {
(void)fd_set_nonblock(fd);
return 0;
}
}
*buf = (uint8_t*)malloc(*len);
if(!*buf) {
log_err("out of memory");
(void)fd_set_nonblock(fd);
return 0;
}
if((r=read(fd, *buf, *len)) == -1) {
log_err("msg read failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
if(r == 0) /* EOF */
if(r == 0) { /* EOF */
(void)fd_set_nonblock(fd);
return 0;
}
if(!fd_set_nonblock(fd))
return 0;
return 1;

View File

@ -43,6 +43,7 @@
#include "config.h"
#include "libunbound/unbound.h"
#include "util/locks.h"
#include "util/log.h"
/**
* result list for the lookups
@ -73,6 +74,7 @@ void usage(char* argv[])
printf(" -h : this help message\n");
printf(" -r fname : read resolv.conf from fname\n");
printf(" -t : use a resolver thread instead of forking a process\n");
printf(" -x : perform extended threaded test\n");
exit(1);
}
@ -104,7 +106,8 @@ print_result(struct lookinfo* info)
}
/** this is a function of type ub_val_callback_t */
void lookup_is_done(void* mydata, int err, struct ub_val_result* result)
static void
lookup_is_done(void* mydata, int err, struct ub_val_result* result)
{
/* cast mydata back to the correct type */
struct lookinfo* info = (struct lookinfo*)mydata;
@ -116,7 +119,8 @@ void lookup_is_done(void* mydata, int err, struct ub_val_result* result)
}
/** check error, if bad, exit with error message */
static void checkerr(const char* desc, int err)
static void
checkerr(const char* desc, int err)
{
if(err != 0) {
printf("%s error: %s\n", desc, ub_val_strerror(err));
@ -124,6 +128,129 @@ static void checkerr(const char* desc, int err)
}
}
/** number of threads to make in extended test */
#define NUMTHR 10
/** struct for extended thread info */
struct ext_thr_info {
/** thread num for debug */
int thread_num;
/** thread id */
ub_thread_t tid;
/** context */
struct ub_val_ctx* ctx;
/** size of array to query */
int argc;
/** array of names to query */
char** argv;
/** number of queries to do */
int numq;
};
/** extended bg result callback, this function is ub_val_callback_t */
static void
ext_callback(void* mydata, int err, struct ub_val_result* result)
{
int* my_id = (int*)mydata;
int doprint = 0;
if(my_id) {
/* I have an id, make sure we are not cancelled */
if(*my_id == 0) {
printf("error: query returned, but was cancelled\n");
exit(1);
}
if(doprint)
printf("cb %d: ", *my_id);
}
checkerr("ext_callback", err);
log_assert(result);
if(doprint) {
struct lookinfo pi;
pi.name = result?result->qname:"noname";
pi.result = result;
pi.err = 0;
print_result(&pi);
}
ub_val_result_free(result);
}
/** extended thread worker */
static void*
ext_thread(void* arg)
{
struct ext_thr_info* inf = (struct ext_thr_info*)arg;
int i, r;
struct ub_val_result* result;
int* async_ids = NULL;
log_thread_set(&inf->thread_num);
if(inf->thread_num > NUMTHR*2/3) {
async_ids = (int*)calloc((size_t)inf->numq, sizeof(int));
if(!async_ids) {
printf("out of memory\n");
exit(1);
}
}
for(i=0; i<inf->numq; i++) {
if(async_ids) {
r = ub_val_resolve_async(inf->ctx,
inf->argv[i%inf->argc], LDNS_RR_TYPE_A,
LDNS_RR_CLASS_IN, &async_ids[i], ext_callback,
&async_ids[i]);
checkerr("ub_val_resolve_async", r);
if(i > 100) {
r = ub_val_cancel(inf->ctx, async_ids[i-100]);
checkerr("ub_val_cancel", r);
async_ids[i-100]=0;
}
} else if(inf->thread_num > NUMTHR/2) {
/* async */
r = ub_val_resolve_async(inf->ctx,
inf->argv[i%inf->argc], LDNS_RR_TYPE_A,
LDNS_RR_CLASS_IN, NULL, ext_callback, NULL);
checkerr("ub_val_resolve_async", r);
} else {
/* blocking */
r = ub_val_resolve(inf->ctx, inf->argv[i%inf->argc],
LDNS_RR_TYPE_A, LDNS_RR_CLASS_IN, &result);
checkerr("ub_val_resolve", r);
}
}
if(inf->thread_num > NUMTHR/2) {
r = ub_val_wait(inf->ctx);
checkerr("ub_val_ctx_wait", r);
}
free(async_ids);
return NULL;
}
/** perform extended threaded test */
static int
ext_test(struct ub_val_ctx* ctx, int argc, char** argv)
{
struct ext_thr_info inf[NUMTHR];
int i;
printf("extended test start (%d threads)\n", NUMTHR);
for(i=0; i<NUMTHR; i++) {
/* 0 = this, 1 = library bg worker */
inf[i].thread_num = i+2;
inf[i].ctx = ctx;
inf[i].argc = argc;
inf[i].argv = argv;
inf[i].numq = 1000;
ub_thread_create(&inf[i].tid, ext_thread, &inf[i]);
}
/* the work happens here */
for(i=0; i<NUMTHR; i++) {
ub_thread_join(inf[i].tid);
}
printf("extended test end\n");
ub_val_ctx_delete(ctx);
sleep(1); /* give bg thread time to exit */
checklock_stop();
return 0;
}
/** getopt global, in case header files fail to declare it. */
extern int optind;
/** getopt global, in case header files fail to declare it. */
@ -135,7 +262,7 @@ int main(int argc, char** argv)
int c;
struct ub_val_ctx* ctx;
struct lookinfo* lookups;
int i, r, cancel=0, blocking=0;
int i, r, cancel=0, blocking=0, ext=0;
/* lock debug start (if any) */
checklock_start();
@ -151,7 +278,7 @@ int main(int argc, char** argv)
if(argc == 1) {
usage(argv);
}
while( (c=getopt(argc, argv, "bcdf:hr:t")) != -1) {
while( (c=getopt(argc, argv, "bcdf:hr:tx")) != -1) {
switch(c) {
case 'd':
r = ub_val_ctx_debuglevel(ctx, 3);
@ -181,6 +308,9 @@ int main(int argc, char** argv)
r = ub_val_ctx_set_fwd(ctx, optarg);
checkerr("ub_val_ctx_set_fwd", r);
break;
case 'x':
ext = 1;
break;
case 'h':
case '?':
default:
@ -190,6 +320,9 @@ int main(int argc, char** argv)
argc -= optind;
argv += optind;
if(ext)
return ext_test(ctx, argc, argv);
/* allocate array for results. */
lookups = (struct lookinfo*)calloc((size_t)argc,
sizeof(struct lookinfo));