Skip to content

Commit

Permalink
Refactor to add subchecks (#19)
Browse files Browse the repository at this point in the history
* Refactor to add subchecks
* Linter and style fixes


---------

Co-authored-by: Lorenz Kästle <[email protected]>
  • Loading branch information
martialblog and RincewindsHat authored Jun 11, 2024
1 parent 84ee0fc commit c1168b7
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 94 deletions.
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ disable=fixme,
too-many-locals,
too-many-statements,
too-many-branches,
too-many-return-statements,
bare-except,
missing-module-docstring,
missing-function-docstring,
Expand Down
31 changes: 20 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,34 @@ The plugin requires at least Python 3.
## Usage

```
check_brevisone [-h] -H HOSTNAME [-T TIMEOUT] [-Q QUEUE] [-F FAIL]
[--signal-warning SIGNAL_WARNING]
[--signal-critical SIGNAL_CRITICAL] [--ssl-insecure]
[--protocol PROTOCOL]
usage: check_brevisone.py [-h] [-V] -H HOSTNAME [-T TIMEOUT] [--ssl-insecure] [--protocol {http,https}] [-d] [--queue-warning QUEUE_WARNING]
[--queue-critical QUEUE_CRITICAL] [--failed-warning FAILED_WARNING] [--failed-critical FAILED_CRITICAL]
[--signal-warning SIGNAL_WARNING] [--signal-critical SIGNAL_CRITICAL] [--disk-warning DISK_WARNING] [--disk-critical DISK_CRITICAL]
```

## Example

```
check_brevisone -H 192.168.1.1 --signal-warning -85 --signal-critical -90
OK - que: 0 failed: 0 signal: -83db total: 0 state: Idle load: 0;0.03;0.05 time: 1451320254 disk free: 647569408 uptime: 9 min, 0 users
check_brevisone -H 192.168.1.1
[CRITICAL] - Brevis.One SMS Gateway Status
\_[CRITICAL] Failed sending: 12
\_[OK] Signal strength: 95
\_[CRITICAL] Que length: 23
|que=23 failed=12 signal=95 total=885 time=1713865490 disk=1400246272
check_brevisone -H 192.168.1.1 --protocol http --failed-critical 18 --failed-warning 15 --signal-warning 100 --signal-critical 120
[CRITICAL] - Brevis.One SMS Gateway Status
\_[OK] Failed sending: 12
\_[OK] Signal strength: 95
\_[CRITICAL] Que length: 23
|que=23 failed=12 signal=95 total=885 time=1713865490 disk=1400246272
```

## Advanced

Since firmware version 4.0 HTTPS is the default. To connect to a unencrypted HTTP endpoint you can use ```--protocol=http```.
Since firmware version 4.0 HTTPS is the default. To connect to a unencrypted HTTP endpoint you can use `--protocol=http`.
If you are using a self-certified certificate, use `--ssl-insecure` to disable verification.

I you are using a self-certified certificate, use ```--ssl-insecure``` to disable verification.
`--disk-warning` and `--disk-critical` don't have defaults, since we don't know the limit of the specific device. Each user will have to check their devices disk capacity and set an appropriate value.

# License

Expand Down
244 changes: 174 additions & 70 deletions check_brevisone
Original file line number Diff line number Diff line change
Expand Up @@ -55,53 +55,41 @@ import sys
# threshold or did not appear to be working properly
# 2 - CRITICAL - The plugin detected that either the service was not running or it was above some "critical" threshold
# 3 - UNKNOWN - Invalid command line arguments were supplied to the plugin or low-level failures
__version__ = '4.0.0-rc1'

OK = 0
WARNING = 1
CRITICAL = 2
UNKNOWN = 3

__version__ = '3.0.0'


def read_int(string):
"""
Get and sanitize integer data
"""
try:
return int(string.split(':')[1])
except ValueError:
return 0


def read_signal(string):
"""
Get and sanitize the signal data
"""
sig = ':'.join(string.split(':')[1:]).strip()
sig = sig.replace('db', '').replace('dBm', '').strip()

try:
return int(sig)
except ValueError:
return 0
STATES = {
OK: "OK",
WARNING: "WARNING",
CRITICAL: "CRITICAL",
UNKNOWN: "UNKNOWN",
}


def generate_output(status='UNKNOWN', lines=None, perfdata=None):
def generate_output(status=3, outputs=None, perfdata=None):
"""
Generate plugin data output with status and perfdata
"""

pluginoutput = str(status)
pluginoutput = '[{}] - Brevis.One SMS Gateway Status\n'.format(STATES.get(status, 'UNKNOWN'))

# All lines we received from the Endpoint
if lines:
pluginoutput += ' - ' + ' '.join(lines)
if outputs:
for line in outputs:
pluginoutput += line + '\n'

# Perfdata we explicitly extracted from the data
# The perfdata var should be a dict, we normalize the keys and
# transform everything into the expected output 'key 1': 3 > key_1=3
if perfdata:
pluginoutput += '|' + ' '.join([key.lower().replace(" ", "_").replace(",", "") + '=' + str(value) for key, value in perfdata.items()])
pluginoutput += '|'
for k, v in perfdata.items():
if (k and v) and isinstance(v, int):
pluginoutput += k.lower().replace(" ", "_").replace(",", "") + '=' + str(v) + ' '

print(pluginoutput)

Expand All @@ -110,18 +98,42 @@ def commandline(args):
"""
Parse commandline arguments.
"""
parser = ArgumentParser(description= "check_brevisone (Version: %s)" % (__version__))
parser = ArgumentParser(description="check_brevisone (Version: %s)" % (__version__))

parser.add_argument('-V', '--version', action='version', version='check_brevisone' + __version__)

parser.add_argument('-H', '--hostname', help='The host address of the SMS gateway', required=True)
parser.add_argument('-T', '--timeout', help='Seconds before connection times out (default 10)',
default=10,
type=int)
parser.add_argument('-Q', '--queue', help='The warning threshold for the amount of queued SMS (default 1)',
parser.add_argument('--ssl-insecure',
dest='insecure',
action='store_true',
default=False,
help='Allow insecure SSL connections (default False)')
parser.add_argument('--protocol',
choices=["http", "https"],
default='https',
help='HTTP protocol, use one of http or https (default https)')
parser.add_argument('-d', '--debug', action='store_true',
help='debug mode')

parser.add_argument('--queue-warning', help='The warning threshold for the amount of queued SMS (default 1)',
default=1,
type=int)
parser.add_argument('-F', '--fail', help='The critical threshold for failed SMS (default 1)', default=1, type=int)
parser.add_argument('--queue-critical', help='The critical threshold for the amount of queued SMS (default 5)',
default=5,
type=int)

parser.add_argument('--failed-warning',
help='The warning threshold for failed SMS (default 1)',
default=1,
type=int)
parser.add_argument('--failed-critical',
help='The critical threshold for failed SMS (default 5)',
default=5,
type=int)

parser.add_argument('--signal-warning',
help='The warning threshold for the minimum signal strength (in db, default -91)',
default=-91,
Expand All @@ -130,15 +142,15 @@ def commandline(args):
help='The critical threshold for the minimum signal strength (in db, default -107)',
default=-107,
type=int)
parser.add_argument('--ssl-insecure',
dest='insecure',
action='store_true',
default=False,
help='Allow insecure SSL connections (default False)')
parser.add_argument('--protocol',
choices=["http", "https"],
default='https',
help='HTTP protocol, use one of http or https (default https)')

parser.add_argument('--disk-warning',
help='The warning threshold for the disk space (in bytes)',
required=False,
type=int)
parser.add_argument('--disk-critical',
help='The critical threshold for the disk space (in bytes)',
required=False,
type=int)

return parser.parse_args(args)

Expand All @@ -152,7 +164,7 @@ def get_data(base_url, timeout, insecure):
HTTP/1.0 200 OK
que: foo
failed: 0
signal_strength: 15 db
signal: 15 db
total: 25
"""

Expand All @@ -164,7 +176,7 @@ def get_data(base_url, timeout, insecure):

# Example URL: https://mybrevisone/check.php
url = urljoin(base_url, "check.php")
response = urllib.request.urlopen(url=url, timeout=timeout, context=ctx) # pylint: disable=consider-using-with
response = urllib.request.urlopen(url=url, timeout=timeout, context=ctx) # pylint: disable=consider-using-with

if response.getcode() >= 400:
raise RuntimeError("Could not get response")
Expand All @@ -177,61 +189,153 @@ def get_data(base_url, timeout, insecure):

return resp

def extract_perfdata(lines):

def parse_data(data):
"""
Safely extract perfdata
Safely extract data from the APIs reponse
"""
if len(lines) < 4:
return None
lines = [str(i).strip() for i in data.split("\n") if i]

parsed_data = {}

for line in lines:
d = line.split(":")
if len(d) == 2:
key = d[0].strip()
value = d[1].strip()

# Remove the db string from the signal value
if key == "signal":
value = value.replace('db', '').replace('dBm', '').strip()

# Parse integer value to be actual integers
if value.lstrip('-').isdigit():
value = int(value)

parsed_data[key] = value

perfdata = {
'que': read_int(lines[0]),
'failed': read_int(lines[1]),
'signal': read_signal(lines[2]),
'total': read_int(lines[3])
}
return parsed_data


def worst_state(*states):
overall = -1

for state in states:
if state == CRITICAL:
overall = CRITICAL
elif state == UNKNOWN:
if overall != CRITICAL:
overall = UNKNOWN
elif state > overall:
overall = state

if overall < 0 or overall > 3:
overall = UNKNOWN

return overall


def debug_print(debug_flag, message):
"""
Print debug messages if -d is set.
"""
if not debug_flag:
return

print(message)


def determine_status(args, perfdata):
states = []
outputs = []

if perfdata['failed'] >= args.failed_critical:
outputs.append(" \\_[CRITICAL] Failed sending: {}".format(perfdata['failed']))
states.append(CRITICAL)
elif perfdata['failed'] >= args.failed_warning:
outputs.append(" \\_[WARNING] Failed sending: {}".format(perfdata['failed']))
states.append(WARNING)
else:
outputs.append(" \\_[OK] Failed sending: {}".format(perfdata['failed']))
states.append(OK)

if perfdata['signal'] <= args.signal_critical:
outputs.append(" \\_[CRITICAL] Signal strength: {}".format(perfdata['signal']))
states.append(CRITICAL)
elif perfdata['signal'] <= args.signal_warning:
outputs.append(" \\_[WARNING] Signal strength: {}".format(perfdata['signal']))
states.append(WARNING)
else:
outputs.append(" \\_[OK] Signal strength: {}".format(perfdata['signal']))
states.append(OK)

if perfdata['que'] >= args.queue_critical:
outputs.append(" \\_[CRITICAL] Queue length: {}".format(perfdata['que']))
states.append(CRITICAL)
elif perfdata['que'] >= args.queue_warning:
outputs.append(" \\_[WARNING] Queue length: {}".format(perfdata['que']))
states.append(WARNING)
else:
outputs.append(" \\_[OK] Queue length: {}".format(perfdata['que']))
states.append(OK)

if args.disk_critical and perfdata['disk'] >= args.disk_critical:
outputs.append(" \\_[CRITICAL] Disk usage: {}".format(perfdata['disk']))
states.append(CRITICAL)
elif args.disk_warning and perfdata['disk'] >= args.disk_warning:
outputs.append(" \\_[WARNING] Disk usage: {}".format(perfdata['disk']))
states.append(WARNING)
elif args.disk_warning or args.disk_critical:
outputs.append(" \\_[OK] Disk usage: {}".format(perfdata['disk']))
states.append(OK)

return states, outputs

return perfdata

def main(args):
try:
base_url = args.protocol + '://' + args.hostname
data = get_data(base_url=base_url,
timeout=args.timeout,
insecure=args.insecure)
except Exception as data_exc: # pylint: disable=broad-except
except Exception as data_exc: # pylint: disable=broad-except
print('UNKNOWN - Could not connect to SMS Gateway', data_exc)
return UNKNOWN

# Split up lines and clean up items
lines = [str(i).strip() for i in data.split("\n") if i]
# Safely extract perfdata, which we'll also use to determine the final status
perfdata = extract_perfdata(lines)
perfdata = parse_data(data)

if not perfdata:
print('UNKNOWN - Could not determine status', perfdata)
return UNKNOWN

if 'failed' not in perfdata:
print('UNKNOWN - Could not determine failed status', perfdata)
return UNKNOWN

if 'que' not in perfdata:
print('UNKNOWN - Could not determine que status', perfdata)
return UNKNOWN

# Determine the final status
if perfdata['failed'] >= args.fail or perfdata['signal'] <= args.signal_critical:
generate_output(status='CRITICAL', lines=lines, perfdata=perfdata)
return CRITICAL
states, outputs = determine_status(args, perfdata)

if perfdata['que'] >= args.queue or perfdata['signal'] <= args.signal_warning:
generate_output(status='WARNING', lines=lines, perfdata=perfdata)
return WARNING
debug_print(args.debug, "API Response:\n{}".format(data))

generate_output(status='OK', lines=lines, perfdata=perfdata)
return OK
generate_output(worst_state(*states), outputs=outputs, perfdata=perfdata)
return worst_state(*states)


if __name__ == '__main__': # pragma: no cover
if __name__ == '__main__': # pragma: no cover
try:
ARGS = commandline(sys.argv[1:])
sys.exit(main(ARGS))
except SystemExit:
# Re-throw the exception
raise sys.exc_info()[1].with_traceback(sys.exc_info()[2]) # pylint: disable=raise-missing-from
except:
print("UNKNOWN - Error: %s" % (str(sys.exc_info()[1])))
exception = sys.exc_info()[1]

if exception is not None:
raise exception.with_traceback(sys.exc_info()[2]) # pylint: disable=raise-missing-from
except Exception as excpt: # pylint: disable=broad-except
print("UNKNOWN - Error: %s" % (excpt))
sys.exit(3)
Loading

0 comments on commit c1168b7

Please sign in to comment.