check_openvpn/check_openvpn

324 lines
9.9 KiB
Python
Executable File

#! /usr/bin/env python
###############################################################################
# Nagios plugin check_openvpn
#
#
#
###############################################################################
# Module metadata (author contact and plugin version).
__author__ = "Andreas Billmeier"
__email__ = "b@edevau.net"
# NOTE(review): the version is stored as a float, not a string — confirm
# nothing formats or compares it as text before changing the type.
__version__ = 0.1
# from optparse import OptionParser, OptionGroup
import argparse
import logging as log
import socket
import re
import sys
# NAGIOS return codes :
# https://nagios-plugins.org/doc/guidelines.html#AEN78
# These values are passed to gtfo() and become the process exit status.
OK = 0  # initial status in main(); kept when nothing goes wrong
WARNING = 1  # used when no client matches the requested Common Name
CRITICAL = 2  # used when the management login fails
UNKNOWN = 3  # used for connection/send failures and unimplemented modes
def main():
    """Run the check against an OpenVPN management interface and exit.

    Parses the command line, connects to ``HOST:PORT``, logs on, requests
    ``status 2`` and prints one Nagios plugin line of the form
    ``<text> | <perfdata>``. This function never returns normally; every
    path ends in :func:`gtfo`, which exits the process.

    Modes (first match wins):
      * ``--cn``: report clients whose Common Name matches the given regular
        expression; WARNING when none matches.
      * ``--route`` / ``--addr``: not implemented, exits UNKNOWN.
      * default: report the number of connected clients plus per-prefix
        (``k_``, ``e_``, other) client and traffic counters.

    Exit codes: UNKNOWN when the connection cannot be established, the
    reply carries no usable data or the ``--cn`` pattern is invalid;
    CRITICAL when the login fails; otherwise the computed status.
    """
    global verbose

    args = Get_Args()
    log.debug(f"args: {args}")
    if args.v > 0:
        verbose = True

    performancedata = {}
    # The min/max perfdata fields are left empty on purpose.
    perf_min = ""
    perf_max = ""
    # assume the best
    STATUS = OK
    edevau = True

    # Create a TCP socket to OpenVPN management; the with-block closes it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.connect((args.HOST, args.PORT))
        except OSError as err:
            # connect() signals failure by raising, not by a falsy socket.
            log.debug(f"connect failed: {err}")
            gtfo(UNKNOWN, "Could not connect.")
        if not ovpn_logon(s, args.password):
            gtfo(CRITICAL, "Login failed.")
        status, clients, routing = ovpn_status2(s)
        log.warning(status)
        log.warning(clients)
        log.warning(routing)

    # Without a title and a client table there is nothing to report on.
    if "TITLE" not in status or "Common Name" not in clients:
        gtfo(UNKNOWN, "No usable status data received.")

    # now we have all the data
    ml_end = " " if args.nomultiline else "\n"
    message = f"{status['TITLE']}{ml_end}"
    names = clients["Common Name"]

    if args.cn:
        # search for a client
        try:
            pattern = re.compile(args.cn)
        except re.error as err:
            gtfo(UNKNOWN, f"Invalid Common Name pattern: {err}")
        performancedata["clients"] = 0
        performancedata["in"] = 0
        performancedata["out"] = 0
        for i, cn in enumerate(names):
            if not pattern.search(cn):
                continue
            message += f"Client {cn} connected since: {clients['Connected Since'][i]}"
            performancedata["clients"] += 1
            # Accumulate traffic so several matching clients are all counted.
            performancedata["in"] += int(clients["Bytes Received"][i])
            performancedata["out"] += int(clients["Bytes Sent"][i])
            # With several matches the last one wins here.
            performancedata["connectedsince"] = clients["Connected Since (time_t)"][i]
        if performancedata["clients"] == 0:
            message += f"No Clients found matching {args.cn}."
            STATUS = WARNING
    elif args.route:
        # search for a specified route
        gtfo(UNKNOWN, "Not implemented yet.")
    elif args.addr:
        # search for a specified client address
        gtfo(UNKNOWN, "Not implemented yet.")
    else:
        # default: report Clientcount
        clientcount = len(names)
        message += f"Clients connected: {clientcount}"
        if not args.nodetails:
            message += f" {names}"
        performancedata["clients"] = clientcount
        # special edevau counts
        # NOTE(review): assumed to belong to the default mode only — confirm.
        if edevau:
            # Key order defines the perfdata order, so create counters first.
            for prefix in ("k", "e", "o"):
                performancedata[f"{prefix}_clients"] = 0
            for prefix in ("k", "e", "o"):
                performancedata[f"{prefix}_in"] = 0
                performancedata[f"{prefix}_out"] = 0
            for cn, received, sent in zip(
                names, clients["Bytes Received"], clients["Bytes Sent"]
            ):
                if cn.startswith("k_"):
                    prefix = "k"
                elif cn.startswith("e_"):
                    prefix = "e"
                else:
                    prefix = "o"
                performancedata[f"{prefix}_clients"] += 1
                performancedata[f"{prefix}_in"] += int(received)
                performancedata[f"{prefix}_out"] += int(sent)

    # add performancedata
    # 'label'=value[UOM];[warn];[crit];[min];[max]
    # https://nagios-plugins.org/doc/guidelines.html#PLUGOUTPUT
    perf_fields = []
    for key, value in performancedata.items():
        if key.endswith("clients"):
            # Only client counters carry the warn/crit thresholds.
            perf_fields.append(
                f" {key}={value};{args.warn};{args.crit};{perf_min};{perf_max}"
            )
        else:
            perf_fields.append(f" {key}={value};;;;")
    message += " |" + "".join(perf_fields)

    # go Home
    gtfo(STATUS, message)
