Files
headscale/hscontrol/state/ha_health.go
Kristoffer Dalby 90e65ccd63 state: add HA health prober
Ping HA subnet routers each probe cycle and mark unresponsive nodes
unhealthy. Reconnecting a node clears its unhealthy state since the
fresh Noise session proves basic connectivity.

Updates #2129
Updates #2902
2026-04-16 15:10:56 +01:00

140 lines
3.2 KiB
Go

package state
import (
"context"
"sync"
"time"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/juanfont/headscale/hscontrol/types/change"
"github.com/juanfont/headscale/hscontrol/util/zlog/zf"
"github.com/rs/zerolog/log"
"tailscale.com/tailcfg"
"tailscale.com/util/set"
)
// HAHealthProber periodically pings HA subnet router nodes and
// triggers failover when a primary stops responding.
type HAHealthProber struct {
	// state provides ping registration/cancellation and the
	// primaryRoutes table used to enumerate HA nodes and record health.
	state *State

	// cfg holds HA route settings; ProbeOnce reads cfg.ProbeTimeout.
	cfg types.HARouteConfig

	// serverURL is the base URL prepended to the ping-response
	// callback path handed to probed nodes.
	serverURL string

	// isConnected reports whether a node currently has an active map
	// session; nodes for which it returns false are skipped.
	isConnected func(types.NodeID) bool
}
// NewHAHealthProber creates a prober that uses the given State for
// ping tracking and primary route management.
// isConnected should return true if a node has an active map session.
func NewHAHealthProber(
	s *State,
	cfg types.HARouteConfig,
	serverURL string,
	isConnected func(types.NodeID) bool,
) *HAHealthProber {
	prober := HAHealthProber{
		state:       s,
		cfg:         cfg,
		serverURL:   serverURL,
		isConnected: isConnected,
	}

	return &prober
}
// ProbeOnce pings all HA subnet router nodes. PingNode changes are
// dispatched immediately via dispatch so nodes can respond before the
// timeout. Health-related policy changes are also dispatched inline.
//
// NOTE(review): dispatch is invoked concurrently from one goroutine per
// probed node — confirm the supplied dispatch func is safe for that.
func (p *HAHealthProber) ProbeOnce(
	ctx context.Context,
	dispatch func(...change.Change),
) {
	haNodes := p.state.primaryRoutes.HANodes()
	if len(haNodes) == 0 {
		return
	}

	// Deduplicate node IDs across prefixes.
	seen := make(set.Set[types.NodeID])
	var nodeIDs []types.NodeID
	for _, nodes := range haNodes {
		for _, id := range nodes {
			if !seen.Contains(id) {
				seen.Add(id)
				nodeIDs = append(nodeIDs, id)
			}
		}
	}

	log.Debug().
		Int("haNodes", len(nodeIDs)).
		Msg("HA health prober starting probe cycle")

	// All probes in this cycle share one wall-clock deadline, but each
	// waiter goroutine needs its own timer channel: a single time.After
	// channel delivers exactly one value, so sharing it across
	// goroutines would wake only one waiter on timeout and leave the
	// rest blocked (previously causing timed-out nodes to never be
	// marked unhealthy and wg.Wait to stall until ctx cancellation).
	deadline := time.Now().Add(p.cfg.ProbeTimeout)

	var wg sync.WaitGroup
	for _, id := range nodeIDs {
		if !p.isConnected(id) {
			log.Debug().
				Uint64(zf.NodeID, id.Uint64()).
				Msg("HA probe: skipping offline node")

			continue
		}

		pingID, responseCh := p.state.RegisterPing(id)
		callbackURL := p.serverURL + "/machine/ping-response?id=" + pingID
		dispatch(change.PingNode(id, &tailcfg.PingRequest{
			URL: callbackURL,
		}))

		wg.Go(func() {
			// Per-goroutine timer aimed at the shared cycle deadline.
			timer := time.NewTimer(time.Until(deadline))
			defer timer.Stop()

			select {
			case latency := <-responseCh:
				log.Debug().
					Uint64(zf.NodeID, id.Uint64()).
					Dur("latency", latency).
					Msg("HA probe: node responded")

				// SetNodeHealthy reports whether the health state
				// changed; only then is a policy recalculation needed.
				if p.state.primaryRoutes.SetNodeHealthy(id, true) {
					dispatch(change.PolicyChange())
					log.Info().
						Uint64(zf.NodeID, id.Uint64()).
						Msg("HA probe: node recovered, recalculating primaries")
				}
			case <-timer.C:
				p.state.CancelPing(pingID)

				// The node may have dropped its map session mid-probe;
				// reconnecting clears unhealthy state, so don't mark it.
				if !p.isConnected(id) {
					log.Debug().
						Uint64(zf.NodeID, id.Uint64()).
						Msg("HA probe: node went offline during probe, skipping")

					return
				}

				log.Warn().
					Uint64(zf.NodeID, id.Uint64()).
					Dur("timeout", p.cfg.ProbeTimeout).
					Msg("HA probe: node did not respond")

				if p.state.primaryRoutes.SetNodeHealthy(id, false) {
					dispatch(change.PolicyChange())
					log.Info().
						Uint64(zf.NodeID, id.Uint64()).
						Msg("HA probe: node unhealthy, triggering failover")
				}
			case <-ctx.Done():
				p.state.CancelPing(pingID)
			}
		})
	}

	wg.Wait()
}