Quote harmonization: all quotes now single.

This commit is contained in:
Rudi klein 2024-05-30 17:27:17 +02:00
parent 84fc2bc1a8
commit 9c7bcfd2ee

View File

@ -22,8 +22,8 @@ from dotenv import load_dotenv
# config_path = wazuh-notify-config.toml # config_path = wazuh-notify-config.toml
def set_environment() -> tuple: def set_environment() -> tuple:
set_wazuh_path = os.path.abspath(os.path.join(__file__, "../..")) set_wazuh_path = os.path.abspath(os.path.join(__file__, '../..'))
# set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../..")) # set_wazuh_path = os.path.abspath(os.path.join(__file__, '../../..'))
set_log_path = '{0}/logs/wazuh-notify.log'.format(set_wazuh_path) set_log_path = '{0}/logs/wazuh-notify.log'.format(set_wazuh_path)
set_config_path = '{0}/etc/wazuh-notify-config.toml'.format(set_wazuh_path) set_config_path = '{0}/etc/wazuh-notify-config.toml'.format(set_wazuh_path)
@ -48,7 +48,7 @@ def logger(level: int, config: dict, me: str, him: str, message: str) -> None:
logger_wazuh_path, logger_log_path, _ = set_environment() logger_wazuh_path, logger_log_path, _ = set_environment()
# When logging from main(), the destination function is called "<module>". For cosmetic reasons rename to "main". # When logging from main(), the destination function is called '<module>'. For cosmetic reasons rename to 'main'.
him: str = 'main' if him == '<module>' else him him: str = 'main' if him == '<module>' else him
log_line: str = f'{now_logging} | {level} | {me: <27} | {him: <27} | {message}' log_line: str = f'{now_logging} | {level} | {me: <27} | {him: <27} | {message}'
@ -57,7 +57,7 @@ def logger(level: int, config: dict, me: str, him: str, message: str) -> None:
print(log_line) print(log_line)
try: try:
# Compare the extended_logging level in the configuration to the log level of the message. # Compare the extended_logging level in the configuration to the log level of the message.
with open(logger_log_path, mode="a") as log_file: with open(logger_log_path, mode='a') as log_file:
if config.get('python').get('extended_logging', 0) >= level: if config.get('python').get('extended_logging', 0) >= level:
log_file.write(log_line) log_file.write(log_line)
else: else:
@ -65,7 +65,7 @@ def logger(level: int, config: dict, me: str, him: str, message: str) -> None:
level = 'Lvl' level = 'Lvl'
him = 'Origination function' him = 'Origination function'
me = 'Executed function' me = 'Executed function'
message = "Information" message = 'Information'
log_line: str = f'\n{now_logging: <19} |{level: <3}| {me: <27} | {him: <27} | {message}' log_line: str = f'\n{now_logging: <19} |{level: <3}| {me: <27} | {him: <27} | {message}'
log_file.write(log_line + '\n') log_file.write(log_line + '\n')
@ -73,7 +73,7 @@ def logger(level: int, config: dict, me: str, him: str, message: str) -> None:
level = '---' level = '---'
him = '---------------------------' him = '---------------------------'
me = '---------------------------' me = '---------------------------'
message = "------------------------------------------------" message = '------------------------------------------------'
log_line: str = f'\n{now_logging: <19} |{level: <3}| {me: <27} | {him: <27} | {message}' log_line: str = f'\n{now_logging: <19} |{level: <3}| {me: <27} | {him: <27} | {message}'
log_file.write(log_line + '\n') log_file.write(log_line + '\n')
except (FileNotFoundError, PermissionError, OSError): except (FileNotFoundError, PermissionError, OSError):
@ -90,7 +90,7 @@ def get_env() -> tuple:
# Write the configuration to a dictionary. # Write the configuration to a dictionary.
config: dict = get_config() config: dict = get_config()
logger(level=2, config=config, me=me, him=him, message="Configuration retrieved to dictionary") logger(level=2, config=config, me=me, him=him, message='Configuration retrieved to dictionary')
# Check if the secrets .env file is available. # Check if the secrets .env file is available.
try: try:
@ -98,15 +98,15 @@ def get_env() -> tuple:
load_dotenv(dotenv_path) load_dotenv(dotenv_path)
if not os.path.isfile(dotenv_path): if not os.path.isfile(dotenv_path):
logger(level=0, config=config, me=me, him=him, message=f'%s not found' % dotenv_path) logger(level=0, config=config, me=me, him=him, message=f'%s not found' % dotenv_path)
raise Exception(dotenv_path, "file not found") raise Exception(dotenv_path, 'file not found')
# Retrieve URLs from .env # Retrieve URLs from .env
discord_url = os.getenv("DISCORD_URL") discord_url = os.getenv('DISCORD_URL')
logger(level=2, config=config, me=me, him=him, message=f"DISCORD_URL: %s" % discord_url) logger(level=2, config=config, me=me, him=him, message=f'DISCORD_URL: %s' % discord_url)
ntfy_url = os.getenv("NTFY_URL") ntfy_url = os.getenv('NTFY_URL')
logger(level=2, config=config, me=me, him=him, message=f"NTFY_URL: %s" % ntfy_url) logger(level=2, config=config, me=me, him=him, message=f'NTFY_URL: %s' % ntfy_url)
slack_url = os.getenv("SLACK_URL") slack_url = os.getenv('SLACK_URL')
logger(level=2, config=config, me=me, him=him, message=f"SLACK_URL: %s" % slack_url) logger(level=2, config=config, me=me, him=him, message=f'SLACK_URL: %s' % slack_url)
except Exception as err: except Exception as err:
# output error, and return with an error code # output error, and return with an error code
@ -121,7 +121,7 @@ def get_env() -> tuple:
def get_config() -> dict: def get_config() -> dict:
me: str = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
this_config_path: str = "" this_config_path: str = ''
config: dict = {} config: dict = {}
try: try:
@ -129,8 +129,8 @@ def get_config() -> dict:
with open(this_config_path, 'rb') as ntfier_config: with open(this_config_path, 'rb') as ntfier_config:
config: dict = tomli.load(ntfier_config) config: dict = tomli.load(ntfier_config)
except (FileNotFoundError, PermissionError, OSError): except (FileNotFoundError, PermissionError, OSError):
logger(level=2, config=config, me=me, him=him, message=f"Error accessing configuration: %s" % this_config_path) logger(level=2, config=config, me=me, him=him, message=f'Error accessing configuration: %s' % this_config_path)
logger(level=2, config=config, me=me, him=him, message=f"Reading TOML configuration file: %s" % this_config_path) logger(level=2, config=config, me=me, him=him, message=f'Reading TOML configuration file: %s' % this_config_path)
config['targets']: str = config.get('general').get('targets', 'discord, slack, ntfy') config['targets']: str = config.get('general').get('targets', 'discord, slack, ntfy')
config['full_alert']: bool = config.get('general').get('full_alert', False) config['full_alert']: bool = config.get('general').get('full_alert', False)
@ -157,7 +157,7 @@ def view_config() -> None:
with open(this_config_path, 'r') as ntfier_config: with open(this_config_path, 'r') as ntfier_config:
print(ntfier_config.read()) print(ntfier_config.read())
except (FileNotFoundError, PermissionError, OSError): except (FileNotFoundError, PermissionError, OSError):
print(f"%s does not exist or is not accessible" % this_config_path) print(f'%s does not exist or is not accessible' % this_config_path)
return return
@ -167,48 +167,48 @@ def get_arguments() -> dict:
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
config: dict = get_config() config: dict = get_config()
logger(level=2, config=config, me=me, him=him, message="Configuration retrieved to dictionary") logger(level=2, config=config, me=me, him=him, message='Configuration retrieved to dictionary')
# Short options # Short options
options: str = "u:s:p:m:t:c:hv" options: str = 'u:s:p:m:t:c:hv'
# Long options # Long options
long_options: list = ["url=", long_options: list = ['url=',
"sender=", 'sender=',
"targets=", 'targets=',
"priority=", 'priority=',
"message=", 'message=',
"tags=", 'tags=',
"click=", 'click=',
"help", 'help',
"view" 'view'
] ]
help_text: str = """ help_text: str = '''
-u, --url is the url for the server, ending with a "/". -u, --url is the url for the server, ending with a '/'.
-s, --sender is the sender of the message, either an app name or a person. -s, --sender is the sender of the message, either an app name or a person.
-d, --targets is the list of platforms to send a message to (slack, ntfy, discord) -d, --targets is the list of platforms to send a message to (slack, ntfy, discord)
-p, --priority is the priority of the message, ranging from 1 (lowest), to 5 (highest). -p, --priority is the priority of the message, ranging from 1 (lowest), to 5 (highest).
-m, --message is the text of the message to be sent. -m, --message is the text of the message to be sent.
-t, --tags is an arbitrary strings of tags (keywords), seperated by a "," (comma). -t, --tags is an arbitrary strings of tags (keywords), seperated by a ',' (comma).
-c, --click is a link (URL) that can be followed by tapping/clicking inside the message. -c, --click is a link (URL) that can be followed by tapping/clicking inside the message.
-h, --help shows this help message. Must have no value argument. -h, --help shows this help message. Must have no value argument.
-v, --view show config. -v, --view show config.
""" '''
# Initialize some variables. # Initialize some variables.
url: str = "" url: str = ''
sender: str = "" sender: str = ''
targets: str = "" targets: str = ''
message: str = "" message: str = ''
priority: int = 0 priority: int = 0
tags: str = "" tags: str = ''
click: str = "" click: str = ''
# Fetch the arguments from the command line, skipping the first argument (name of the script). # Fetch the arguments from the command line, skipping the first argument (name of the script).
argument_list: list = sys.argv[1:] argument_list: list = sys.argv[1:]
logger(level=2, config=config, me=me, him=him, message=f"Found arguments: %s" % argument_list) logger(level=2, config=config, me=me, him=him, message=f'Found arguments: %s' % argument_list)
if not argument_list: if not argument_list:
logger(level=1, config=config, me=me, him=him, logger(level=1, config=config, me=me, him=him,
@ -227,35 +227,35 @@ def get_arguments() -> dict:
try: try:
# Parsing arguments # Parsing arguments
p_arguments, values = getopt.getopt(argument_list, options, long_options) p_arguments, values = getopt.getopt(argument_list, options, long_options)
logger(level=2, config=config, me=me, him=him, message="Parsing arguments") logger(level=2, config=config, me=me, him=him, message='Parsing arguments')
# Check each argument. Arguments that are present will override the defaults. # Check each argument. Arguments that are present will override the defaults.
for current_argument, current_value in p_arguments: for current_argument, current_value in p_arguments:
if current_argument in ("-h", "--help"): if current_argument in ('-h', '--help'):
print(help_text) print(help_text)
exit() exit()
elif current_argument in ("-v", "--view"): elif current_argument in ('-v', '--view'):
view_config() view_config()
exit() exit()
elif current_argument in ("-u", "--url"): elif current_argument in ('-u', '--url'):
url: str = current_value url: str = current_value
elif current_argument in ("-s", "--sender"): elif current_argument in ('-s', '--sender'):
sender: str = current_value sender: str = current_value
elif current_argument in ("-d", "--targets"): elif current_argument in ('-d', '--targets'):
targets: str = current_value targets: str = current_value
elif current_argument in ("-p", "--priority"): elif current_argument in ('-p', '--priority'):
priority: int = int(current_value) priority: int = int(current_value)
elif current_argument in ("-m", "--message"): elif current_argument in ('-m', '--message'):
message: str = current_value message: str = current_value
elif current_argument in ("-t", "--tags"): elif current_argument in ('-t', '--tags'):
tags: str = current_value tags: str = current_value
elif current_argument in ("-c", "--click"): elif current_argument in ('-c', '--click'):
click: str = current_value click: str = current_value
except getopt.error as err: except getopt.error as err:
# Output error, and return error code # Output error, and return error code
logger(level=0, config=config, me=me, him=him, message=f"Error during argument parsing: %s" % err) logger(level=0, config=config, me=me, him=him, message=f'Error during argument parsing: %s' % err)
logger(level=2, config=config, me=me, him=him, message="Arguments returned as dictionary") logger(level=2, config=config, me=me, him=him, message='Arguments returned as dictionary')
# Store the arguments in the arguments dictionary. # Store the arguments in the arguments dictionary.
arguments: dict = {'url': url, 'sender': sender, 'targets': targets, 'message': message, arguments: dict = {'url': url, 'sender': sender, 'targets': targets, 'message': message,
@ -276,21 +276,21 @@ def load_message() -> dict:
config: dict = get_config() config: dict = get_config()
# get alert from stdin # get alert from stdin
logger(level=2, config=config, me=me, him=him, message="Loading event message from STDIN") logger(level=2, config=config, me=me, him=him, message='Loading event message from STDIN')
input_str: str = "" input_str: str = ''
for line in sys.stdin: for line in sys.stdin:
input_str: str = line input_str: str = line
break break
data: json = json.loads(input_str) data: json = json.loads(input_str)
if data.get("command") == "add": if data.get('command') == 'add':
logger(level=1, config=config, me=me, him=him, message="Relevant event data found") logger(level=1, config=config, me=me, him=him, message='Relevant event data found')
return data return data
else: else:
# Event came in, but wasn't processed. # Event came in, but wasn't processed.
logger(level=0, config=config, me=me, him=him, message="Event data not found") logger(level=0, config=config, me=me, him=him, message='Event data not found')
sys.exit(1) sys.exit(1)
@ -301,14 +301,14 @@ def check_test_mode(config) -> dict:
if config.get('python').get('test_mode'): if config.get('python').get('test_mode'):
logger(level=1, config=config, me=me, him=him, logger(level=1, config=config, me=me, him=him,
message="Running in test mode: using test message wazuh-notify-test-event.json") message='Running in test mode: using test message wazuh-notify-test-event.json')
# Load the test event data. # Load the test event data.
home_path, _, _ = set_environment() home_path, _, _ = set_environment()
with (open(home_path + '/etc/wazuh-notify-test-event.json') as event_file): with (open(home_path + '/etc/wazuh-notify-test-event.json') as event_file):
data: dict = json.loads(event_file.read()) data: dict = json.loads(event_file.read())
else: else:
# We are running live. Load the data from the Wazuh process. # We are running live. Load the data from the Wazuh process.
logger(level=2, config=config, me=me, him=him, message="Running in live mode: using live message") logger(level=2, config=config, me=me, him=him, message='Running in live mode: using live message')
data: dict = load_message() data: dict = load_message()
return data return data
@ -323,19 +323,19 @@ def exclusions_check(config, alert) -> bool:
# Set some environment # Set some environment
now_message, now_logging, now_weekday, now_time = set_time_format() now_message, now_logging, now_weekday, now_time = set_time_format()
logger(level=2, config=config, me=me, him=him, logger(level=2, config=config, me=me, him=him,
message="Setting pretty datetime formats for notifications and logging.") message='Setting pretty datetime formats for notifications and logging.')
# Check the exclusion records from the configuration yaml. # Check the exclusion records from the configuration yaml.
logger(level=1, config=config, me=me, him=him, message="Checking if we are outside of the exclusion rules: ") logger(level=1, config=config, me=me, him=him, message='Checking if we are outside of the exclusion rules: ')
ex_hours: tuple = config.get('python').get('excluded_hours') ex_hours: tuple = config.get('python').get('excluded_hours')
# Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump. # Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump.
ex_hours: tuple = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours ex_hours: tuple = [ex_hours[0], '23:59'] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours
# Get some more exclusion records from the config. # Get some more exclusion records from the config.
ex_days: str = config.get('python').get('excluded_days') ex_days: str = config.get('python').get('excluded_days')
ex_agents: str = config.get('general').get("excluded_agents") ex_agents: str = config.get('general').get('excluded_agents')
ex_rules: str = config.get('general').get("excluded_rules") ex_rules: str = config.get('general').get('excluded_rules')
# Check agent and rule from within the event. # Check agent and rule from within the event.
ev_agent = alert['agent']['id'] ev_agent = alert['agent']['id']
@ -346,21 +346,21 @@ def exclusions_check(config, alert) -> bool:
# Evaluate the conditions for sending a notification. Any False will cause the notification to be discarded. # Evaluate the conditions for sending a notification. Any False will cause the notification to be discarded.
if (now_time > ex_hours[0]) and (now_time < ex_hours[1]): if (now_time > ex_hours[0]) and (now_time < ex_hours[1]):
logger(level=2, config=config, me=me, him=him, message="excluded: event inside exclusion time frame") logger(level=2, config=config, me=me, him=him, message='excluded: event inside exclusion time frame')
ex_hours_eval = False ex_hours_eval = False
elif now_weekday in ex_days: elif now_weekday in ex_days:
logger(level=2, config=config, me=me, him=him, message="excluded: event inside excluded weekdays") logger(level=2, config=config, me=me, him=him, message='excluded: event inside excluded weekdays')
ex_weekday_eval = False ex_weekday_eval = False
elif ev_rule in ex_rules: elif ev_rule in ex_rules:
logger(level=2, config=config, me=me, him=him, message="excluded: event id inside exclusion list") logger(level=2, config=config, me=me, him=him, message='excluded: event id inside exclusion list')
ev_rule_eval = False ev_rule_eval = False
elif ev_agent in ex_agents: elif ev_agent in ex_agents:
logger(level=2, config=config, me=me, him=him, message="excluded: event agent inside exclusion list") logger(level=2, config=config, me=me, him=him, message='excluded: event agent inside exclusion list')
ev_rule_eval = False ev_rule_eval = False
notification_eval = True if (ex_hours_eval and ex_weekday_eval and ev_rule_eval and ev_agent_eval) else False notification_eval = True if (ex_hours_eval and ex_weekday_eval and ev_rule_eval and ev_agent_eval) else False
logger(level=1, config=config, me=me, him=him, logger(level=1, config=config, me=me, him=him,
message=f"Exclusion rules evaluated. Process event is %s" % notification_eval) message=f'Exclusion rules evaluated. Process event is %s' % notification_eval)
return notification_eval return notification_eval
@ -373,38 +373,38 @@ def threat_mapping(config, threat_level, fired_times):
# Map threat level to priority. Enters Homeland Security :-). # Map threat level to priority. Enters Homeland Security :-).
p_map = config.get('priority_map') p_map = config.get('priority_map')
logger(level=2, config=config, me=me, him=him, message=f"Threat mapping: priority mapped to: %s" % p_map) logger(level=2, config=config, me=me, him=him, message=f'Threat mapping: priority mapped to: %s' % p_map)
for i in range(len(p_map)): for i in range(len(p_map)):
logger(level=2, config=config, me=me, him=him, message=f"Threat mapping: list loop counter: %s" % i) logger(level=2, config=config, me=me, him=him, message=f'Threat mapping: list loop counter: %s' % i)
logger(level=2, config=config, me=me, him=him, message=f"Threat mapping: threat level found: %s" % threat_level) logger(level=2, config=config, me=me, him=him, message=f'Threat mapping: threat level found: %s' % threat_level)
if threat_level in p_map[i]["threat_map"]: if threat_level in p_map[i]['threat_map']:
color_mapping = p_map[i]["color"] color_mapping = p_map[i]['color']
priority_mapping = 5 - i priority_mapping = 5 - i
logger(level=2, config=config, me=me, him=him, message=f"Threat mapping: priority: %s" % priority_mapping) logger(level=2, config=config, me=me, him=him, message=f'Threat mapping: priority: %s' % priority_mapping)
logger(level=2, config=config, me=me, him=him, message=f"Threat mapping: color: %s" % color_mapping) logger(level=2, config=config, me=me, him=him, message=f'Threat mapping: color: %s' % color_mapping)
if fired_times >= p_map[i]["notify_threshold"]: if fired_times >= p_map[i]['notify_threshold']:
logger(level=2, config=config, me=me, him=him, logger(level=2, config=config, me=me, him=him,
message="The notification_threshold prevents this message from sending") message='The notification_threshold prevents this message from sending')
exit(0) exit(0)
if fired_times >= p_map[i]["mention_threshold"]: if fired_times >= p_map[i]['mention_threshold']:
# When this flag is set, Discord!! recipients get a stronger message (DM). # When this flag is set, Discord!! recipients get a stronger message (DM).
mention_flag = "@here" mention_flag = '@here'
logger(level=2, config=config, me=me, him=him, logger(level=2, config=config, me=me, him=him,
message=f"Threat mapping: mention flag: %s" % mention_flag) message=f'Threat mapping: mention flag: %s' % mention_flag)
else: else:
mention_flag = "" mention_flag = ''
logger(level=2, config=config, me=me, him=him, logger(level=2, config=config, me=me, him=him,
message=f"Threat level mapped as: priority: %s color: %s mention: %s" message=f'Threat level mapped as: priority: %s color: %s mention: %s'
% (priority_mapping, color_mapping, mention_flag)) % (priority_mapping, color_mapping, mention_flag))
return priority_mapping, color_mapping, mention_flag return priority_mapping, color_mapping, mention_flag
logger(level=0, config=config, me=me, him=him, message="Threat level mapping failed! Returning (99, 99, 99)") logger(level=0, config=config, me=me, him=him, message='Threat level mapping failed! Returning (99, 99, 99)')
return 99, 99, "99" return 99, 99, '99'
############################################################################################## ##############################################################################################
@ -418,13 +418,13 @@ def construct_message_body(caller, config, arguments, alert) -> str:
me: str = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(level=2, config=config, me=me, him=him, message=f"%s: Constructing message body" % caller) logger(level=2, config=config, me=me, him=him, message=f'%s: Constructing message body' % caller)
# Include a specific control sequence for markdown bold parameter names. # Include a specific control sequence for markdown bold parameter names.
# todo To be fixed # todo To be fixed
md_map = config.get('markdown_emphasis', '') md_map = config.get('markdown_emphasis', '')
md_e = md_map[caller] md_e = md_map[caller]
logger(level=2, config=config, me=me, him=him, message=caller + "Emphasis string used: " + md_e) logger(level=2, config=config, me=me, him=him, message=caller + 'Emphasis string used: ' + md_e)
# If the --message (-m) argument was fulfilled, use this message to be sent. # If the --message (-m) argument was fulfilled, use this message to be sent.
if arguments['message']: if arguments['message']:
@ -433,16 +433,16 @@ def construct_message_body(caller, config, arguments, alert) -> str:
_, timestamp, _, _ = set_time_format() _, timestamp, _, _ = set_time_format()
message_body: str = \ message_body: str = \
( (
md_e + "Timestamp:" + md_e + " " + timestamp + "\n" + md_e + 'Timestamp:' + md_e + ' ' + timestamp + '\n' +
md_e + "Agent:" + md_e + " " + alert["agent"]["name"] + " (" + alert["agent"]["id"] + ")" + "\n" + md_e + 'Agent:' + md_e + ' ' + alert['agent']['name'] + ' (' + alert['agent']['id'] + ')' + '\n' +
md_e + "Rule id:" + md_e + " " + alert["rule"]["id"] + "\n" + md_e + 'Rule id:' + md_e + ' ' + alert['rule']['id'] + '\n' +
md_e + "Rule:" + md_e + " " + alert["rule"]["description"] + "\n" + md_e + 'Rule:' + md_e + ' ' + alert['rule']['description'] + '\n' +
md_e + "Description:" + md_e + " " + alert["full_log"] + "\n" + md_e + 'Description:' + md_e + ' ' + alert['full_log'] + '\n' +
md_e + "Threat level:" + md_e + " " + str(alert["rule"]["level"]) + "\n" + md_e + 'Threat level:' + md_e + ' ' + str(alert['rule']['level']) + '\n' +
md_e + "Times fired:" + md_e + " " + str(alert["rule"]["firedtimes"]) + "\n" md_e + 'Times fired:' + md_e + ' ' + str(alert['rule']['firedtimes']) + '\n'
) )
logger(level=2, config=config, me=me, him=him, logger(level=2, config=config, me=me, him=him,
message=f"%s basic message constructed.\n %s" % (caller, message_body)) message=f'%s basic message constructed.\n %s' % (caller, message_body))
return message_body return message_body
@ -452,19 +452,19 @@ def prepare_payload(caller, config, arguments, message_body, alert, priority):
me: str = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(level=2, config=config, me=me, him=him, message="Payload being constructed.") logger(level=2, config=config, me=me, him=him, message='Payload being constructed.')
md_map = config.get('markdown_emphasis', '') md_map = config.get('markdown_emphasis', '')
md_e = md_map[caller] md_e = md_map[caller]
logger(level=2, config=config, me=me, him=him, message=f"%s: emphasis string used: %s" % (caller, md_e)) logger(level=2, config=config, me=me, him=him, message=f'%s: emphasis string used: %s' % (caller, md_e))
priority: str = str(priority) priority: str = str(priority)
tags = (str(alert['rule']['groups']).replace("[", "") tags = (str(alert['rule']['groups']).replace('[', '')
.replace("]", "") .replace(']', '')
.replace("'", "") .replace("'", '')
.replace(",", ", ") .replace(',', ', ')
) )
logger(level=2, config=config, me=me, him=him, message="Full event formatted.") logger(level=2, config=config, me=me, him=him, message='Full event formatted.')
full_event: str = str(json.dumps(alert, indent=4) full_event: str = str(json.dumps(alert, indent=4)
.replace('"', '') .replace('"', '')
@ -484,20 +484,20 @@ def prepare_payload(caller, config, arguments, message_body, alert, priority):
click = arguments['click'] if arguments['click'] else click click = arguments['click'] if arguments['click'] else click
# Add the full alert data to the notification. # Add the full alert data to the notification.
if caller in config["full_alert"]: if caller in config['full_alert']:
logger(level=2, config=config, me=me, him=him, message=caller + "Full alert data will be attached.") logger(level=2, config=config, me=me, him=him, message=caller + 'Full alert data will be attached.')
# Add the full alert data to the notification body # Add the full alert data to the notification body
notification_body: str = ("\n\n" + message_body + "\n" + notification_body: str = ('\n\n' + message_body + '\n' +
md_e + "__Full event__" + md_e + "\n" + "```\n" + full_event + "```") md_e + '__Full event__' + md_e + '\n' + '```\n' + full_event + '```')
else: else:
notification_body: str = message_body notification_body: str = message_body
# Add priority & tags to the notification body # Add priority & tags to the notification body
notification_body = (notification_body + "\n\n" + md_e + "Priority:" + md_e + " " + str(priority) + "\n" + notification_body = (notification_body + '\n\n' + md_e + 'Priority:' + md_e + ' ' + str(priority) + '\n' +
md_e + "Tags:" + md_e + " " + tags + "\n\n" + click) md_e + 'Tags:' + md_e + ' ' + tags + '\n\n' + click)
logger(level=2, config=config, me=me, him=him, message="Adding priority and tags") logger(level=2, config=config, me=me, him=him, message='Adding priority and tags')
config["targets"] = arguments['targets'] if arguments['targets'] != "" else config["targets"] config['targets'] = arguments['targets'] if arguments['targets'] != '' else config['targets']
return notification_body, click, sender return notification_body, click, sender
@ -512,15 +512,15 @@ def build_discord_notification(config, notification_body, color, mention, sender
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me: str = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(level=2, config=config, me=me, him=him, message="Discord payload created") logger(level=2, config=config, me=me, him=him, message='Discord payload created')
payload_json = {"username": sender, payload_json = {'username': sender,
"content": mention, 'content': mention,
"embeds": [{"description": notification_body, 'embeds': [{'description': notification_body,
"color": color, 'color': color,
"title": sender}]} 'title': sender}]}
logger(level=2, config=config, me=me, him=him, message="Discord notification built") logger(level=2, config=config, me=me, him=him, message='Discord notification built')
return "", "", payload_json return '', '', payload_json
# Build the notification for this specific platform. # Build the notification for this specific platform.
@ -528,15 +528,15 @@ def build_ntfy_notification(config, notification_body, priority, click, sender):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me: str = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(level=2, config=config, me=me, him=him, message="Ntfy payloads created") logger(level=2, config=config, me=me, him=him, message='Ntfy payloads created')
payload_data = notification_body payload_data = notification_body
payload_headers = {"Markdown": "yes", payload_headers = {'Markdown': 'yes',
"Title": sender, 'Title': sender,
"Priority": str(priority), 'Priority': str(priority),
"Click": click} 'Click': click}
logger(level=2, config=config, me=me, him=him, message="Ntfy notification built") logger(level=2, config=config, me=me, him=him, message='Ntfy notification built')
return payload_headers, payload_data, "" return payload_headers, payload_data, ''
# Build the notification for this specific platform. # Build the notification for this specific platform.
@ -544,15 +544,15 @@ def build_slack_notification(config, notification_body, color, mention, sender):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me: str = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(level=2, config=config, me=me, him=him, message="Slack payload created") logger(level=2, config=config, me=me, him=him, message='Slack payload created')
payload_json = {"username": sender, payload_json = {'username': sender,
"content": mention, 'content': mention,
"embeds": [{"description": notification_body, 'embeds': [{'description': notification_body,
"color": color, 'color': color,
"title": sender}]} 'title': sender}]}
logger(level=2, config=config, me=me, him=him, message="Slack notification built") logger(level=2, config=config, me=me, him=him, message='Slack notification built')
return "", "", payload_json return '', '', payload_json
# Handle the complete notification generation for this specific platform. # Handle the complete notification generation for this specific platform.
@ -561,7 +561,7 @@ def handle_discord_notification(config, arguments, alert, color, priority, menti
me: str = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(level=1, config=config, me=me, him=him, message="Process discord notification: start") logger(level=1, config=config, me=me, him=him, message='Process discord notification: start')
# Load the url/webhook from the configuration. # Load the url/webhook from the configuration.
discord_url, _, _ = get_env() discord_url, _, _ = get_env()
@ -578,7 +578,7 @@ def handle_discord_notification(config, arguments, alert, color, priority, menti
_, _, payload_json = build_discord_notification(config=config, notification_body=notification_body, color=color, _, _, payload_json = build_discord_notification(config=config, notification_body=notification_body, color=color,
mention=mention, sender=sender) mention=mention, sender=sender)
logger(level=1, config=config, me=me, him=him, message="Process discord notification: done") logger(level=1, config=config, me=me, him=him, message='Process discord notification: done')
return payload_json, discord_url return payload_json, discord_url
@ -588,7 +588,7 @@ def handle_ntfy_notification(config, arguments, alert, priority):
me: str = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(level=1, config=config, me=me, him=him, message="Process ntfy notification: start") logger(level=1, config=config, me=me, him=him, message='Process ntfy notification: start')
# Load the url/webhook from the configuration. # Load the url/webhook from the configuration.
_, ntfy_url, _ = get_env() _, ntfy_url, _ = get_env()
@ -598,7 +598,7 @@ def handle_ntfy_notification(config, arguments, alert, priority):
message_body: str = construct_message_body(caller='ntfy', config=config, arguments=arguments, alert=alert) message_body: str = construct_message_body(caller='ntfy', config=config, arguments=arguments, alert=alert)
# Special blank line after the title of the message. # Special blank line after the title of the message.
message_body = "&nbsp;\n" + message_body message_body = '&nbsp;\n' + message_body
# Common preparation of the notification. # Common preparation of the notification.
notification_body, click, sender = prepare_payload(caller='ntfy', config=config, arguments=arguments, notification_body, click, sender = prepare_payload(caller='ntfy', config=config, arguments=arguments,
@ -608,7 +608,7 @@ def handle_ntfy_notification(config, arguments, alert, priority):
payload_headers, payload_data, _ = build_ntfy_notification(config=config, notification_body=notification_body, payload_headers, payload_data, _ = build_ntfy_notification(config=config, notification_body=notification_body,
priority=priority, click=click, sender=sender) priority=priority, click=click, sender=sender)
logger(level=1, config=config, me=me, him=him, message="Process ntfy notification: done") logger(level=1, config=config, me=me, him=him, message='Process ntfy notification: done')
return payload_data, payload_headers, ntfy_url return payload_data, payload_headers, ntfy_url
@ -618,7 +618,7 @@ def handle_slack_notification(config, arguments, alert, color, priority, mention
me: str = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(level=1, config=config, me=me, him=him, message="Process slack notification: start") logger(level=1, config=config, me=me, him=him, message='Process slack notification: start')
# Load the url/webhook from the configuration. # Load the url/webhook from the configuration.
_, _, slack_url = get_env() _, _, slack_url = get_env()
@ -635,5 +635,5 @@ def handle_slack_notification(config, arguments, alert, color, priority, mention
_, _, payload_json = build_slack_notification(config=config, notification_body=notification_body, color=color, _, _, payload_json = build_slack_notification(config=config, notification_body=notification_body, color=color,
mention=mention, sender=sender) mention=mention, sender=sender)
logger(level=1, config=config, me=me, him=him, message="Process slack notification: done") logger(level=1, config=config, me=me, him=him, message='Process slack notification: done')
return payload_json, slack_url return payload_json, slack_url