def ovpn_status2(s):
    """Request ``status 2`` from the management interface and parse the reply.

    The reply is read in chunks but parsed line by line, so lines that span
    two ``recv`` calls are reassembled before parsing. Reading stops at the
    first complete line without a comma (the ``END`` marker) or when the
    peer closes the connection. Lines starting with ``>`` are skipped.

    Args:
        s: Connected socket to the management interface (already logged on).

    Returns:
        A tuple ``(status, clients, routing)``:
          * ``status`` maps ``TITLE``/``TIME``/``GLOBAL_STATS`` to the raw
            payload string of that line.
          * ``clients`` and ``routing`` map each column name announced by the
            matching ``HEADER`` line to a list with one value per row.

    Exits via ``gtfo(UNKNOWN, ...)`` when the request cannot be sent.
    """
    try:
        s.sendall(b"status 2\r\n")
    except socket.error:
        gtfo(UNKNOWN, "Send failed")

    status = {}
    tables = {"CLIENT_LIST": {}, "ROUTING_TABLE": {}}
    columns = {}  # table name -> column names from its HEADER line
    pending = b""  # bytes of a line whose terminating newline has not arrived
    done = False
    while not done:
        data = s.recv(1024)
        if not data:
            break
        pending += data
        # Everything before the last newline is complete; keep the rest.
        *complete, pending = pending.split(b"\n")
        for raw in complete:
            line = raw.decode(errors="replace").rstrip("\r")
            log.debug(f"line...:{line}")
            if line.startswith(">"):
                # Notification line, not part of the status reply.
                continue
            if "," not in line:
                # no comma found: end of the status output
                done = True
                break
            token, payload = line.split(",", 1)
            log.debug(f"{token} {payload}")
            if token in ("TITLE", "GLOBAL_STATS", "TIME"):
                status[token] = payload
            elif token == "HEADER":
                header, _, hcontent = payload.partition(",")
                if header in tables:
                    columns[header] = hcontent.split(",")
                    for name in columns[header]:
                        tables[header][name] = []
                else:
                    log.warning(f"unknown HEADER found: {header}")
            elif token in tables:
                names = columns.get(token)
                if names is None:
                    log.warning(f"{token} row received before its HEADER")
                    continue
                fields = payload.split(",")
                # Pad short rows so every column list keeps the same length.
                fields += [""] * (len(names) - len(fields))
                for name, value in zip(names, fields):
                    log.debug(f"{token} {name} -> {value}")
                    tables[token][name].append(value)
            elif token == "ERROR":
                log.warning(f"{token} {payload}")
            else:
                log.error(f"unknown token {token} -> {payload}")
    return status, tables["CLIENT_LIST"], tables["ROUTING_TABLE"]
def ovpn_logon(s, password):
    """Complete the logon handshake on the management interface.

    Reads from the socket until the interface reports success, reports an
    error, or closes the connection. When a password prompt arrives, the
    given password is sent once; a second prompt is treated as a rejected
    password.

    Args:
        s: Connected socket to the management interface.
        password: Password to send when prompted, or ``None``/empty.

    Returns:
        ``True`` when a ``SUCCESS`` or ``>INFO`` message was received,
        ``False`` on ``>ERROR``, a repeated password prompt or a closed
        connection.

    Exits via ``gtfo(UNKNOWN, ...)`` when a password is requested but none
    was given, or when sending the password fails.
    """
    password_sent = False
    while True:
        data = s.recv(1024)
        if not data:
            # Peer closed the connection; looping on would never terminate.
            log.debug("Connection closed during logon.")
            return False
        line = data.decode(errors="replace")
        log.debug(f"line..:{line}")
        token = line.split(":")[0]
        log.debug(f"token..:{token}")
        if token == "ENTER PASSWORD":
            if not password:
                gtfo(UNKNOWN, "Logon requested, none given.")
            if password_sent:
                # Prompted again after answering: the password was rejected.
                return False
            log.debug("Logon requested...")
            try:
                s.sendall(f"{password}\r\n".encode())
            except socket.error:
                gtfo(UNKNOWN, "Send Password failed")
            password_sent = True
        elif token in ("SUCCESS", ">INFO"):
            return True
        elif token == ">ERROR":
            return False
