Implement error handling on process monitor exit. Now, before

sending any message to the monitor, the sender must check a
"ready" channel.  Before exiting, the monitor records its exit
error and closes this channel, ensuring that all later reads
from the ready channel will immediately return false.

Inspired by
http://chplib.wordpress.com/2009/09/30/poison-concurrent-termination/

R=rsc
APPROVED=rsc
DELTA=47  (27 added, 11 deleted, 9 changed)
OCL=35782
CL=35784
This commit is contained in:
Austin Clements 2009-10-15 12:59:59 -07:00
parent 974b23f569
commit 049501ce4c

View File

@ -145,14 +145,22 @@ type transitionHandler struct {
// Each running process has one monitor thread, which processes // Each running process has one monitor thread, which processes
// messages from the debugEvents, debugReqs, and stopReq channels and // messages from the debugEvents, debugReqs, and stopReq channels and
// calls transition handlers. // calls transition handlers.
//
// To send a message to the monitor thread, first receive from the
// ready channel. If the ready channel returns true, the monitor is
// still running and will accept a message. If the ready channel
// returns false, the monitor is not running (the ready channel has
// been closed), and the reason it is not running will be stored in err.
type process struct { type process struct {
pid int; pid int;
threads map[int]*thread; threads map[int]*thread;
breakpoints map[uintptr]*breakpoint; breakpoints map[uintptr]*breakpoint;
ready chan bool;
debugEvents chan *debugEvent; debugEvents chan *debugEvent;
debugReqs chan *debugReq; debugReqs chan *debugReq;
stopReq chan os.Error; stopReq chan os.Error;
transitionHandlers *vector.Vector; transitionHandlers *vector.Vector;
err os.Error;
} }
// A thread represents a Linux thread in another process that is being // A thread represents a Linux thread in another process that is being
@ -212,6 +220,12 @@ func (e *newThreadError) String() string {
return fmt.Sprintf("newThread wait wanted pid %v and signal %v, got %v and %v", e.Pid, e.StopSignal(), e.wantPid, e.wantSig); return fmt.Sprintf("newThread wait wanted pid %v and signal %v, got %v and %v", e.Pid, e.StopSignal(), e.wantPid, e.wantSig);
} }
type ProcessExited struct {}
func (p ProcessExited) String() string {
return "process exited";
}
/* /*
* Ptrace wrappers * Ptrace wrappers
*/ */
@ -439,6 +453,10 @@ func (t *thread) wait() {
// the stop go through so we can // the stop go through so we can
// update the thread's state. // update the thread's state.
} }
if !<-t.proc.ready {
// The monitor exited
break;
}
t.proc.debugEvents <- &ev; t.proc.debugEvents <- &ev;
break; break;
} }
@ -695,10 +713,6 @@ func (t *thread) onStop(handle func(), onErr func(os.Error)) {
// monitor handles debug events and debug requests for p, exiting when // monitor handles debug events and debug requests for p, exiting when
// there are no threads left in p. // there are no threads left in p.
//
// TODO(austin) When an unrecoverable error occurs, abort the monitor
// and record this error so all future calls to do will return it
// immediately.
func (p *process) monitor() { func (p *process) monitor() {
var err os.Error; var err os.Error;
@ -709,13 +723,11 @@ func (p *process) monitor() {
defer runtime.UnlockOSThread(); defer runtime.UnlockOSThread();
hadThreads := false; hadThreads := false;
for { for err == nil {
p.ready <- true;
select { select {
case event := <-p.debugEvents: case event := <-p.debugEvents:
err = event.process(); err = event.process();
if err != nil {
break;
}
case req := <-p.debugReqs: case req := <-p.debugReqs:
req.res <- req.f(); req.res <- req.f();
@ -725,12 +737,9 @@ func (p *process) monitor() {
} }
if len(p.threads) == 0 { if len(p.threads) == 0 {
if hadThreads { if err == nil && hadThreads {
p.logTrace("no more threads; monitor exiting"); p.logTrace("no more threads; monitor exiting");
// TODO(austin) Use a real error do err = ProcessExited{};
// future operations will fail
err = nil;
break;
} }
} else { } else {
hadThreads = true; hadThreads = true;
@ -738,15 +747,15 @@ func (p *process) monitor() {
} }
// Abort waiting handlers // Abort waiting handlers
// TODO(austin) How do I stop the wait threads?
for _, h := range p.transitionHandlers.Data() { for _, h := range p.transitionHandlers.Data() {
h := h.(*transitionHandler); h := h.(*transitionHandler);
h.onErr(err); h.onErr(err);
} }
// TODO(austin) How do I stop the wait threads? // Indicate that the monitor cannot receive any more messages
if err != nil { p.err = err;
panic(err.String()); close(p.ready);
}
} }
// do executes f in the monitor thread (and, thus, atomically with // do executes f in the monitor thread (and, thus, atomically with
@ -754,7 +763,9 @@ func (p *process) monitor() {
// //
// Must NOT be called from the monitor thread. // Must NOT be called from the monitor thread.
func (p *process) do(f func() os.Error) os.Error { func (p *process) do(f func() os.Error) os.Error {
// TODO(austin) If monitor is stopped, return error. if !<-p.ready {
return p.err;
}
req := &debugReq{f, make(chan os.Error)}; req := &debugReq{f, make(chan os.Error)};
p.debugReqs <- req; p.debugReqs <- req;
return <-req.res; return <-req.res;
@ -763,8 +774,12 @@ func (p *process) do(f func() os.Error) os.Error {
// stopMonitor stops the monitor with the given error. If the monitor // stopMonitor stops the monitor with the given error. If the monitor
// is already stopped, does nothing. // is already stopped, does nothing.
func (p *process) stopMonitor(err os.Error) { func (p *process) stopMonitor(err os.Error) {
_ = p.stopReq <- err; // do not block if err == nil {
// TODO(austin) Wait until monitor has exited? panic("cannot stop the monitor with no error");
}
if <-p.ready {
p.stopReq <- err;
}
} }
/* /*
@ -1255,6 +1270,7 @@ func newProcess(pid int) *process {
pid: pid, pid: pid,
threads: make(map[int]*thread), threads: make(map[int]*thread),
breakpoints: make(map[uintptr]*breakpoint), breakpoints: make(map[uintptr]*breakpoint),
ready: make(chan bool, 1),
debugEvents: make(chan *debugEvent), debugEvents: make(chan *debugEvent),
debugReqs: make(chan *debugReq), debugReqs: make(chan *debugReq),
stopReq: make(chan os.Error), stopReq: make(chan os.Error),