Skip to content

Commit

Permalink
Don't apply 'raise-exception' when holding a mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisVine committed Nov 16, 2021
1 parent e8c4825 commit e44794c
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 124 deletions.
43 changes: 23 additions & 20 deletions a-sync/event-loop.scm
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
;; Copyright (C) 2014 to 2019 Chris Vine
;; Copyright (C) 2014 to 2021 Chris Vine

;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -366,23 +366,26 @@
(define read-files #f)
(define write-files #f)

(with-mutex mutex
(case (_mode-get el)
((closed)
;; TODO: why aren't we using raise-exception? At a suitable
;; opportunity change this to raising a condition object.
(throw 'event-loop-error
"event-loop-run!"
"event-loop-run! applied to an event loop which has been closed"))
((running prepare-to-quit)
(throw 'event-loop-error
"event-loop-run!"
"event-loop-run! applied to an event loop which is already running"))
(else
(set! event-in (_event-in-get el))
(set! event-fd (fileno event-in))
(_loop-thread-set! el (current-thread))
(_mode-set! el 'running))))
(let ((exc
(with-mutex mutex
(case (_mode-get el)
((closed)
;; TODO: why aren't we using raise-exception? At a suitable
;; opportunity change this to raising a condition object.
(list 'event-loop-error
"event-loop-run!"
"event-loop-run! applied to an event loop which has been closed"))
((running prepare-to-quit)
(list 'event-loop-error
"event-loop-run!"
"event-loop-run! applied to an event loop which is already running"))
(else
(set! event-in (_event-in-get el))
(set! event-fd (fileno event-in))
(_loop-thread-set! el (current-thread))
(_mode-set! el 'running)
#f)))))
(when exc (apply throw exc)))

(with-exception-handler
(lambda (exc)
Expand All @@ -391,8 +394,8 @@
(with-mutex mutex
(_event-loop-reset! el)
(when (not (eq? (_mode-get el) 'closed))
(_mode-set! el #f))
(raise-exception exc)))
(_mode-set! el #f)))
(raise-exception exc))
(lambda ()
(let loop1 ()
;; we don't need to use the mutex in this procedure to access
Expand Down
220 changes: 116 additions & 104 deletions a-sync/thread-pool.scm
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
;; Copyright (C) 2017 and 2019 Chris Vine
;; Copyright (C) 2017 to 2021 Chris Vine

;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -410,8 +410,8 @@
#:unwind? #t))
(when (and (stopped-get pool)
(blocking-get pool))
(broadcast-condition-variable (condvar-get pool)))
(raise-exception exc)))
(broadcast-condition-variable (condvar-get pool))))
(raise-exception exc))
(lambda ()
(call-with-new-thread (lambda () (thread-loop pool #f))))
;; as the handler always rethrows, we do not need to unwind here
Expand All @@ -438,14 +438,17 @@
;; after the thread pool object concerned has been closed by a call to
;; thread-pool-stop!.
(define (thread-pool-set-non-blocking! pool val)
(with-mutex (mutex-get pool)
(when (stopped-get pool)
(raise-exception
(make-exception (make-thread-pool-error)
(make-exception-with-origin 'thread-pool-set-non-blocking!)
(make-exception-with-message
"thread-pool-set-non-blocking! applied to a thread pool which has been closed"))))
(blocking-set! pool (not val))))
(let ((exc
(with-mutex (mutex-get pool)
(if (stopped-get pool)
(make-exception (make-thread-pool-error)
(make-exception-with-origin 'thread-pool-set-non-blocking!)
(make-exception-with-message
"thread-pool-set-non-blocking! applied to a thread pool which has been closed"))
(begin
(blocking-set! pool (not val))
#f)))))
(when exc (raise-exception exc))))

;; This procedure returns the current idle time setting for the thread
;; pool, in milliseconds.
Expand Down Expand Up @@ -485,31 +488,34 @@
;; non-blocking setting is #f, in which case no task running on the
;; thread-pool object may call this procedure.
(define (thread-pool-stop! pool)
(let ((mutex (mutex-get pool)))
(with-mutex mutex
(when (stopped-get pool)
(raise-exception
(make-exception (make-thread-pool-error)
(make-exception-with-origin 'thread-pool-stop!)
(make-exception-with-message
"thread-pool-stop! applied to a thread pool which has been closed"))))
(stopped-set! pool #t)
(let ((thread-count (num-threads-get pool)))
;; we could be adding more 'kill-thread callbacks than
;; necessary here, because as we are doing this a timeout on a
;; thread might expire leading to its demise in that way, or a
;; concurrent call to thread-pool-change-max-threads! or
;; thread-pool-add! may have failed to start a thread and
;; thrown an exception. However, that doesn't matter - we
;; just get left with a redundant callback in 'aq' which never
;; gets used and disappears when the pool is garbage collected
(do ((kill-count 0 (1+ kill-count)))
((= kill-count thread-count))
(a-queue-push! (aq-get pool) (cons (lambda () (raise-exception 'kill-thread)) #f)))
(when (blocking-get pool)
(do ()
((= (num-threads-get pool) 0))
(wait-condition-variable (condvar-get pool) mutex)))))))
(let* ((mutex (mutex-get pool))
(exc
(with-mutex mutex
(if (stopped-get pool)
(make-exception (make-thread-pool-error)
(make-exception-with-origin 'thread-pool-stop!)
(make-exception-with-message
"thread-pool-stop! applied to a thread pool which has been closed"))
(begin
(stopped-set! pool #t)
(let ((thread-count (num-threads-get pool)))
;; we could be adding more 'kill-thread callbacks than
;; necessary here, because as we are doing this a timeout on a
;; thread might expire leading to its demise in that way, or a
;; concurrent call to thread-pool-change-max-threads! or
;; thread-pool-add! may have failed to start a thread and
;; thrown an exception. However, that doesn't matter - we
;; just get left with a redundant callback in 'aq' which never
;; gets used and disappears when the pool is garbage collected
(do ((kill-count 0 (1+ kill-count)))
((= kill-count thread-count))
(a-queue-push! (aq-get pool) (cons (lambda () (raise-exception 'kill-thread)) #f)))
(when (blocking-get pool)
(do ()
((= (num-threads-get pool) 0))
(wait-condition-variable (condvar-get pool) mutex))))
#f)))))
(when exc (raise-exception exc))))

;; This procedure adds a new task to the thread pool. 'task' must be
;; a thunk. If one or more threads in the pool are currently blocking
Expand Down Expand Up @@ -547,60 +553,65 @@
;; count under a single mutex locking operation to ensure
;; atomicity within the pool
(with-mutex mutex
(when (stopped-get pool)
(raise-exception
(make-exception (make-thread-pool-error)
(make-exception-with-origin 'thread-pool-add!)
(make-exception-with-message
"thread-pool-add! applied to a thread pool which has been closed"))))
(let ((num-threads (num-threads-get pool))
(num-tasks (num-tasks-get pool)))
(if (and (>= num-tasks num-threads)
(< num-threads (max-threads-get pool)))
(begin
(num-tasks-set! pool (1+ num-tasks))
(num-threads-set! pool (1+ num-threads))
#t)
(begin
(num-tasks-set! pool (1+ num-tasks))
#f))))))
(if (stopped-get pool)
(make-exception (make-thread-pool-error)
(make-exception-with-origin 'thread-pool-add!)
(make-exception-with-message
"thread-pool-add! applied to a thread pool which has been closed"))
(let ((num-threads (num-threads-get pool))
(num-tasks (num-tasks-get pool)))
(if (and (>= num-tasks num-threads)
(< num-threads (max-threads-get pool)))
(begin
(num-tasks-set! pool (1+ num-tasks))
(num-threads-set! pool (1+ num-threads))
#t)
(begin
(num-tasks-set! pool (1+ num-tasks))
#f)))))))
;; if the pool has been stopped, throw the exception
(when (exception? start-thread) (raise-exception start-thread))
;; if a thread is to be started, do it outside the mutex to
;; reduce contention: we will roll back if starting the thread
;; fails
(when start-thread
(with-exception-handler
(lambda (exc)
(with-mutex mutex
;; If min-threads is 0 we could be down to 0 threads
;; with tasks still pending if in the period between
;; releasing the mutex acquired on entry to this
;; procedure and acquiring it again on handling this
;; exception, there have been concurrent calls to
;; thread-pool-set-max-threads! increasing and reducing
;; the maximum thread count by at least two other
;; threads where the launching of all new threads via
;; that procedure and this one fails. In such a case we
;; should be able to launch a rescue thread while
;; holding the mutex because no other threads could be
;; running in the pool. If we still cannot launch a
;; thread the program and/or system must be totally
;; borked and there is little we can do.
(let ((retry
(if (= (num-threads-get pool) 1)
(with-exception-handler
(lambda (exc) #f)
(lambda ()
(call-with-new-thread (lambda () (thread-loop pool #f)))
#t)
#:unwind? #t)
#f)))
(when (not retry)
(num-tasks-set! pool (1- (num-tasks-get pool)))
(num-threads-set! pool (1- (num-threads-get pool)))
(when (and (stopped-get pool)
(blocking-get pool))
(broadcast-condition-variable (condvar-get pool)))
(raise-exception exc)))))
(let ((rethrow
(with-mutex mutex
;; If min-threads is 0 we could be down to 0 threads
;; with tasks still pending if in the period between
;; releasing the mutex acquired on entry to this
;; procedure and acquiring it again on handling this
;; exception, there have been concurrent calls to
;; thread-pool-set-max-threads! increasing and reducing
;; the maximum thread count by at least two other
;; threads where the launching of all new threads via
;; that procedure and this one fails. In such a case we
;; should be able to launch a rescue thread while
;; holding the mutex because no other threads could be
;; running in the pool. If we still cannot launch a
;; thread the program and/or system must be totally
;; borked and there is little we can do.
(let ((retry
(if (= (num-threads-get pool) 1)
(with-exception-handler
(lambda (exc) #f)
(lambda ()
(call-with-new-thread (lambda () (thread-loop pool #f)))
#t)
#:unwind? #t)
#f)))
(if retry
#f
(begin
(num-tasks-set! pool (1- (num-tasks-get pool)))
(num-threads-set! pool (1- (num-threads-get pool)))
(when (and (stopped-get pool)
(blocking-get pool))
(broadcast-condition-variable (condvar-get pool)))
#t))))))
(when rethrow (raise-exception exc))))
(lambda ()
(call-with-new-thread (lambda () (thread-loop pool #f))))
#:unwind? #t)))
Expand All @@ -614,26 +625,27 @@
;; await-task-in-thread-pool! to work correctly. That is not too
;; much of an additional point of contention, because
;; a-queue-push! is itself serialized.
(with-mutex mutex
(if (stopped-get pool)
(begin
;; roll back if the pool has been stopped so we do not add
;; the task after all to the pool
(num-tasks-set! pool (1- (num-tasks-get pool)))
(raise-exception
(make-exception (make-thread-pool-error)
(make-exception-with-origin 'thread-pool-add!)
(make-exception-with-message
"thread-pool-add! applied to a thread pool which has been closed"))))
(with-exception-handler
(lambda (exc)
;; roll back if adding the task fails
(num-tasks-set! pool (1- (num-tasks-get pool)))
(raise-exception exc))
(lambda ()
(a-queue-push! (aq-get pool) (cons task fail-handler)))
;; as the handler always rethrows, we do not need to unwind here
#:unwind? #f)))))
(let ((exc
(with-mutex mutex
(if (stopped-get pool)
(begin
;; roll back if the pool has been stopped so we do
;; not add the task after all to the pool
(num-tasks-set! pool (1- (num-tasks-get pool)))
(make-exception (make-thread-pool-error)
(make-exception-with-origin 'thread-pool-add!)
(make-exception-with-message
"thread-pool-add! applied to a thread pool which has been closed")))
(with-exception-handler
(lambda (exc)
;; roll back if adding the task fails
(num-tasks-set! pool (1- (num-tasks-get pool)))
exc)
(lambda ()
(a-queue-push! (aq-get pool) (cons task fail-handler))
#f)
#:unwind? #t)))))
(when exc (raise-exception exc)))))

;; This macro is intended to be called by a task running on a thread
;; pool which is about to make a blocking (non-asynchronous) call. It
Expand Down

0 comments on commit e44794c

Please sign in to comment.