diff --git a/.pylintrc b/.pylintrc index 6ec578a..e6da2dc 100644 --- a/.pylintrc +++ b/.pylintrc @@ -7,6 +7,7 @@ disable=fixme, too-many-locals, too-many-statements, too-many-branches, + too-many-return-statements, bare-except, missing-module-docstring, missing-function-docstring, diff --git a/README.md b/README.md index 0e97751..6d900ae 100644 --- a/README.md +++ b/README.md @@ -12,25 +12,34 @@ The plugin requires at least Python 3. ## Usage ``` -check_brevisone [-h] -H HOSTNAME [-T TIMEOUT] [-Q QUEUE] [-F FAIL] - [--signal-warning SIGNAL_WARNING] - [--signal-critical SIGNAL_CRITICAL] [--ssl-insecure] - [--protocol PROTOCOL] +usage: check_brevisone.py [-h] [-V] -H HOSTNAME [-T TIMEOUT] [--ssl-insecure] [--protocol {http,https}] [-d] [--queue-warning QUEUE_WARNING] + [--queue-critical QUEUE_CRITICAL] [--failed-warning FAILED_WARNING] [--failed-critical FAILED_CRITICAL] + [--signal-warning SIGNAL_WARNING] [--signal-critical SIGNAL_CRITICAL] [--disk-warning DISK_WARNING] [--disk-critical DISK_CRITICAL] + ``` ## Example ``` -check_brevisone -H 192.168.1.1 --signal-warning -85 --signal-critical -90 - -OK - que: 0 failed: 0 signal: -83db total: 0 state: Idle load: 0;0.03;0.05 time: 1451320254 disk free: 647569408 uptime: 9 min, 0 users +check_brevisone -H 192.168.1.1 +[CRITICAL] - Brevis.One SMS Gateway Status + \_[CRITICAL] Failed sending: 12 + \_[OK] Signal strength: 95 + \_[CRITICAL] Que length: 23 +|que=23 failed=12 signal=95 total=885 time=1713865490 disk=1400246272 + +check_brevisone -H 192.168.1.1 --protocol http --failed-critical 18 --failed-warning 15 --signal-warning 100 --signal-critical 120 +[CRITICAL] - Brevis.One SMS Gateway Status + \_[OK] Failed sending: 12 + \_[OK] Signal strength: 95 + \_[CRITICAL] Que length: 23 +|que=23 failed=12 signal=95 total=885 time=1713865490 disk=1400246272 ``` -## Advanced - -Since firmware version 4.0 HTTPS is the default. To connect to a unencrypted HTTP endpoint you can use ```--protocol=http```. +Since firmware version 4.0 HTTPS is the default. To connect to a unencrypted HTTP endpoint you can use `--protocol=http`. +If you are using a self-certified certificate, use `--ssl-insecure` to disable verification. -I you are using a self-certified certificate, use ```--ssl-insecure``` to disable verification. +`--disk-warning` and `--disk-critical` don't have defaults, since we don't know the limit of the specific device. Each user will have to check their devices disk capacity and set an appropriate value. # License diff --git a/check_brevisone b/check_brevisone index ac19006..62085fd 100755 --- a/check_brevisone +++ b/check_brevisone @@ -55,53 +55,41 @@ import sys # threshold or did not appear to be working properly # 2 - CRITICAL - The plugin detected that either the service was not running or it was above some "critical" threshold # 3 - UNKNOWN - Invalid command line arguments were supplied to the plugin or low-level failures +__version__ = '4.0.0-rc1' + OK = 0 WARNING = 1 CRITICAL = 2 UNKNOWN = 3 -__version__ = '3.0.0' - - -def read_int(string): - """ - Get and sanitize integer data - """ - try: - return int(string.split(':')[1]) - except ValueError: - return 0 - - -def read_signal(string): - """ - Get and sanitize the signal data - """ - sig = ':'.join(string.split(':')[1:]).strip() - sig = sig.replace('db', '').replace('dBm', '').strip() - - try: - return int(sig) - except ValueError: - return 0 +STATES = { + OK: "OK", + WARNING: "WARNING", + CRITICAL: "CRITICAL", + UNKNOWN: "UNKNOWN", +} -def generate_output(status='UNKNOWN', lines=None, perfdata=None): +def generate_output(status=3, outputs=None, perfdata=None): """ Generate plugin data output with status and perfdata """ - pluginoutput = str(status) + pluginoutput = '[{}] - Brevis.One SMS Gateway Status\n'.format(STATES.get(status, 'UNKNOWN')) # All lines we received from the Endpoint - if lines: - pluginoutput += ' - ' + ' '.join(lines) + if outputs: + for line in outputs: + pluginoutput += line + '\n' # Perfdata we explicitly extracted from the data # The perfdata var should be a dict, we normalize the keys and # transform everything into the expected output 'key 1': 3 > key_1=3 if perfdata: - pluginoutput += '|' + ' '.join([key.lower().replace(" ", "_").replace(",", "") + '=' + str(value) for key, value in perfdata.items()]) + pluginoutput += '|' + for k, v in perfdata.items(): + if (k and v) and isinstance(v, int): + pluginoutput += k.lower().replace(" ", "_").replace(",", "") + '=' + str(v) + ' ' print(pluginoutput) @@ -110,7 +98,7 @@ def commandline(args): """ Parse commandline arguments. """ - parser = ArgumentParser(description= "check_brevisone (Version: %s)" % (__version__)) + parser = ArgumentParser(description="check_brevisone (Version: %s)" % (__version__)) parser.add_argument('-V', '--version', action='version', version='check_brevisone' + __version__) @@ -118,10 +106,34 @@ def commandline(args): parser.add_argument('-T', '--timeout', help='Seconds before connection times out (default 10)', default=10, type=int) - parser.add_argument('-Q', '--queue', help='The warning threshold for the amount of queued SMS (default 1)', + parser.add_argument('--ssl-insecure', + dest='insecure', + action='store_true', + default=False, + help='Allow insecure SSL connections (default False)') + parser.add_argument('--protocol', + choices=["http", "https"], + default='https', + help='HTTP protocol, use one of http or https (default https)') + parser.add_argument('-d', '--debug', action='store_true', + help='debug mode') + + parser.add_argument('--queue-warning', help='The warning threshold for the amount of queued SMS (default 1)', default=1, type=int) - parser.add_argument('-F', '--fail', help='The critical threshold for failed SMS (default 1)', default=1, type=int) + parser.add_argument('--queue-critical', help='The critical threshold for the amount of queued SMS (default 5)', + default=5, + type=int) + + parser.add_argument('--failed-warning', + help='The warning threshold for failed SMS (default 1)', + default=1, + type=int) + parser.add_argument('--failed-critical', + help='The critical threshold for failed SMS (default 5)', + default=5, + type=int) + parser.add_argument('--signal-warning', help='The warning threshold for the minimum signal strength (in db, default -91)', default=-91, @@ -130,15 +142,15 @@ def commandline(args): help='The critical threshold for the minimum signal strength (in db, default -107)', default=-107, type=int) - parser.add_argument('--ssl-insecure', - dest='insecure', - action='store_true', - default=False, - help='Allow insecure SSL connections (default False)') - parser.add_argument('--protocol', - choices=["http", "https"], - default='https', - help='HTTP protocol, use one of http or https (default https)') + + parser.add_argument('--disk-warning', + help='The warning threshold for the disk space (in bytes)', + required=False, + type=int) + parser.add_argument('--disk-critical', + help='The critical threshold for the disk space (in bytes)', + required=False, + type=int) return parser.parse_args(args) @@ -152,7 +164,7 @@ def get_data(base_url, timeout, insecure): HTTP/1.0 200 OK que: foo failed: 0 - signal_strength: 15 db + signal: 15 db total: 25 """ @@ -164,7 +176,7 @@ def get_data(base_url, timeout, insecure): # Example URL: https://mybrevisone/check.php url = urljoin(base_url, "check.php") - response = urllib.request.urlopen(url=url, timeout=timeout, context=ctx) # pylint: disable=consider-using-with + response = urllib.request.urlopen(url=url, timeout=timeout, context=ctx) # pylint: disable=consider-using-with if response.getcode() >= 400: raise RuntimeError("Could not get response") @@ -177,21 +189,108 @@ def get_data(base_url, timeout, insecure): return resp -def extract_perfdata(lines): + +def parse_data(data): """ - Safely extract perfdata + Safely extract data from the APIs reponse """ - if len(lines) < 4: - return None + lines = [str(i).strip() for i in data.split("\n") if i] + + parsed_data = {} + + for line in lines: + d = line.split(":") + if len(d) == 2: + key = d[0].strip() + value = d[1].strip() + + # Remove the db string from the signal value + if key == "signal": + value = value.replace('db', '').replace('dBm', '').strip() + + # Parse integer value to be actual integers + if value.lstrip('-').isdigit(): + value = int(value) + + parsed_data[key] = value - perfdata = { - 'que': read_int(lines[0]), - 'failed': read_int(lines[1]), - 'signal': read_signal(lines[2]), - 'total': read_int(lines[3]) - } + return parsed_data + + +def worst_state(*states): + overall = -1 + + for state in states: + if state == CRITICAL: + overall = CRITICAL + elif state == UNKNOWN: + if overall != CRITICAL: + overall = UNKNOWN + elif state > overall: + overall = state + + if overall < 0 or overall > 3: + overall = UNKNOWN + + return overall + + +def debug_print(debug_flag, message): + """ + Print debug messages if -d is set. + """ + if not debug_flag: + return + + print(message) + + +def determine_status(args, perfdata): + states = [] + outputs = [] + + if perfdata['failed'] >= args.failed_critical: + outputs.append(" \\_[CRITICAL] Failed sending: {}".format(perfdata['failed'])) + states.append(CRITICAL) + elif perfdata['failed'] >= args.failed_warning: + outputs.append(" \\_[WARNING] Failed sending: {}".format(perfdata['failed'])) + states.append(WARNING) + else: + outputs.append(" \\_[OK] Failed sending: {}".format(perfdata['failed'])) + states.append(OK) + + if perfdata['signal'] <= args.signal_critical: + outputs.append(" \\_[CRITICAL] Signal strength: {}".format(perfdata['signal'])) + states.append(CRITICAL) + elif perfdata['signal'] <= args.signal_warning: + outputs.append(" \\_[WARNING] Signal strength: {}".format(perfdata['signal'])) + states.append(WARNING) + else: + outputs.append(" \\_[OK] Signal strength: {}".format(perfdata['signal'])) + states.append(OK) + + if perfdata['que'] >= args.queue_critical: + outputs.append(" \\_[CRITICAL] Queue length: {}".format(perfdata['que'])) + states.append(CRITICAL) + elif perfdata['que'] >= args.queue_warning: + outputs.append(" \\_[WARNING] Queue length: {}".format(perfdata['que'])) + states.append(WARNING) + else: + outputs.append(" \\_[OK] Queue length: {}".format(perfdata['que'])) + states.append(OK) + + if args.disk_critical and perfdata['disk'] >= args.disk_critical: + outputs.append(" \\_[CRITICAL] Disk usage: {}".format(perfdata['disk'])) + states.append(CRITICAL) + elif args.disk_warning and perfdata['disk'] >= args.disk_warning: + outputs.append(" \\_[WARNING] Disk usage: {}".format(perfdata['disk'])) + states.append(WARNING) + elif args.disk_warning or args.disk_critical: + outputs.append(" \\_[OK] Disk usage: {}".format(perfdata['disk'])) + states.append(OK) + + return states, outputs - return perfdata def main(args): try: @@ -199,39 +298,44 @@ def main(args): data = get_data(base_url=base_url, timeout=args.timeout, insecure=args.insecure) - except Exception as data_exc: # pylint: disable=broad-except + except Exception as data_exc: # pylint: disable=broad-except print('UNKNOWN - Could not connect to SMS Gateway', data_exc) return UNKNOWN - # Split up lines and clean up items - lines = [str(i).strip() for i in data.split("\n") if i] # Safely extract perfdata, which we'll also use to determine the final status - perfdata = extract_perfdata(lines) + perfdata = parse_data(data) if not perfdata: print('UNKNOWN - Could not determine status', perfdata) return UNKNOWN + if 'failed' not in perfdata: + print('UNKNOWN - Could not determine failed status', perfdata) + return UNKNOWN + + if 'que' not in perfdata: + print('UNKNOWN - Could not determine que status', perfdata) + return UNKNOWN + # Determine the final status - if perfdata['failed'] >= args.fail or perfdata['signal'] <= args.signal_critical: - generate_output(status='CRITICAL', lines=lines, perfdata=perfdata) - return CRITICAL + states, outputs = determine_status(args, perfdata) - if perfdata['que'] >= args.queue or perfdata['signal'] <= args.signal_warning: - generate_output(status='WARNING', lines=lines, perfdata=perfdata) - return WARNING + debug_print(args.debug, "API Response:\n{}".format(data)) - generate_output(status='OK', lines=lines, perfdata=perfdata) - return OK + generate_output(worst_state(*states), outputs=outputs, perfdata=perfdata) + return worst_state(*states) -if __name__ == '__main__': # pragma: no cover +if __name__ == '__main__': # pragma: no cover try: ARGS = commandline(sys.argv[1:]) sys.exit(main(ARGS)) except SystemExit: # Re-throw the exception - raise sys.exc_info()[1].with_traceback(sys.exc_info()[2]) # pylint: disable=raise-missing-from - except: - print("UNKNOWN - Error: %s" % (str(sys.exc_info()[1]))) + exception = sys.exc_info()[1] + + if exception is not None: + raise exception.with_traceback(sys.exc_info()[2]) # pylint: disable=raise-missing-from + except Exception as excpt: # pylint: disable=broad-except + print("UNKNOWN - Error: %s" % (excpt)) sys.exit(3) diff --git a/test_check_brevisone.py b/test_check_brevisone.py index faa90cd..4e974e9 100644 --- a/test_check_brevisone.py +++ b/test_check_brevisone.py @@ -8,21 +8,70 @@ from check_brevisone import commandline from check_brevisone import generate_output +from check_brevisone import determine_status from check_brevisone import get_data +from check_brevisone import worst_state from check_brevisone import main class UtilTesting(unittest.TestCase): @mock.patch('builtins.print') def test_output(self, mock_print): - generate_output(status='CRITICAL') - mock_print.assert_called_with("CRITICAL") + generate_output(status=2) + mock_print.assert_called_with("[CRITICAL] - Brevis.One SMS Gateway Status\n") - generate_output(status='OK', lines=['1: 2', '3: 4'], perfdata={'1': '2', '3': 4}) - mock_print.assert_called_with("OK - 1: 2 3: 4|1=2 3=4") + generate_output(status=0, perfdata={'1': '2', 'foo': 4}) + mock_print.assert_called_with("[OK] - Brevis.One SMS Gateway Status\n|foo=4 ") - generate_output(status='CRITICAL', lines=['que: foo', 'foo bar: 1'], perfdata={'que': 'foo', 'foo bar': 1}) - mock_print.assert_called_with("CRITICAL - que: foo foo bar: 1|que=foo foo_bar=1") + generate_output(status=3, perfdata={'que': '0', 'foo bar': 1}) + mock_print.assert_called_with("[UNKNOWN] - Brevis.One SMS Gateway Status\n|foo_bar=1 ") + + def test_determine_status(self): + args = commandline(['-H', 'localhost', '--disk-warning', '1500', '--disk-critical', '3000' ]) + data = {"que": 5,"failed": 1, "signal": 0, 'disk': 1400} + + s, o = determine_status(args, data) + + self.assertIn(' \\_[WARNING] Failed sending: 1', o) + self.assertIn(' \\_[CRITICAL] Queue length: 5', o) + self.assertIn(' \\_[OK] Signal strength: 0', o) + self.assertIn(' \\_[OK] Disk usage: 1400', o) + + data = {"que": 0,"failed": 0, "signal": -900, 'disk': 2000} + + s, o = determine_status(args, data) + + self.assertIn(' \\_[OK] Failed sending: 0', o) + self.assertIn(' \\_[OK] Queue length: 0', o) + self.assertIn(' \\_[CRITICAL] Signal strength: -900', o) + self.assertIn(' \\_[WARNING] Disk usage: 2000', o) + + data = {"que": 0,"failed": 0, "signal": -900, 'disk': 4000} + + s, o = determine_status(args, data) + + self.assertIn(' \\_[OK] Failed sending: 0', o) + self.assertIn(' \\_[OK] Queue length: 0', o) + self.assertIn(' \\_[CRITICAL] Signal strength: -900', o) + self.assertIn(' \\_[CRITICAL] Disk usage: 4000', o) + + def test_worst_state(self): + + actual = worst_state() + expected = 3 + self.assertEqual(actual, expected) + + actual = worst_state(0,1,2) + expected = 2 + self.assertEqual(actual, expected) + + actual = worst_state(1,2,3,4) + expected = 3 + self.assertEqual(actual, expected) + + actual = worst_state(0,0,0,0) + expected = 0 + self.assertEqual(actual, expected) class CLITesting(unittest.TestCase): @@ -31,7 +80,7 @@ def test_commandline(self): self.assertEqual(actual.hostname, 'localhost') self.assertEqual(actual.protocol, 'https') self.assertFalse(actual.insecure) - self.assertEqual(actual.fail, 1) + self.assertEqual(actual.failed_warning, 1) class URLTesting(unittest.TestCase): @@ -85,12 +134,31 @@ def test_main_unknown(self, mock_data): actual = main(args) self.assertEqual(actual, 3) + @mock.patch('check_brevisone.get_data') + def test_main_data_missing(self, mock_data): + d = """ + + """ + mock_data.return_value = d + + args = commandline(['-H', 'localhost']) + actual = main(args) + self.assertEqual(actual, 3) + + d = """ + failed: 0 + """ + mock_data.return_value = d + + actual = main(args) + self.assertEqual(actual, 3) + @mock.patch('check_brevisone.get_data') def test_main_ok(self, mock_data): d = """ - que: foo + que: 0 failed: 0 - signal_strength: 15 db + signal: 15 db total: 25 """ mock_data.return_value = d @@ -102,9 +170,9 @@ def test_main_ok(self, mock_data): @mock.patch('check_brevisone.get_data') def test_main_warn(self, mock_data): d = """ - que: foo + que: 1 failed: 0 - signal_strength: -91 db + signal: -91 db total: 25 """ mock_data.return_value = d @@ -116,9 +184,9 @@ def test_main_warn(self, mock_data): @mock.patch('check_brevisone.get_data') def test_main_crit(self, mock_data): d = """ - que: foo + que: 0 failed: 10 - signal_strength: -91 db + signal: -91 db total: 25 """ mock_data.return_value = d