Commit
Merge pull request #5963 from Vincent-lau/private/shul2/chealth-delay
CA-398438: Signal exit to the watcher thread
robhoes authored Sep 9, 2024
2 parents cdbea98 + 24b4626 commit 98d4689
Showing 7 changed files with 188 additions and 47 deletions.
2 changes: 2 additions & 0 deletions ocaml/xapi/xapi_cluster.ml
@@ -126,6 +126,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout
| Error error ->
D.warn
"Error occurred during Cluster.create. Shutting down cluster daemon" ;
Xapi_clustering.Watcher.signal_exit () ;
Xapi_clustering.Daemon.disable ~__context ;
handle_error error
)
@@ -156,6 +157,7 @@ let destroy ~__context ~self =
Db.Cluster.destroy ~__context ~self ;
D.debug "Cluster destroyed successfully" ;
set_ha_cluster_stack ~__context ;
Xapi_clustering.Watcher.signal_exit () ;
Xapi_clustering.Daemon.disable ~__context

(* Get pool master's cluster_host, return network of PIF *)
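Both call sites above order the shutdown the same way: the watcher is signalled before the daemon is disabled, so a watcher blocked in a stabilising wait wakes up and observes the exit flag while the daemon's RPC endpoint still exists. A minimal sketch of that ordering follows; teardown_clustering is a hypothetical wrapper for illustration, not a function added by this commit.

(* Hypothetical wrapper showing the signal-then-disable ordering used in
   Cluster.create's error path and in Cluster.destroy. Signalling first
   wakes a watcher that may be blocked in Delay.wait, letting it exit
   cleanly before the daemon goes away. *)
let teardown_clustering ~__context =
  Xapi_clustering.Watcher.signal_exit () ;
  Xapi_clustering.Daemon.disable ~__context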
3 changes: 2 additions & 1 deletion ocaml/xapi/xapi_cluster_host.ml
@@ -261,6 +261,7 @@ let destroy_op ~__context ~self ~force =
) ;
Db.Cluster_host.destroy ~__context ~self ;
debug "Cluster_host.%s was successful" fn_str ;
Xapi_clustering.Watcher.signal_exit () ;
Xapi_clustering.Daemon.disable ~__context
| Error error ->
warn "Error occurred during Cluster_host.%s" fn_str ;
@@ -361,7 +362,7 @@ let enable ~__context ~self =
in

(* TODO: Pass these through from CLI *)
if not !Xapi_clustering.Daemon.enabled then (
if not (Xapi_clustering.Daemon.is_enabled ()) then (
D.debug
"Cluster_host.enable: xapi-clusterd not running - attempting to start" ;
Xapi_clustering.Daemon.enable ~__context
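The read of the daemon flag changes here from dereferencing a shared bool ref (!Xapi_clustering.Daemon.enabled) to calling Xapi_clustering.Daemon.is_enabled (), which the next file backs with an Atomic.t. A minimal sketch of the pattern, with illustrative names; in the real module the flag is written only inside enable and disable.

(* Sketch: a module-private Atomic.t behind an accessor, replacing a raw
   bool ref. Atomic.get/Atomic.set give a synchronised flag that is safe
   to share between API threads and the watcher thread. *)
module Flag : sig
  val is_enabled : unit -> bool

  val set : bool -> unit
end = struct
  let enabled = Atomic.make false

  let is_enabled () = Atomic.get enabled

  let set v = Atomic.set enabled v
end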
131 changes: 87 additions & 44 deletions ocaml/xapi/xapi_clustering.ml
@@ -250,7 +250,9 @@ let assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack
raise Api_errors.(Server_error (cluster_stack_in_use, [cluster_stack]))

module Daemon = struct
let enabled = ref false
let enabled = Atomic.make false

let is_enabled () = Atomic.get enabled

let maybe_call_script ~__context script params =
match Context.get_test_clusterd_rpc __context with
@@ -283,13 +285,13 @@ module Daemon = struct
(internal_error, [Printf.sprintf "could not start %s" service])
)
) ;
enabled := true ;
Atomic.set enabled true ;
debug "Cluster daemon: enabled & started"

let disable ~__context =
let port = string_of_int !Xapi_globs.xapi_clusterd_port in
debug "Disabling and stopping the clustering daemon" ;
enabled := false ;
Atomic.set enabled false ;
maybe_call_script ~__context !Xapi_globs.systemctl ["disable"; service] ;
maybe_call_script ~__context !Xapi_globs.systemctl ["stop"; service] ;
maybe_call_script ~__context
@@ -309,7 +311,7 @@ end
* Instead of returning an empty URL which wouldn't work just raise an
* exception. *)
let rpc ~__context =
if not !Daemon.enabled then
if not (Daemon.is_enabled ()) then
raise
Api_errors.(
Server_error
@@ -427,6 +429,8 @@ let compute_corosync_max_host_failures ~__context =
corosync_ha_max_hosts

module Watcher = struct
module Delay = Xapi_stdext_threads.Threadext.Delay

let routine_updates = "routine updates"

let on_corosync_update ~__context ~cluster updates =
@@ -552,14 +556,40 @@
from corosync represents a consistent snapshot of the current cluster state. *)
let stabilising_period = Mtime.Span.(5 * s)

(* The delay on which the watcher will wait. *)
let delay = Delay.make ()

let finish_watch = Atomic.make false

let cluster_stack_watcher : bool Atomic.t = Atomic.make false

  (* This function exists to record that the watcher should be destroyed,
     to avoid the race where the cluster is destroyed while the watcher is
     still waiting/stabilising.
     There are two cases in which this function shall be called: 1. when
     clustering is to be disabled; 2. when this host is no longer the
     coordinator. The second case only needs handling on a manual designation
     of a new master, since with HA the old coordinator would have died, and
     so would this thread on the old coordinator. *)
let signal_exit () =
D.debug "%s: Signaled to exit cluster watcher" __FUNCTION__ ;
Delay.signal delay ;
(* set the cluster change watcher back to false as soon as we are signalled
to prevent any race conditions *)
Atomic.set cluster_change_watcher false ;
    D.debug
      "%s: cluster change watcher exiting, resetting cluster_change_watcher \
       back to false"
      __FUNCTION__ ;
Atomic.set finish_watch true

  (* We handle unclean host joins and leaves in the watcher, i.e. hosts
     joining and leaving due to network problems, power cuts, etc. Joins and
     leaves initiated by the API are handled in the API calls themselves, but
     they share the same code as the watcher. *)
let watch_cluster_change ~__context ~host =
while !Daemon.enabled do
while not (Atomic.get finish_watch) do
let m =
Cluster_client.LocalClient.UPDATES.get (rpc ~__context)
"cluster change watcher call"
@@ -569,9 +599,13 @@ module Watcher = struct
match find_cluster_host ~__context ~host with
| Some ch ->
let cluster = Db.Cluster_host.get_cluster ~__context ~self:ch in
if wait then
Thread.delay (Clock.Timer.span_to_s stabilising_period) ;
on_corosync_update ~__context ~cluster updates
if not wait then
on_corosync_update ~__context ~cluster updates
else if
wait
&& Clock.Timer.span_to_s stabilising_period |> Delay.wait delay
then
on_corosync_update ~__context ~cluster updates
| None ->
()
in
@@ -591,55 +625,60 @@ module Watcher = struct
| exception exn ->
warn "%s: Got exception %s while query cluster host updates, retrying"
__FUNCTION__ (Printexc.to_string exn) ;
Thread.delay (Clock.Timer.span_to_s cluster_change_interval)
done ;
Atomic.set cluster_change_watcher false
let _ : bool =
Clock.Timer.span_to_s cluster_change_interval |> Delay.wait delay
in
()
done

let watch_cluster_stack_version ~__context ~host =
if !Daemon.enabled then
match find_cluster_host ~__context ~host with
| Some ch ->
let cluster_ref = Db.Cluster_host.get_cluster ~__context ~self:ch in
let cluster_rec =
Db.Cluster.get_record ~__context ~self:cluster_ref
in
if
Cluster_stack.of_version
( cluster_rec.API.cluster_cluster_stack
, cluster_rec.API.cluster_cluster_stack_version
)
= Cluster_stack.Corosync2
then (
debug "%s: Detected Corosync 2 running as cluster stack"
__FUNCTION__ ;
let body =
"The current cluster stack version of Corosync 2 is out of date, \
consider updating to Corosync 3"
in
let name, priority = Api_messages.cluster_stack_out_of_date in
let host_uuid = Db.Host.get_uuid ~__context ~self:host in

Helpers.call_api_functions ~__context (fun rpc session_id ->
let _ : [> `message] Ref.t =
Client.Client.Message.create ~rpc ~session_id ~name ~priority
~cls:`Host ~obj_uuid:host_uuid ~body
in
()
match find_cluster_host ~__context ~host with
| Some ch ->
let cluster_ref = Db.Cluster_host.get_cluster ~__context ~self:ch in
let cluster_rec = Db.Cluster.get_record ~__context ~self:cluster_ref in
if
Cluster_stack.of_version
( cluster_rec.API.cluster_cluster_stack
, cluster_rec.API.cluster_cluster_stack_version
)
= Cluster_stack.Corosync2
then (
debug "%s: Detected Corosync 2 running as cluster stack" __FUNCTION__ ;
let body =
"The current cluster stack version of Corosync 2 is out of date, \
consider updating to Corosync 3"
in
let name, priority = Api_messages.cluster_stack_out_of_date in
let host_uuid = Db.Host.get_uuid ~__context ~self:host in

Helpers.call_api_functions ~__context (fun rpc session_id ->
let _ : [> `message] Ref.t =
Client.Client.Message.create ~rpc ~session_id ~name ~priority
~cls:`Host ~obj_uuid:host_uuid ~body
in
()
)
| None ->
debug "%s: No cluster host, no need to watch" __FUNCTION__
) else
debug
"%s: Detected Corosync 3 as cluster stack, not generating a \
           warning message"
__FUNCTION__
| None ->
debug "%s: No cluster host, no need to watch" __FUNCTION__

(** [create_as_necessary] will create cluster watchers on the coordinator if they are not
already created.
There is no need to destroy them: once the clustering daemon is disabled,
these threads will exit as well. *)
let create_as_necessary ~__context ~host =
if Helpers.is_pool_master ~__context ~host then (
let is_master = Helpers.is_pool_master ~__context ~host in
let daemon_enabled = Daemon.is_enabled () in
if is_master && daemon_enabled then (
if Xapi_cluster_helpers.cluster_health_enabled ~__context then
if Atomic.compare_and_set cluster_change_watcher false true then (
debug "%s: create watcher for corosync-notifyd on coordinator"
__FUNCTION__ ;
Atomic.set finish_watch false ;
let _ : Thread.t =
Thread.create (fun () -> watch_cluster_change ~__context ~host) ()
in
Expand All @@ -666,5 +705,9 @@ module Watcher = struct
) else
debug "%s: not create watcher for cluster stack as it already exists"
__FUNCTION__
)
) else
    debug
      "%s: not creating watcher because is_master = %b and daemon_enabled = \
       %b"
      __FUNCTION__ is_master daemon_enabled
end
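Taken together, the watcher changes in this file replace uninterruptible Thread.delay sleeps with a Delay object that signal_exit can fire, and gate watcher-thread creation on an atomic compare-and-set. Below is a condensed sketch of that lifecycle, assuming the Xapi_stdext_threads.Threadext.Delay semantics the diff relies on (Delay.wait d t returns true if the full t seconds elapsed, false if Delay.signal d interrupted the wait); the names and the 30-second interval are illustrative, not the exact xapi definitions.

module Delay = Xapi_stdext_threads.Threadext.Delay

let delay = Delay.make ()

(* true while a watcher thread is running; guards against double-starts *)
let watcher_running = Atomic.make false

(* true once the watcher has been asked to stop *)
let finish_watch = Atomic.make false

let rec watch_loop () =
  if not (Atomic.get finish_watch) then (
    (* ... query corosync and apply updates here ... *)
    (* interruptible sleep: returns early when signal_exit fires *)
    let (_ : bool) = Delay.wait delay 30.0 in
    watch_loop ()
  )

let signal_exit () =
  Delay.signal delay ;               (* wake a thread blocked in Delay.wait *)
  Atomic.set watcher_running false ; (* allow a future watcher to be created *)
  Atomic.set finish_watch true       (* tell the loop to stop *)

let create_as_necessary () =
  (* compare-and-set: exactly one concurrent caller wins the right to spawn *)
  if Atomic.compare_and_set watcher_running false true then (
    Atomic.set finish_watch false ;
    let (_ : Thread.t) = Thread.create watch_loop () in
    ()
  )

Resetting watcher_running inside signal_exit rather than at loop exit mirrors the diff's comment: a replacement watcher may be created as soon as the signal is sent, without waiting for the old thread to finish its current iteration.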
91 changes: 91 additions & 0 deletions ocaml/xapi/xapi_clustering.mli
@@ -0,0 +1,91 @@
(* Copyright (C) Cloud Software Group Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation; version 2.1 only. with the special
exception on linking described in file LICENSE.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
*)

val set_ha_cluster_stack : __context:Context.t -> unit

val with_clustering_lock : string -> (unit -> 'a) -> 'a

val pif_of_host :
__context:Context.t -> API.ref_network -> API.ref_host -> 'a Ref.t * API.pIF_t

val ip_of_pif : 'a Ref.t * API.pIF_t -> Cluster_interface.address

val assert_pif_prerequisites : 'a Ref.t * API.pIF_t -> unit

val assert_pif_attached_to :
__context:Context.t -> host:[`host] Ref.t -> pIF:[`PIF] Ref.t -> unit

val handle_error : Cluster_interface.error -> 'a

val assert_cluster_host_can_be_created :
__context:Context.t -> host:'a Ref.t -> unit

val get_required_cluster_stacks :
__context:Context.t -> sr_sm_type:string -> string list

val assert_cluster_stack_valid : cluster_stack:string -> unit

val with_clustering_lock_if_needed :
__context:Context.t -> sr_sm_type:string -> string -> (unit -> 'a) -> 'a

val with_clustering_lock_if_cluster_exists :
__context:Context.t -> string -> (unit -> 'a) -> 'a

val find_cluster_host :
__context:Context.t -> host:[`host] Ref.t -> 'a Ref.t option

val get_network_internal :
__context:Context.t -> self:[`Cluster] Ref.t -> [`network] Ref.t

val assert_cluster_host_enabled :
__context:Context.t -> self:[`Cluster_host] Ref.t -> expected:bool -> unit

val assert_operation_host_target_is_localhost :
__context:Context.t -> host:[`host] Ref.t -> unit

val assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack :
__context:Context.t -> self:[`Cluster_host] Ref.t -> unit

module Daemon : sig
val is_enabled : unit -> bool

val enable : __context:Context.t -> unit

val disable : __context:Context.t -> unit

val restart : __context:Context.t -> unit
end

val rpc : __context:Context.t -> Rpc.call -> Rpc.response Idl.IdM.t

val maybe_switch_cluster_stack_version :
__context:Context.t
-> self:'a Ref.t
-> cluster_stack:Cluster_interface.Cluster_stack.t
-> unit

val assert_cluster_host_is_enabled_for_matching_sms :
__context:Context.t -> host:[`host] Ref.t -> sr_sm_type:string -> unit

val is_clustering_disabled_on_host :
__context:Context.t -> [`host] Ref.t -> bool

val compute_corosync_max_host_failures : __context:Context.t -> int

module Watcher : sig
val on_corosync_update :
__context:Context.t -> cluster:[`Cluster] Ref.t -> string list -> unit

val signal_exit : unit -> unit

val create_as_necessary : __context:Context.t -> host:[`host] Ref.t -> unit
end
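The new interface file is the other half of the encapsulation: Daemon's flag is no longer exported, so outside code can read the state only through is_enabled and change it only through enable, disable and restart. A hypothetical caller, showing the only shape such code can now take:

(* Hypothetical caller against the interface above: the flag itself is
   unreachable from outside, so reads go through is_enabled and writes
   happen only inside Daemon.enable/disable. *)
let ensure_daemon ~__context =
  if not (Xapi_clustering.Daemon.is_enabled ()) then
    Xapi_clustering.Daemon.enable ~__context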
4 changes: 3 additions & 1 deletion ocaml/xapi/xapi_observer_components.ml
@@ -48,7 +48,9 @@ let all = List.map of_string Constants.observer_components_all
This does mean that observer will always be enabled for clusterd. *)
let startup_components () =
List.filter
(function Xapi_clusterd -> !Xapi_clustering.Daemon.enabled | _ -> true)
(function
| Xapi_clusterd -> Xapi_clustering.Daemon.is_enabled () | _ -> true
)
all

let assert_valid_components components =
2 changes: 2 additions & 0 deletions ocaml/xapi/xapi_pool_transition.ml
@@ -215,6 +215,8 @@ let become_another_masters_slave master_address =
if Pool_role.get_role () = new_role then
debug "We are already a slave of %s; nothing to do" master_address
else (
if Pool_role.is_master () then (* I am the old master *)
Xapi_clustering.Watcher.signal_exit () ;
debug "Setting pool.conf to point to %s" master_address ;
set_role new_role ;
run_external_scripts false ;
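This is the second case described in signal_exit's comment: on a manual designation of a new master the old coordinator stays alive, so its watcher must be told to exit explicitly. A condensed sketch of the guard, under the assumption that only the coordinator ever starts a watcher:

(* Sketch of the guard added above: only the old coordinator signals its
   watcher before demoting itself to a slave. *)
let on_demotion_to_slave () =
  if Pool_role.is_master () then
    Xapi_clustering.Watcher.signal_exit ()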
2 changes: 1 addition & 1 deletion quality-gate.sh
@@ -25,7 +25,7 @@ verify-cert () {
}

mli-files () {
N=511
N=510
# do not count ml files from the tests in ocaml/{tests/perftest/quicktest}
MLIS=$(git ls-files -- '**/*.mli' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)
MLS=$(git ls-files -- '**/*.ml' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)