Skip to content

Commit

Permalink
CP-394109: Alert only once for cluster host leave/join
Browse files Browse the repository at this point in the history
Previously there were two ways an alert for a cluster host join/leave
can be raised: 1. through the cluster change watcher; 2. through the api
call. These two can generate duplicate alerts as an API call can cause
the cluster change watcher to notice the change as well.

The idea of the fix here is still to let API and watcher raise alerts
separately, but now add synchronous API calls to allow API call
(cluster-host-join, etc) to call
the cluster change update code at the right time so that the cluster
change watcher won't see the change again, hence not generating duplicate
alerts.

Signed-off-by: Vincent Liu <[email protected]>
  • Loading branch information
Vincent-lau committed Jul 10, 2024
1 parent af142fc commit 3dc79e0
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 85 deletions.
4 changes: 4 additions & 0 deletions ocaml/tests/test_cluster.ml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ let test_rpc ~__context call =
Rpc.{success= true; contents= Rpc.String ""; is_notification= false}
| "Cluster_host.get_cluster_config", _ ->
Rpc.{success= true; contents= Rpc.String ""; is_notification= false}
| "Cluster.cstack_sync", [_session; self] ->
let open API in
Xapi_cluster.cstack_sync ~__context ~self:(ref_Cluster_of_rpc self) ;
Rpc.{success= true; contents= Rpc.String ""; is_notification= false}
| name, params ->
Alcotest.failf "Unexpected RPC: %s(%s)" name
(String.concat " " (List.map Rpc.to_string params))
Expand Down
8 changes: 4 additions & 4 deletions ocaml/xapi/xapi_cluster_helpers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ let corosync3_enabled ~__context =
let restrictions = Db.Pool.get_restrictions ~__context ~self:pool in
List.assoc_opt "restrict_corosync3" restrictions = Some "false"

let maybe_generate_alert ~__context ~num_hosts ~missing_hosts ~new_hosts ~quorum
let maybe_generate_alert ~__context ~num_hosts ~hosts_left ~hosts_joined ~quorum
=
let generate_alert join cluster_host =
let host = Db.Cluster_host.get_host ~__context ~self:cluster_host in
Expand Down Expand Up @@ -148,10 +148,10 @@ let maybe_generate_alert ~__context ~num_hosts ~missing_hosts ~new_hosts ~quorum
)
in
if cluster_health_enabled ~__context then (
List.iter (generate_alert false) missing_hosts ;
List.iter (generate_alert true) new_hosts ;
List.iter (generate_alert false) hosts_left ;
List.iter (generate_alert true) hosts_joined ;
(* only generate this alert when the number of hosts is decreasing *)
if missing_hosts <> [] && num_hosts <= quorum then
if hosts_left <> [] && num_hosts <= quorum then
let pool = Helpers.get_pool ~__context in
let pool_uuid = Db.Pool.get_uuid ~__context ~self:pool in
let name, priority = Api_messages.cluster_quorum_approaching_lost in
Expand Down
29 changes: 10 additions & 19 deletions ocaml/xapi/xapi_cluster_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*)

open Xapi_clustering
open Xapi_cluster_helpers
open Ipaddr_rpc_type

module D = Debug.Make (struct let name = "xapi_cluster_host" end)
Expand Down Expand Up @@ -55,20 +54,6 @@ let call_api_function_with_alert ~__context ~msg ~cls ~obj_uuid ~body
raise err
)

let alert_for_cluster_host ~__context ~cluster_host ~missing_hosts ~new_hosts =
let num_hosts = Db.Cluster_host.get_all ~__context |> List.length in
let cluster = Db.Cluster_host.get_cluster ~__context ~self:cluster_host in
let quorum = Db.Cluster.get_quorum ~__context ~self:cluster |> Int64.to_int in
maybe_generate_alert ~__context ~missing_hosts ~new_hosts ~num_hosts ~quorum

let alert_for_cluster_host_leave ~__context ~cluster_host =
alert_for_cluster_host ~__context ~cluster_host ~missing_hosts:[cluster_host]
~new_hosts:[]

let alert_for_cluster_host_join ~__context ~cluster_host =
alert_for_cluster_host ~__context ~cluster_host ~missing_hosts:[]
~new_hosts:[cluster_host]

(* Create xapi db object for cluster_host, resync_host calls clusterd *)
let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host =
with_clustering_lock __LOC__ (fun () ->
Expand All @@ -81,7 +66,6 @@ let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host =
~enabled:false ~current_operations:[] ~allowed_operations:[]
~other_config:[] ~joined:false ~live:false
~last_update_live:API.Date.epoch ;
alert_for_cluster_host_join ~__context ~cluster_host:ref ;
ref
)