def save_obj(obj, name):
    """Pickle *obj* to ``obj/<name>.pkl`` relative to the working directory.

    The ``obj`` directory is created when it does not exist yet.

    Args:
        obj: Any picklable object.
        name: File name without directory and without the ``.pkl`` suffix.
    """
    # Imported locally: the module does not import these at file level.
    import os
    import pickle

    os.makedirs("obj", exist_ok=True)
    with open("obj/" + name + ".pkl", "wb") as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
def load_obj(name):
    """Load and return the object pickled in ``obj/<name>.pkl``.

    Args:
        name: File name without directory and without the ``.pkl`` suffix.

    Returns:
        The unpickled object.

    Raises:
        OSError: When the file cannot be opened.
    """
    # Imported locally: the module does not import pickle at file level.
    import pickle

    with open("obj/" + name + ".pkl", "rb") as f:
        # SECURITY: unpickling can execute arbitrary code; only load files
        # that this plugin wrote itself.
        return pickle.load(f)
def Get_Args(argv=None):
    """Parse the command line and configure the log level.

    Args:
        argv: Optional list of argument strings. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` with ``v`` (verbosity, capped at 3),
        ``HOST``, ``PORT``, ``password``, ``cn``, ``route``, ``addr``,
        ``perfdata``, ``nomultiline``, ``nodetails``, ``warn`` and ``crit``.

    Side effect: sets the root logger level from the verbosity count
    (0 -> ERROR, 1 -> WARNING, 2 -> INFO, 3 -> DEBUG).
    """
    # argparse prepends "usage: " itself, so the template must not repeat it.
    parser = argparse.ArgumentParser(
        description="check_openvpn",
        usage="%(prog)s [-v|vv|vvv] [options]",
    )
    parser.add_argument(
        "-v",
        "--verbosity",
        action="count",
        dest="v",
        default=0,
        help="increase output verbosity [-v|vv|vvv]",
    )
    parser.add_argument("-H", help="Hostname or IP address", required=True, dest="HOST")
    # Not marked required, so the declared default can actually apply.
    parser.add_argument(
        "-p",
        "--port",
        help="TCP port of management interface",
        type=int,
        dest="PORT",
        default=2195,
    )
    parser.add_argument(
        "-P", "--password", help="password for management interface", default=None
    )
    parser.add_argument(
        "-C", "--cn", help="Common Name (in certificate) to seek", default=None
    )
    parser.add_argument("-R", "--route", help="Network to seek route for", default=None)
    parser.add_argument(
        "-A", "--addr", help="IP Address of client system to seek", default=None
    )
    parser.add_argument(
        "-d",
        "--perfdata",
        help="Output performance data",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--nomultiline",
        help="Force single line output",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--nodetails",
        help="simplify output",
        action="store_true",
        default=False,
    )
    # Thresholds stay untyped: they are only echoed into the perfdata.
    parser.add_argument("-w", "--warn", help="warning limit", default=40)
    parser.add_argument("-c", "--crit", help="critical limit", default=50)

    args = parser.parse_args(argv)
    # More than three -v flags mean the same as three.
    args.v = min(args.v, 3)
    levels = [log.ERROR, log.WARNING, log.INFO, log.DEBUG]
    log.getLogger().setLevel(levels[args.v])
    log.debug(f"Parsed arguments: {args}")
    return args
def gtfo(exitcode, message=""):
    """Leave the plugin: print *message* when non-empty, then exit.

    Args:
        exitcode: Process exit status handed to ``sys.exit``.
        message: Text written to stdout before exiting; skipped when empty.
    """
    log.debug(f"Exiting with status {exitcode}. Message: {message}")
    if not message:
        sys.exit(exitcode)
    print(message)
    sys.exit(exitcode)
if __name__ == "__main__":
    # Set up logging before main() runs so that early messages are captured;
    # the level is adjusted later from the parsed verbosity.
    log.basicConfig(
        format="%(asctime)s - %(funcName)s - %(levelname)s - %(message)s",
        level=log.DEBUG,
    )
    main()