Skip to content

Commit

Permalink
service: parallelize api calls (#1744)
Browse files Browse the repository at this point in the history
Signed-off-by: Jan-Otto Kröpke <[email protected]>
  • Loading branch information
jkroepke authored Nov 18, 2024
1 parent e6a15d4 commit 9f29fc8
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 74 deletions.
8 changes: 8 additions & 0 deletions internal/collector/cpu/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package cpu
import (
"fmt"
"log/slog"
"sync"

"github.com/alecthomas/kingpin/v2"
"github.com/prometheus-community/windows_exporter/internal/mi"
Expand All @@ -25,6 +26,8 @@ type Collector struct {

perfDataCollector *perfdata.Collector

mu sync.Mutex

processorRTCValues map[string]utils.Counter
processorMPerfValues map[string]utils.Counter

Expand Down Expand Up @@ -73,6 +76,8 @@ func (c *Collector) Close() error {
func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error {
var err error

c.mu = sync.Mutex{}

c.perfDataCollector, err = perfdata.NewCollector("Processor Information", perfdata.InstanceAll, []string{
c1TimeSeconds,
c2TimeSeconds,
Expand Down Expand Up @@ -219,6 +224,9 @@ func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error {
}

func (c *Collector) Collect(ch chan<- prometheus.Metric) error {
c.mu.Lock() // Lock is needed to prevent concurrent map access to c.processorRTCValues
defer c.mu.Unlock()

data, err := c.perfDataCollector.Collect()
if err != nil {
return fmt.Errorf("failed to collect Processor Information metrics: %w", err)
Expand Down
221 changes: 147 additions & 74 deletions internal/collector/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"regexp"
"strconv"
"sync"
"unsafe"

"github.com/alecthomas/kingpin/v2"
Expand Down Expand Up @@ -36,11 +37,18 @@ type Collector struct {

logger *slog.Logger

apiStateValues map[uint32]string
apiStartModeValues map[uint32]string

state *prometheus.Desc
processID *prometheus.Desc
info *prometheus.Desc
startMode *prometheus.Desc

// serviceConfigPoolBytes is a pool of byte slices used to avoid allocations
// ref: https://victoriametrics.com/blog/go-sync-pool/
serviceConfigPoolBytes sync.Pool

serviceManagerHandle *mgr.Mgr
}

Expand Down Expand Up @@ -111,6 +119,12 @@ func (c *Collector) Build(logger *slog.Logger, _ *mi.Session) error {
c.logger.Warn("No filters specified for service collector. This will generate a very large number of metrics!")
}

c.serviceConfigPoolBytes = sync.Pool{
New: func() any {
return new([]byte)
},
}

c.info = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "info"),
"A metric with a constant '1' value labeled with service information",
Expand All @@ -136,6 +150,24 @@ func (c *Collector) Build(logger *slog.Logger, _ *mi.Session) error {
nil,
)

c.apiStateValues = map[uint32]string{
windows.SERVICE_CONTINUE_PENDING: "continue pending",
windows.SERVICE_PAUSE_PENDING: "pause pending",
windows.SERVICE_PAUSED: "paused",
windows.SERVICE_RUNNING: "running",
windows.SERVICE_START_PENDING: "start pending",
windows.SERVICE_STOP_PENDING: "stop pending",
windows.SERVICE_STOPPED: "stopped",
}

c.apiStartModeValues = map[uint32]string{
windows.SERVICE_AUTO_START: "auto",
windows.SERVICE_BOOT_START: "boot",
windows.SERVICE_DEMAND_START: "manual",
windows.SERVICE_DISABLED: "disabled",
windows.SERVICE_SYSTEM_START: "system",
}

// EnumServiceStatusEx requires only SC_MANAGER_ENUM_SERVICE.
handle, err := windows.OpenSCManager(nil, nil, windows.SC_MANAGER_ENUMERATE_SERVICE)
if err != nil {
Expand Down Expand Up @@ -171,81 +203,81 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) error {
return nil
}

// Iterate through the Services List.
for _, service := range services {
serviceName := windows.UTF16PtrToString(service.ServiceName)
if c.config.ServiceExclude.MatchString(serviceName) ||
!c.config.ServiceInclude.MatchString(serviceName) {
continue
}
servicesCh := make(chan windows.ENUM_SERVICE_STATUS_PROCESS, len(services))
wg := sync.WaitGroup{}
wg.Add(len(services))

for range 4 {
go func(ch chan<- prometheus.Metric, wg *sync.WaitGroup) {
for service := range servicesCh {
c.collectWorker(ch, service)
wg.Done()
}
}(ch, &wg)
}

if err := c.collectService(ch, service); err != nil {
c.logger.Warn("failed collecting service info",
slog.Any("err", err),
slog.String("service", windows.UTF16PtrToString(service.ServiceName)),
)
}
for _, service := range services {
servicesCh <- service
}

close(servicesCh)

wg.Wait()

return nil
}

var apiStateValues = map[uint32]string{
windows.SERVICE_CONTINUE_PENDING: "continue pending",
windows.SERVICE_PAUSE_PENDING: "pause pending",
windows.SERVICE_PAUSED: "paused",
windows.SERVICE_RUNNING: "running",
windows.SERVICE_START_PENDING: "start pending",
windows.SERVICE_STOP_PENDING: "stop pending",
windows.SERVICE_STOPPED: "stopped",
}
func (c *Collector) collectWorker(ch chan<- prometheus.Metric, service windows.ENUM_SERVICE_STATUS_PROCESS) {
serviceName := windows.UTF16PtrToString(service.ServiceName)

if c.config.ServiceExclude.MatchString(serviceName) || !c.config.ServiceInclude.MatchString(serviceName) {
return
}

var apiStartModeValues = map[uint32]string{
windows.SERVICE_AUTO_START: "auto",
windows.SERVICE_BOOT_START: "boot",
windows.SERVICE_DEMAND_START: "manual",
windows.SERVICE_DISABLED: "disabled",
windows.SERVICE_SYSTEM_START: "system",
if err := c.collectService(ch, serviceName, service); err != nil {
c.logger.Warn("failed collecting service info",
slog.Any("err", err),
slog.String("service", serviceName),
)
}
}

func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows.ENUM_SERVICE_STATUS_PROCESS) error {
func (c *Collector) collectService(ch chan<- prometheus.Metric, serviceName string, service windows.ENUM_SERVICE_STATUS_PROCESS) error {
// Open connection for service handler.
serviceHandle, err := windows.OpenService(c.serviceManagerHandle.Handle, service.ServiceName, windows.SERVICE_QUERY_CONFIG)
if err != nil {
return fmt.Errorf("failed to open service: %w", err)
}

serviceNameString := windows.UTF16PtrToString(service.ServiceName)

// Create handle for each service.
serviceManager := &mgr.Service{Name: serviceNameString, Handle: serviceHandle}
serviceManager := &mgr.Service{Name: serviceName, Handle: serviceHandle}
defer func(serviceManager *mgr.Service) {
if err := serviceManager.Close(); err != nil {
c.logger.Warn("failed to close service handle",
slog.Any("err", err),
slog.String("service", serviceNameString),
slog.String("service", serviceName),
)
}
}(serviceManager)

// Get Service Configuration.
serviceConfig, err := serviceManager.Config()
serviceConfig, err := c.getServiceConfig(serviceManager)
if err != nil {
if !errors.Is(err, windows.ERROR_FILE_NOT_FOUND) && !errors.Is(err, windows.ERROR_MUI_FILE_NOT_FOUND) {
return fmt.Errorf("failed to get service configuration: %w", err)
}

c.logger.Debug("failed collecting service",
c.logger.Debug("failed collecting service config",
slog.Any("err", err),
slog.String("service", serviceNameString),
slog.String("service", serviceName),
)
}

ch <- prometheus.MustNewConstMetric(
c.info,
prometheus.GaugeValue,
1.0,
serviceNameString,
serviceName,
serviceConfig.DisplayName,
serviceConfig.ServiceStartName,
serviceConfig.BinaryPathName,
Expand All @@ -256,21 +288,21 @@ func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows.
isCurrentState float64
)

for _, startMode := range apiStartModeValues {
for _, startMode := range c.apiStartModeValues {
isCurrentStartMode = 0.0
if startMode == apiStartModeValues[serviceConfig.StartType] {
if startMode == c.apiStartModeValues[serviceConfig.StartType] {
isCurrentStartMode = 1.0
}
ch <- prometheus.MustNewConstMetric(
c.startMode,
prometheus.GaugeValue,
isCurrentStartMode,
serviceNameString,
serviceName,
startMode,
)
}

for state, stateValue := range apiStateValues {
for state, stateValue := range c.apiStateValues {
isCurrentState = 0.0
if state == service.ServiceStatusProcess.CurrentState {
isCurrentState = 1.0
Expand All @@ -280,7 +312,7 @@ func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows.
c.state,
prometheus.GaugeValue,
isCurrentState,
serviceNameString,
serviceName,
stateValue,
)
}
Expand All @@ -297,7 +329,7 @@ func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows.
c.processID,
prometheus.GaugeValue,
float64(processStartTime/1_000_000_000),
serviceNameString,
serviceName,
processID,
)

Expand All @@ -306,12 +338,12 @@ func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows.

if errors.Is(err, windows.ERROR_ACCESS_DENIED) {
c.logger.Debug("failed to get process start time",
slog.String("service", serviceNameString),
slog.String("service", serviceName),
slog.Any("err", err),
)
} else {
c.logger.Warn("failed to get process start time",
slog.String("service", serviceNameString),
slog.String("service", serviceName),
slog.Any("err", err),
)
}
Expand All @@ -325,38 +357,38 @@ func (c *Collector) queryAllServices() ([]windows.ENUM_SERVICE_STATUS_PROCESS, e
var (
bytesNeeded uint32
servicesReturned uint32
resumeHandle uint32
err error
)

if err := windows.EnumServicesStatusEx(
c.serviceManagerHandle.Handle,
windows.SC_STATUS_PROCESS_INFO,
windows.SERVICE_WIN32,
windows.SERVICE_STATE_ALL,
nil,
0,
&bytesNeeded,
&servicesReturned,
&resumeHandle,
nil,
); !errors.Is(err, windows.ERROR_MORE_DATA) {
return nil, fmt.Errorf("could not fetch buffer size for EnumServicesStatusEx: %w", err)
}
buf := make([]byte, 1024*100)

for {
err = windows.EnumServicesStatusEx(
c.serviceManagerHandle.Handle,
windows.SC_STATUS_PROCESS_INFO,
windows.SERVICE_WIN32,
windows.SERVICE_STATE_ALL,
&buf[0],
uint32(len(buf)),
&bytesNeeded,
&servicesReturned,
nil,
nil,
)

buf := make([]byte, bytesNeeded)
if err := windows.EnumServicesStatusEx(
c.serviceManagerHandle.Handle,
windows.SC_STATUS_PROCESS_INFO,
windows.SERVICE_WIN32,
windows.SERVICE_STATE_ALL,
&buf[0],
bytesNeeded,
&bytesNeeded,
&servicesReturned,
&resumeHandle,
nil,
); err != nil {
return nil, fmt.Errorf("could not query windows service list: %w", err)
if err == nil {
break
}

if !errors.Is(err, windows.ERROR_MORE_DATA) {
return nil, err
}

if bytesNeeded <= uint32(len(buf)) {
return nil, err
}

buf = make([]byte, bytesNeeded)
}

if servicesReturned == 0 {
Expand Down Expand Up @@ -397,3 +429,44 @@ func (c *Collector) getProcessStartTime(pid uint32) (uint64, error) {

return uint64(creation.Nanoseconds()), nil
}

// getServiceConfig is an optimized variant of [mgr.Service] that only
// retrieves the necessary information.
func (c *Collector) getServiceConfig(service *mgr.Service) (mgr.Config, error) {
var serviceConfig *windows.QUERY_SERVICE_CONFIG

bytesNeeded := uint32(1024)

buf, ok := c.serviceConfigPoolBytes.Get().(*[]byte)
if !ok || len(*buf) == 0 {
*buf = make([]byte, bytesNeeded)
}

for {
serviceConfig = (*windows.QUERY_SERVICE_CONFIG)(unsafe.Pointer(&(*buf)[0]))

err := windows.QueryServiceConfig(service.Handle, serviceConfig, bytesNeeded, &bytesNeeded)
if err == nil {
break
}

if !errors.Is(err, windows.ERROR_INSUFFICIENT_BUFFER) {
return mgr.Config{}, err
}

if bytesNeeded <= uint32(len(*buf)) {
return mgr.Config{}, err
}

*buf = make([]byte, bytesNeeded)
}

c.serviceConfigPoolBytes.Put(buf)

return mgr.Config{
BinaryPathName: windows.UTF16PtrToString(serviceConfig.BinaryPathName),
DisplayName: windows.UTF16PtrToString(serviceConfig.DisplayName),
StartType: serviceConfig.StartType,
ServiceStartName: windows.UTF16PtrToString(serviceConfig.ServiceStartName),
}, nil
}

0 comments on commit 9f29fc8

Please sign in to comment.