Expand Down Expand Up @@ -269,16 +253,21 @@ let destroy_op ~__context ~self ~force =
(Cluster_client.LocalClient.leave, "destroy")
in
let result = local_fn (rpc ~__context) dbg in
let cluster = Db.Cluster_host.get_cluster ~__context ~self in
match Idl.IdM.run @@ Cluster_client.IDL.T.get result with
| Ok () ->
alert_for_cluster_host_leave ~__context ~cluster_host:self ;
Helpers.call_api_functions ~__context (fun rpc session_id ->
Client.Client.Cluster.cstack_sync ~rpc ~session_id ~self:cluster
) ;
Db.Cluster_host.destroy ~__context ~self ;
debug "Cluster_host.%s was successful" fn_str ;
Xapi_clustering.Daemon.disable ~__context
| Error error ->
warn "Error occurred during Cluster_host.%s" fn_str ;
if force then (
alert_for_cluster_host_leave ~__context ~cluster_host:self ;
Helpers.call_api_functions ~__context (fun rpc session_id ->
Client.Client.Cluster.cstack_sync ~rpc ~session_id ~self:cluster
) ;
let ref_str = Ref.string_of self in
Db.Cluster_host.destroy ~__context ~self ;
debug "Cluster_host %s force destroyed." ref_str
Expand Down Expand Up @@ -326,7 +315,9 @@ let forget ~__context ~self =
Db.Cluster.set_pending_forget ~__context ~self:cluster ~value:[] ;
(* must not disable the daemon here, because we declared another unreachable node dead,
* not the current one *)
alert_for_cluster_host_leave ~__context ~cluster_host:self ;
Helpers.call_api_functions ~__context (fun rpc session_id ->
Client.Client.Cluster.cstack_sync ~rpc ~session_id ~self:cluster
) ;
Db.Cluster_host.destroy ~__context ~self ;
debug "Cluster_host.forget was successful"
| Error error ->
Expand Down
141 changes: 79 additions & 62 deletions ocaml/xapi/xapi_clustering.ml
Original file line number Diff line number Diff line change
Expand Up @@ -438,74 +438,87 @@ module Watcher = struct
in
match Idl.IdM.run @@ Cluster_client.IDL.T.get m with
| Ok diag ->
Db.Cluster.set_is_quorate ~__context ~self:cluster
~value:diag.is_quorate ;
let all_cluster_hosts = Db.Cluster_host.get_all ~__context in
let ip_ch =
List.map
(fun ch ->
let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in
let ipstr =
ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF)
|> ipstr_of_address
in
(ipstr, ch)
)
all_cluster_hosts
in
let current_time = API.Date.now () in
( match diag.quorum_members with
| None ->
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:false ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
( Db.Cluster.set_is_quorate ~__context ~self:cluster
~value:diag.is_quorate ;
let all_cluster_hosts = Db.Cluster_host.get_all ~__context in
let live_hosts =
Db.Cluster_host.get_refs_where ~__context
~expr:(Eq (Field "live", Literal "true"))
in
let dead_hosts =
List.filter (fun h -> not (List.mem h live_hosts)) all_cluster_hosts
in
let ip_ch =
List.map
(fun ch ->
let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in
let ipstr =
ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF)
|> ipstr_of_address
in
(ipstr, ch)
)
all_cluster_hosts
| Some nodel ->
let quorum_hosts =
List.filter_map
(fun {addr; _} ->
let ipstr = ipstr_of_address addr in
match List.assoc_opt ipstr ip_ch with
| None ->
error
"%s: cannot find cluster host with network address %s, \
ignoring this host"
__FUNCTION__ ipstr ;
None
| Some ch ->
Some ch
in
let current_time = API.Date.now () in
match diag.quorum_members with
| None ->
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:false ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
)
nodel
in
let missing_hosts =
all_cluster_hosts
| Some nodel ->
(* nodel contains the current members of the cluster, according to corosync *)
let quorum_hosts =
List.filter_map
(fun {addr; _} ->
let ipstr = ipstr_of_address addr in
match List.assoc_opt ipstr ip_ch with
| None ->
error
"%s: cannot find cluster host with network address \
%s, ignoring this host"
__FUNCTION__ ipstr ;
None
| Some ch ->
Some ch
)
nodel
in

(* hosts_left contains the hosts that were live, but not in the list
of live hosts according to the cluster stack *)
let hosts_left =
List.filter (fun h -> not (List.mem h quorum_hosts)) live_hosts
in
(* hosts_joined contains the hosts that were dead but exists in the db,
and is now viewed as a member of the cluster by the cluster stack *)
let hosts_joined =
List.filter (fun h -> List.mem h quorum_hosts) dead_hosts
in
debug "%s: there are %d hosts joined and %d hosts left"
__FUNCTION__ (List.length hosts_joined) (List.length hosts_left) ;

List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:true ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
)
quorum_hosts ;
List.filter
(fun h -> not (List.mem h quorum_hosts))
all_cluster_hosts
in
let new_hosts =
List.filter
(fun h -> not (Db.Cluster_host.get_live ~__context ~self:h))
quorum_hosts
in
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:true ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
)
new_hosts ;
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:false ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
)
missing_hosts ;
maybe_generate_alert ~__context ~missing_hosts ~new_hosts
~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum
|> List.iter (fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:false ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
) ;
maybe_generate_alert ~__context ~hosts_left ~hosts_joined
~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum
) ;
Db.Cluster.set_quorum ~__context ~self:cluster
~value:(Int64.of_int diag.quorum) ;
Expand All @@ -527,6 +540,10 @@ module Watcher = struct
is an update *)
let cluster_change_interval = Mtime.Span.min

(* we handle unclean hosts join and leave in the watcher, i.e. hosts joining and leaving
due to network problems, power cut, etc. Join and leave initiated by the
API will be handled in the API call themselves, but they share the same code
as the watcher. *)
let watch_cluster_change ~__context ~host =
while !Daemon.enabled do
let m =
Expand Down

0 comments on commit 3dc79e0

Please sign in to comment.