From 6c267f555fe408f319c19ed31876dd8ec8824c7a Mon Sep 17 00:00:00 2001 From: Rudi Klein Date: Wed, 29 May 2024 20:37:53 +0200 Subject: [PATCH] Code improvement --- wazuh-notify-python/wazuh-notify.py | 101 ++----- wazuh-notify-python/wazuh_notify_module.py | 322 ++++++++++++++------- 2 files changed, 239 insertions(+), 184 deletions(-) diff --git a/wazuh-notify-python/wazuh-notify.py b/wazuh-notify-python/wazuh-notify.py index 42afdb9..637c918 100755 --- a/wazuh-notify-python/wazuh-notify.py +++ b/wazuh-notify-python/wazuh-notify.py @@ -20,16 +20,17 @@ def main(): # Load the TOML config. config: dict = get_config() + logger(0, config, me, him, "############ Processing event ###############################") # Get the arguments used with running the script. arguments = get_arguments() - # Check for test mode. Use test data if true - data = check_test_mode(config) + # Check for test mode. Use test data if true. + event_data = check_test_mode(config) - # Extract the 'alert' section of the (JSON) event - alert = data["parameters"]["alert"] + # Extract the 'alert' section of the (JSON) event. + alert = event_data["parameters"]["alert"] logger(2, config, me, him, "Extracting data from the event") # Check the config for any exclusion rules and abort when excluded. @@ -42,86 +43,32 @@ def main(): config["targets"] = arguments['targets'] if arguments['targets'] != "" else config["targets"] # Prepare the messaging platform specific notification and execute if configured. + + # Discord notification handler if "discord" in config["targets"]: - # Show me some ID! Stop resisting! - caller = "discord" - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name - - # Load the url/webhook from the configuration. - discord_url, _, _ = get_env() - discord_url = arguments['url'] if arguments['url'] else discord_url - - # Build the basic message content. - message_body: str = construct_message_body(caller, config, arguments, alert) - - # Common preparation of the notification. - notification_body, click, sender = prepare_payload(caller, config, arguments, message_body, alert, priority) - - # Build the payload(s) for the POST request. - _, _, payload_json = build_discord_notification(caller, config, notification_body, color, mention, sender) - - # Build the notification to be sent. - build_discord_notification(caller, config, notification_body, color, mention, sender) - + payload_json, discord_url = handle_discord_notification(config=config, arguments=arguments, alert=alert, + color=color, priority=priority, mention=mention) # POST the notification through requests. - discord_result = requests.post(discord_url, json=payload_json) - logger(1, config, me, him, caller + " notification constructed and sent: " + str(discord_result)) + discord_result = requests.post(url=discord_url, json=payload_json) + logger(1, config, me, him, "Discord notification constructed and sent: " + str(discord_result)) + # ntfy.sh notification handler if "ntfy" in config["targets"]: - # Show me some ID! Stop resisting! - caller = "ntfy" - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name - - # Load the url/webhook from the configuration. - _, ntfy_url, _ = get_env() - ntfy_url = arguments['url'] if arguments['url'] else ntfy_url - - # Build the basic message content. - message_body: str = construct_message_body(caller, config, arguments, alert) - - # Common preparation of the notification. - notification_body, click, sender = prepare_payload(caller, config, arguments, message_body, alert, priority) - - # Build the payload(s) for the POST request. - payload_headers, payload_data, _ = build_ntfy_notification(caller, config, notification_body, color, mention, - sender) - - # Build the notification to be sent. - build_ntfy_notification(caller, config, notification_body, priority, click, sender) - + payload_data, payload_headers, ntfy_url = handle_ntfy_notification(config=config, arguments=arguments, + alert=alert, priority=priority) # POST the notification through requests. - ntfy_result = requests.post(ntfy_url, data=payload_data, headers=payload_headers) - logger(1, config, me, him, caller + " notification constructed and sent: " + str(ntfy_result)) + ntfy_result = requests.post(url=ntfy_url, data=payload_data, headers=payload_headers) + logger(1, config, me, him, "Ntfy notification constructed and sent: " + str(ntfy_result)) - # if "slack" in config["targets"]: - # # Show me some ID! Stop resisting! - # caller = "slack" - # me = frame(0).f_code.co_name - # him = frame(1).f_code.co_name - # - # # Load the url/webhook from the configuration. - # _, _, slack_url = get_env() - # slack_url = arguments['url'] if arguments['url'] else slack_url - # - # # Build the basic message content. - # message_body: str = construct_message_body(caller, config, arguments, data) - # - # # Common preparation of the notification. - # notification_body, click, sender = prepare_payload(caller, config, arguments, message_body, alert, - # priority) - # # Build the payload(s) for the POST request. - # _, _, payload_json = build_slack_notification(caller, config, notification_body, priority, color, mention, - # click, sender) - # - # # Build the notification to be sent. - # build_slack_notification(caller, config, notification_body, priority, click, sender) - # - # result = requests.post(slack_url, headers={'Content-Type': 'application/json'}, json=payload_json) - # - # logger(1, config, me, him, caller + " notification constructed and sent: " + str(result)) + # Slack notification handler + if "slack" in config["targets"]: + payload_json, slack_url = handle_slack_notification(config=config, arguments=arguments, alert=alert, + color=color, priority=priority, mention=mention) + slack_result = requests.post(url=slack_url, headers={'Content-Type': 'application/json'}, json=payload_json) + logger(1, config, me, him, "Slack notification constructed and sent: " + str(slack_result)) + + # The end of processing logger(0, config, me, him, "############ Event processed ################################") exit(0) diff --git a/wazuh-notify-python/wazuh_notify_module.py b/wazuh-notify-python/wazuh_notify_module.py index c508db8..021b2ad 100755 --- a/wazuh-notify-python/wazuh_notify_module.py +++ b/wazuh-notify-python/wazuh_notify_module.py @@ -16,6 +16,12 @@ from dotenv import load_dotenv # log_path = wazuh-notify.log path, # config_path = wazuh-notify-config.yaml + +############################################################################################## +# General process environment handlers # +############################################################################################## + + def set_environment() -> tuple: set_wazuh_path = os.path.abspath(os.path.join(__file__, "../..")) # set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../..")) @@ -30,7 +36,7 @@ wazuh_path, log_path, config_path = set_environment() # Set structured timestamps for notifications. -def set_time_format(): +def set_time_format() -> tuple: now_message = time.strftime('%A, %d %b %Y %H:%M:%S') now_logging = time.strftime('%Y-%m-%d %H:%M:%S') now_time = time.strftime('%H:%M') @@ -40,15 +46,14 @@ def set_time_format(): # Logger: print to console and/or log to file -def logger(level, config, me, him, message): +def logger(level, config, me, him, message) -> None: _, now_logging, _, _ = set_time_format() - logger_wazuh_path = os.path.abspath(os.path.join(__file__, "../../..")) - logger_log_path = '{0}/logs/wazuh-notify.log'.format(logger_wazuh_path) + logger_wazuh_path, logger_log_path, _ = set_environment() # When logging from main(), the destination function is called "". For cosmetic reasons rename to "main". - him = 'main' if him == '' else him - log_line = f'{now_logging} | {level} | {me: <26} | {him: <13} | {message}' + him: str = 'main' if him == '' else him + log_line: str = f'{now_logging} | {level} | {me: <27} | {him: <27} | {message}' # Compare the extended_print log level in the configuration to the log level of the message. if config.get('python').get('extended_print', 0) >= level: @@ -60,18 +65,19 @@ def logger(level, config, me, him, message): log_file.write(log_line + "\n") except (FileNotFoundError, PermissionError, OSError): # Special message to console when logging to file fails and console logging might not be set. - log_line = f'{now_logging} | {level} | {me: <26} | {him: <13} | error opening log file: {logger_log_path}' + log_line: str = f'{now_logging} | {level} | {me: <27} | {him: <17} | error opening log file: {logger_log_path}' print(log_line) # Get the content of the .env file (url's and/or webhooks). -def get_env(): +def get_env() -> tuple: # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name # Write the configuration to a dictionary. config: dict = get_config() + logger(2, config, me, him, "Configuration retrieved to dictionary") # Check if the secrets .env file is available. try: @@ -83,8 +89,11 @@ def get_env(): # Retrieve URLs from .env discord_url = os.getenv("DISCORD_URL") + logger(2, config, me, him, "DISCORD_URL: " + discord_url) ntfy_url = os.getenv("NTFY_URL") + logger(2, config, me, him, "NTFY_URL: " + ntfy_url) slack_url = os.getenv("SLACK_URL") + logger(2, config, me, him, "SLACK_URL: " + slack_url) except Exception as err: # output error, and return with an error code @@ -96,41 +105,40 @@ def get_env(): # Read and process configuration settings from wazuh-notify-config.yaml and create dictionary. -def get_config(): +def get_config() -> dict: # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name this_config_path: str = "" config: dict = {} - # logger(2, config, me, him, "Loading TOML configuration") try: _, _, this_config_path = set_environment() with open(this_config_path, 'rb') as ntfier_config: config: dict = tomli.load(ntfier_config) except (FileNotFoundError, PermissionError, OSError): logger(2, config, me, him, "Error accessing configuration file: " + this_config_path) - logger(2, config, me, him, "Reading configuration file: " + this_config_path) + logger(2, config, me, him, "Reading TOML configuration file: " + this_config_path) - config['targets'] = config.get('general').get('targets', 'discord, slack, ntfy') - config['full_alert'] = config.get('general').get('full_alert', False) - config['excluded_rules'] = config.get('general').get('excluded_rules', '') - config['excluded_agents'] = config.get('general').get('excluded_agents', '') - config['priority_map'] = config.get('priority_map', []) - config['sender'] = config.get('general').get('sender', 'Wazuh (IDS)') - config['click'] = config.get('general').get('click', 'https://wazuh.com') - config['md_e'] = config.get('general').get('markdown_emphasis', '') - config['excluded_days'] = config.get('python').get('excluded_days', '') - config['excluded_hours'] = config.get('python').get('excluded_hours', '') - config['test_mode'] = config.get('python').get('test_mode', False) - config['extended_logging'] = config.get('python').get('extended_logging', 0) - config['extended_print'] = config.get('python').get('extended_print', 0) + config['targets']: str = config.get('general').get('targets', 'discord, slack, ntfy') + config['full_alert']: bool = config.get('general').get('full_alert', False) + config['excluded_rules']: str = config.get('general').get('excluded_rules', '') + config['excluded_agents']: str = config.get('general').get('excluded_agents', '') + config['priority_map']: dict = config.get('priority_map', []) + config['sender']: str = config.get('general').get('sender', 'Wazuh (IDS)') + config['click']: str = config.get('general').get('click', 'https://wazuh.com') + config['md_e']: str = config.get('general').get('markdown_emphasis', '') + config['excluded_days']: list = config.get('python').get('excluded_days', '') + config['excluded_hours']: list = config.get('python').get('excluded_hours', '') + config['test_mode']: bool = config.get('python').get('test_mode', False) + config['extended_logging']: int = config.get('python').get('extended_logging', 0) + config['extended_print']: int = config.get('python').get('extended_print', 0) return config # Show configuration settings from wazuh-notify-config.yaml -def view_config(): +def view_config() -> None: _, _, this_config_path, _ = set_environment() try: @@ -142,13 +150,14 @@ def view_config(): # Get script arguments during execution. Params found here override config settings. -def get_arguments(): +def get_arguments() -> dict: # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name # Retrieve the configuration information config: dict = get_config() + logger(2, config, me, him, "Configuration retrieved to dictionary") # Short options options: str = "u:s:p:m:t:c:hv" @@ -205,10 +214,9 @@ def get_arguments(): return arguments else: try: - logger(2, config, me, him, "Parsing arguments") - # Parsing arguments p_arguments, values = getopt.getopt(argument_list, options, long_options) + logger(2, config, me, him, "Parsing arguments") # Check each argument. Arguments that are present will override the defaults. for current_argument, current_value in p_arguments: @@ -244,11 +252,16 @@ def get_arguments(): return arguments +############################################################################################## +# Wazuh event handling # +############################################################################################## + + # Receive and load message from Wazuh -def load_message(): +def load_message() -> dict: # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name config: dict = get_config() # get alert from stdin @@ -271,9 +284,9 @@ def load_message(): # Check if we are in test mode (test_mode setting in config yaml). If so, load test event instead of live event. -def check_test_mode(config): - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name +def check_test_mode(config) -> dict: + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name if config.get('python').get('test_mode'): logger(1, config, me, him, "Running in test mode: using test message wazuh-notify-test-event.json") @@ -284,30 +297,32 @@ def check_test_mode(config): else: # We are running live. Load the data from the Wazuh process. logger(2, config, me, him, "Running in live mode: using live message") - data = load_message() + data: dict = load_message() + return data # Check if there are reasons not to process this event. Check exclusions for rules, agents, days and hours. -def exclusions_check(config, alert): +def exclusions_check(config, alert) -> None: # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name # Set some environment now_message, now_logging, now_weekday, now_time = set_time_format() + logger(2, config, me, him, "Setting pretty datetime formats for notifications and logging.") # Check the exclusion records from the configuration yaml. logger(1, config, me, him, "Checking if we are outside of the exclusion rules: ") ex_hours: tuple = config.get('python').get('excluded_hours') # Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump. - ex_hours = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours + ex_hours: tuple = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours # Get some more exclusion records from the config. - ex_days = config.get('python').get('excluded_days') - ex_agents = config.get('general').get("excluded_agents") - ex_rules = config.get('general').get("excluded_rules") + ex_days: str = config.get('python').get('excluded_days') + ex_agents: str = config.get('general').get("excluded_agents") + ex_rules: str = config.get('general').get("excluded_rules") # Check agent and rule from within the event. ev_agent = alert['agent']['id'] @@ -323,7 +338,6 @@ def exclusions_check(config, alert): elif now_weekday in ex_days: logger(2, config, me, him, "excluded: event inside excluded weekdays") ex_weekday_eval = False - elif ev_rule in ex_rules: logger(2, config, me, him, "excluded: event id inside exclusion list") ev_rule_eval = False @@ -332,7 +346,7 @@ def exclusions_check(config, alert): ev_rule_eval = False notification_eval = True if (ex_hours_eval and ex_weekday_eval and ev_rule_eval and ev_agent_eval) else False - logger(1, config, me, him, "Exclusion rules evaluated. Final decision: " + str(notification_eval)) + logger(1, config, me, him, "Exclusion rules evaluated. Process event is " + str(notification_eval)) if not notification_eval: # The event was excluded by the exclusion rules in the configuration. @@ -341,52 +355,67 @@ def exclusions_check(config, alert): else: # The event was not excluded by the exclusion rules in the configuration. Keep processing. logger(2, config, me, him, "Event NOT excluded, notification will be sent") + return # Map the event threat level to the appropriate 5-level priority scale and color for use in the notification platforms. def threat_mapping(config, threat_level, fired_times): # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name # Map threat level to priority. Enters Homeland Security :-). p_map = config.get('priority_map') - logger(2, config, me, him, "Prio map: " + str(p_map)) + logger(2, config, me, him, "Threat mapping: priority mapped to: " + str(p_map)) for i in range(len(p_map)): - logger(2, config, me, him, "Loop: " + str(i)) - logger(2, config, me, him, "Level: " + str(threat_level)) + logger(2, config, me, him, "Threat mapping: list loop counter: " + str(i)) + logger(2, config, me, him, "Threat mapping: threat level found: " + str(threat_level)) if threat_level in p_map[i]["threat_map"]: color_mapping = p_map[i]["color"] priority_mapping = 5 - i - logger(2, config, me, him, "Prio: " + str(priority_mapping)) - logger(2, config, me, him, "Color: " + str(color_mapping)) + logger(2, config, me, him, "Threat mapping: priority: " + str(priority_mapping)) + logger(2, config, me, him, "Threat mapping: color: " + str(color_mapping)) + + if fired_times >= p_map[i]["notify_threshold"]: + logger(2, config, me, him, "The notification_threshold prevents this message from sending") + exit(0) if fired_times >= p_map[i]["mention_threshold"]: - # When this flag is set, Discord recipients get a stronger message (DM). + # When this flag is set, Discord!! recipients get a stronger message (DM). mention_flag = "@here" + logger(2, config, me, him, "Threat mapping: mention flag: " + str(mention_flag)) else: mention_flag = "" - logger(2, config, me, him, "Threat level mapped as: " + - "prio:" + str(priority_mapping) + " color: " + str(color_mapping) + " mention: " + mention_flag) + logger(2, config, me, him, "Threat level mapped as: " + "priority:" + str(priority_mapping) + + " color: " + str(color_mapping) + " mention: " + str(mention_flag)) + return priority_mapping, color_mapping, mention_flag logger(0, config, me, him, "Threat level mapping failed! Returning garbage (99, 99, 99)") return 99, 99, "99" +############################################################################################## +# Common notification preparation # +############################################################################################## + + # Construct the message that will be sent to the notifier platforms. -def construct_message_body(caller, config, arguments, data: dict) -> str: +def construct_message_body(caller, config, arguments, alert) -> str: # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name + + logger(2, config, me, him, caller + ": Constructing message body") # Include a specific control sequence for markdown bold parameter names. # todo To be fixed md_map = config.get('markdown_emphasis', '') md_e = md_map[caller] + logger(2, config, me, him, caller + "Emphasis string used: " + md_e) # If the --message (-m) argument was fulfilled, use this message to be sent. if arguments['message']: @@ -396,30 +425,29 @@ def construct_message_body(caller, config, arguments, data: dict) -> str: message_body: str = \ ( md_e + "Timestamp:" + md_e + " " + timestamp + "\n" + - md_e + "Agent:" + md_e + " " + data["agent"]["name"] + " (" + data["agent"]["id"] + ")" + "\n" + - md_e + "Rule id:" + md_e + " " + data["rule"]["id"] + "\n" + - md_e + "Rule:" + md_e + " " + data["rule"]["description"] + "\n" + - md_e + "Description:" + md_e + " " + data["full_log"] + "\n" + - md_e + "Threat level:" + md_e + " " + str(data["rule"]["level"]) + "\n" + - md_e + "Times fired:" + md_e + " " + str(data["rule"]["firedtimes"]) + "\n" + md_e + "Agent:" + md_e + " " + alert["agent"]["name"] + " (" + alert["agent"]["id"] + ")" + "\n" + + md_e + "Rule id:" + md_e + " " + alert["rule"]["id"] + "\n" + + md_e + "Rule:" + md_e + " " + alert["rule"]["description"] + "\n" + + md_e + "Description:" + md_e + " " + alert["full_log"] + "\n" + + md_e + "Threat level:" + md_e + " " + str(alert["rule"]["level"]) + "\n" + + md_e + "Times fired:" + md_e + " " + str(alert["rule"]["firedtimes"]) + "\n" ) - - logger(2, config, me, him, caller + " basic message constructed.") + logger(2, config, me, him, caller + " basic message constructed. Returning: " + message_body) return message_body # Construct the notification (message + additional information) that will be sent to the notifier platforms. def prepare_payload(caller, config, arguments, message_body, alert, priority): # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name - logger(2, config, me, him, "Notification being constructed.") + logger(2, config, me, him, "Payload being constructed.") md_map = config.get('markdown_emphasis', '') md_e = md_map[caller] + logger(2, config, me, him, caller + "Emphasis string used: " + md_e) - click: str = config.get('general').get('click', 'https://wazuh.com') priority: str = str(priority) tags = (str(alert['rule']['groups']).replace("[", "") .replace("]", "") @@ -464,58 +492,138 @@ def prepare_payload(caller, config, arguments, message_body, alert, priority): return notification_body, click, sender -def build_discord_notification(caller, config, notification_body, color, mention, sender): +############################################################################################## +# Platform specific notification creation and handling # +############################################################################################## + + +# Build the notification for this specific platform. +def build_discord_notification(config, notification_body, color, mention, sender): # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name - logger(2, config, me, him, caller + " payload created") + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name + logger(2, config, me, him, "Discord payload created") payload_json = {"username": sender, "content": mention, "embeds": [{"description": notification_body, "color": color, "title": sender}]} - logger(2, config, me, him, caller + " notification built") + logger(2, config, me, him, "Discord notification built") return "", "", payload_json -def build_ntfy_notification(caller, config, notification_body, priority, click, sender): +# Build the notification for this specific platform. +def build_ntfy_notification(config, notification_body, priority, click, sender): # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name - logger(2, config, me, him, caller + " payloads created") - - # if caller == "ntfy": - # todo Check this out - # basic_msg = " \n" + basic_msg + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name + logger(2, config, me, him, "Ntfy payloads created") payload_data = notification_body payload_headers = {"Markdown": "yes", "Title": sender, "Priority": str(priority), "Click": click} - logger(2, config, me, him, caller + " notification built") + logger(2, config, me, him, "Ntfy notification built") return payload_headers, payload_data, "" -def build_slack_notification(caller, config, arguments, notification_body, priority, color, mention, click, sender): +# Build the notification for this specific platform. +def build_slack_notification(config, notification_body, color, mention, sender): # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. - me = frame(0).f_code.co_name - him = frame(1).f_code.co_name - sender: str = config.get('general').get('sender', 'Wazuh (IDS)') - sender = arguments['sender'] if arguments['sender'] else sender - - logger(2, config, me, him, caller + " payloads created") - - # todo Need some investigation. - - payload_json = {"text": notification_body} - # payload_json = {"username": sender, - # "content": mention, - # "embeds": [{"description": notification, - # "color": color, - # "title": sender}]} - - logger(2, config, me, him, caller + " notification built") + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name + logger(2, config, me, him, "Slack payload created") + payload_json = {"username": sender, + "content": mention, + "embeds": [{"description": notification_body, + "color": color, + "title": sender}]} + logger(2, config, me, him, "Slack notification built") return "", "", payload_json + + +# Handle the complete notification generation for this specific platform. +def handle_discord_notification(config, arguments, alert, color, priority, mention): + # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name + + logger(1, config, me, him, "Process discord notification: start") + + # Load the url/webhook from the configuration. + discord_url, _, _ = get_env() + discord_url = arguments['url'] if arguments['url'] else discord_url + + # Build the basic message content. + message_body: str = construct_message_body(caller='discord', config=config, arguments=arguments, alert=alert) + + # Common preparation of the notification. + notification_body, click, sender = prepare_payload(caller='discord', config=config, arguments=arguments, + message_body=message_body, alert=alert, priority=priority) + + # Build the payload(s) for the POST request. + _, _, payload_json = build_discord_notification(config=config, notification_body=notification_body, color=color, + mention=mention, sender=sender) + + logger(1, config, me, him, "Process discord notification: done") + return payload_json, discord_url + + +# Handle the complete notification generation for this specific platform. +def handle_ntfy_notification(config, arguments, alert, priority): + # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name + + logger(1, config, me, him, "Process ntfy notification: start") + + # Load the url/webhook from the configuration. + _, ntfy_url, _ = get_env() + ntfy_url = arguments['url'] if arguments['url'] else ntfy_url + + # Build the basic message content. + message_body: str = construct_message_body(caller='ntfy', config=config, arguments=arguments, alert=alert) + + # Special blank line after the title of the message. + message_body = " \n" + message_body + + # Common preparation of the notification. + notification_body, click, sender = prepare_payload(caller='ntfy', config=config, arguments=arguments, + message_body=message_body, alert=alert, priority=priority) + + # Build the payload(s) for the POST request. + payload_headers, payload_data, _ = build_ntfy_notification(config=config, notification_body=notification_body, + priority=priority, click=click, sender=sender) + + logger(1, config, me, him, "Process ntfy notification: done") + return payload_data, payload_headers, ntfy_url + + +# Handle the complete notification generation for this specific platform. +def handle_slack_notification(config, arguments, alert, color, priority, mention): + # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. + me: str = frame(0).f_code.co_name + him: str = frame(1).f_code.co_name + + logger(1, config, me, him, "Process slack notification: start") + + # Load the url/webhook from the configuration. + _, _, slack_url = get_env() + slack_url = arguments['url'] if arguments['url'] else slack_url + + # Build the basic message content. + message_body: str = construct_message_body(caller='slack', config=config, arguments=arguments, alert=alert) + + # Common preparation of the notification. + notification_body, click, sender = prepare_payload(caller='slack', config=config, arguments=arguments, + message_body=message_body, alert=alert, priority=priority) + + # Build the payload(s) for the POST request. + _, _, payload_json = build_slack_notification(config=config, notification_body=notification_body, color=color, + mention=mention, sender=sender) + + logger(1, config, me, him, "Process slack notification: done") + return payload_json, slack_url