Code improvement

This commit is contained in:
Rudi klein 2024-05-29 20:37:53 +02:00
parent 79e3e9e1a4
commit 6c267f555f
2 changed files with 239 additions and 184 deletions

View File

@ -20,16 +20,17 @@ def main():
# Load the TOML config. # Load the TOML config.
config: dict = get_config() config: dict = get_config()
logger(0, config, me, him, "############ Processing event ###############################") logger(0, config, me, him, "############ Processing event ###############################")
# Get the arguments used with running the script. # Get the arguments used with running the script.
arguments = get_arguments() arguments = get_arguments()
# Check for test mode. Use test data if true # Check for test mode. Use test data if true.
data = check_test_mode(config) event_data = check_test_mode(config)
# Extract the 'alert' section of the (JSON) event # Extract the 'alert' section of the (JSON) event.
alert = data["parameters"]["alert"] alert = event_data["parameters"]["alert"]
logger(2, config, me, him, "Extracting data from the event") logger(2, config, me, him, "Extracting data from the event")
# Check the config for any exclusion rules and abort when excluded. # Check the config for any exclusion rules and abort when excluded.
@ -42,86 +43,32 @@ def main():
config["targets"] = arguments['targets'] if arguments['targets'] != "" else config["targets"] config["targets"] = arguments['targets'] if arguments['targets'] != "" else config["targets"]
# Prepare the messaging platform specific notification and execute if configured. # Prepare the messaging platform specific notification and execute if configured.
# Discord notification handler
if "discord" in config["targets"]: if "discord" in config["targets"]:
# Show me some ID! Stop resisting! payload_json, discord_url = handle_discord_notification(config=config, arguments=arguments, alert=alert,
caller = "discord" color=color, priority=priority, mention=mention)
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
# Load the url/webhook from the configuration.
discord_url, _, _ = get_env()
discord_url = arguments['url'] if arguments['url'] else discord_url
# Build the basic message content.
message_body: str = construct_message_body(caller, config, arguments, alert)
# Common preparation of the notification.
notification_body, click, sender = prepare_payload(caller, config, arguments, message_body, alert, priority)
# Build the payload(s) for the POST request.
_, _, payload_json = build_discord_notification(caller, config, notification_body, color, mention, sender)
# Build the notification to be sent.
build_discord_notification(caller, config, notification_body, color, mention, sender)
# POST the notification through requests. # POST the notification through requests.
discord_result = requests.post(discord_url, json=payload_json) discord_result = requests.post(url=discord_url, json=payload_json)
logger(1, config, me, him, caller + " notification constructed and sent: " + str(discord_result)) logger(1, config, me, him, "Discord notification constructed and sent: " + str(discord_result))
# ntfy.sh notification handler
if "ntfy" in config["targets"]: if "ntfy" in config["targets"]:
# Show me some ID! Stop resisting! payload_data, payload_headers, ntfy_url = handle_ntfy_notification(config=config, arguments=arguments,
caller = "ntfy" alert=alert, priority=priority)
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
# Load the url/webhook from the configuration.
_, ntfy_url, _ = get_env()
ntfy_url = arguments['url'] if arguments['url'] else ntfy_url
# Build the basic message content.
message_body: str = construct_message_body(caller, config, arguments, alert)
# Common preparation of the notification.
notification_body, click, sender = prepare_payload(caller, config, arguments, message_body, alert, priority)
# Build the payload(s) for the POST request.
payload_headers, payload_data, _ = build_ntfy_notification(caller, config, notification_body, color, mention,
sender)
# Build the notification to be sent.
build_ntfy_notification(caller, config, notification_body, priority, click, sender)
# POST the notification through requests. # POST the notification through requests.
ntfy_result = requests.post(ntfy_url, data=payload_data, headers=payload_headers) ntfy_result = requests.post(url=ntfy_url, data=payload_data, headers=payload_headers)
logger(1, config, me, him, caller + " notification constructed and sent: " + str(ntfy_result)) logger(1, config, me, him, "Ntfy notification constructed and sent: " + str(ntfy_result))
# if "slack" in config["targets"]: # Slack notification handler
# # Show me some ID! Stop resisting! if "slack" in config["targets"]:
# caller = "slack" payload_json, slack_url = handle_slack_notification(config=config, arguments=arguments, alert=alert,
# me = frame(0).f_code.co_name color=color, priority=priority, mention=mention)
# him = frame(1).f_code.co_name
#
# # Load the url/webhook from the configuration.
# _, _, slack_url = get_env()
# slack_url = arguments['url'] if arguments['url'] else slack_url
#
# # Build the basic message content.
# message_body: str = construct_message_body(caller, config, arguments, data)
#
# # Common preparation of the notification.
# notification_body, click, sender = prepare_payload(caller, config, arguments, message_body, alert,
# priority)
# # Build the payload(s) for the POST request.
# _, _, payload_json = build_slack_notification(caller, config, notification_body, priority, color, mention,
# click, sender)
#
# # Build the notification to be sent.
# build_slack_notification(caller, config, notification_body, priority, click, sender)
#
# result = requests.post(slack_url, headers={'Content-Type': 'application/json'}, json=payload_json)
#
# logger(1, config, me, him, caller + " notification constructed and sent: " + str(result))
slack_result = requests.post(url=slack_url, headers={'Content-Type': 'application/json'}, json=payload_json)
logger(1, config, me, him, "Slack notification constructed and sent: " + str(slack_result))
# The end of processing
logger(0, config, me, him, "############ Event processed ################################") logger(0, config, me, him, "############ Event processed ################################")
exit(0) exit(0)

View File

@ -16,6 +16,12 @@ from dotenv import load_dotenv
# log_path = wazuh-notify.log path, # log_path = wazuh-notify.log path,
# config_path = wazuh-notify-config.yaml # config_path = wazuh-notify-config.yaml
##############################################################################################
# General process environment handlers #
##############################################################################################
def set_environment() -> tuple: def set_environment() -> tuple:
set_wazuh_path = os.path.abspath(os.path.join(__file__, "../..")) set_wazuh_path = os.path.abspath(os.path.join(__file__, "../.."))
# set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../..")) # set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../.."))
@ -30,7 +36,7 @@ wazuh_path, log_path, config_path = set_environment()
# Set structured timestamps for notifications. # Set structured timestamps for notifications.
def set_time_format(): def set_time_format() -> tuple:
now_message = time.strftime('%A, %d %b %Y %H:%M:%S') now_message = time.strftime('%A, %d %b %Y %H:%M:%S')
now_logging = time.strftime('%Y-%m-%d %H:%M:%S') now_logging = time.strftime('%Y-%m-%d %H:%M:%S')
now_time = time.strftime('%H:%M') now_time = time.strftime('%H:%M')
@ -40,15 +46,14 @@ def set_time_format():
# Logger: print to console and/or log to file # Logger: print to console and/or log to file
def logger(level, config, me, him, message): def logger(level, config, me, him, message) -> None:
_, now_logging, _, _ = set_time_format() _, now_logging, _, _ = set_time_format()
logger_wazuh_path = os.path.abspath(os.path.join(__file__, "../../..")) logger_wazuh_path, logger_log_path, _ = set_environment()
logger_log_path = '{0}/logs/wazuh-notify.log'.format(logger_wazuh_path)
# When logging from main(), the destination function is called "<module>". For cosmetic reasons rename to "main". # When logging from main(), the destination function is called "<module>". For cosmetic reasons rename to "main".
him = 'main' if him == '<module>' else him him: str = 'main' if him == '<module>' else him
log_line = f'{now_logging} | {level} | {me: <26} | {him: <13} | {message}' log_line: str = f'{now_logging} | {level} | {me: <27} | {him: <27} | {message}'
# Compare the extended_print log level in the configuration to the log level of the message. # Compare the extended_print log level in the configuration to the log level of the message.
if config.get('python').get('extended_print', 0) >= level: if config.get('python').get('extended_print', 0) >= level:
@ -60,18 +65,19 @@ def logger(level, config, me, him, message):
log_file.write(log_line + "\n") log_file.write(log_line + "\n")
except (FileNotFoundError, PermissionError, OSError): except (FileNotFoundError, PermissionError, OSError):
# Special message to console when logging to file fails and console logging might not be set. # Special message to console when logging to file fails and console logging might not be set.
log_line = f'{now_logging} | {level} | {me: <26} | {him: <13} | error opening log file: {logger_log_path}' log_line: str = f'{now_logging} | {level} | {me: <27} | {him: <17} | error opening log file: {logger_log_path}'
print(log_line) print(log_line)
# Get the content of the .env file (url's and/or webhooks). # Get the content of the .env file (url's and/or webhooks).
def get_env(): def get_env() -> tuple:
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
# Write the configuration to a dictionary. # Write the configuration to a dictionary.
config: dict = get_config() config: dict = get_config()
logger(2, config, me, him, "Configuration retrieved to dictionary")
# Check if the secrets .env file is available. # Check if the secrets .env file is available.
try: try:
@ -83,8 +89,11 @@ def get_env():
# Retrieve URLs from .env # Retrieve URLs from .env
discord_url = os.getenv("DISCORD_URL") discord_url = os.getenv("DISCORD_URL")
logger(2, config, me, him, "DISCORD_URL: " + discord_url)
ntfy_url = os.getenv("NTFY_URL") ntfy_url = os.getenv("NTFY_URL")
logger(2, config, me, him, "NTFY_URL: " + ntfy_url)
slack_url = os.getenv("SLACK_URL") slack_url = os.getenv("SLACK_URL")
logger(2, config, me, him, "SLACK_URL: " + slack_url)
except Exception as err: except Exception as err:
# output error, and return with an error code # output error, and return with an error code
@ -96,41 +105,40 @@ def get_env():
# Read and process configuration settings from wazuh-notify-config.yaml and create dictionary. # Read and process configuration settings from wazuh-notify-config.yaml and create dictionary.
def get_config(): def get_config() -> dict:
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
this_config_path: str = "" this_config_path: str = ""
config: dict = {} config: dict = {}
# logger(2, config, me, him, "Loading TOML configuration")
try: try:
_, _, this_config_path = set_environment() _, _, this_config_path = set_environment()
with open(this_config_path, 'rb') as ntfier_config: with open(this_config_path, 'rb') as ntfier_config:
config: dict = tomli.load(ntfier_config) config: dict = tomli.load(ntfier_config)
except (FileNotFoundError, PermissionError, OSError): except (FileNotFoundError, PermissionError, OSError):
logger(2, config, me, him, "Error accessing configuration file: " + this_config_path) logger(2, config, me, him, "Error accessing configuration file: " + this_config_path)
logger(2, config, me, him, "Reading configuration file: " + this_config_path) logger(2, config, me, him, "Reading TOML configuration file: " + this_config_path)
config['targets'] = config.get('general').get('targets', 'discord, slack, ntfy') config['targets']: str = config.get('general').get('targets', 'discord, slack, ntfy')
config['full_alert'] = config.get('general').get('full_alert', False) config['full_alert']: bool = config.get('general').get('full_alert', False)
config['excluded_rules'] = config.get('general').get('excluded_rules', '') config['excluded_rules']: str = config.get('general').get('excluded_rules', '')
config['excluded_agents'] = config.get('general').get('excluded_agents', '') config['excluded_agents']: str = config.get('general').get('excluded_agents', '')
config['priority_map'] = config.get('priority_map', []) config['priority_map']: dict = config.get('priority_map', [])
config['sender'] = config.get('general').get('sender', 'Wazuh (IDS)') config['sender']: str = config.get('general').get('sender', 'Wazuh (IDS)')
config['click'] = config.get('general').get('click', 'https://wazuh.com') config['click']: str = config.get('general').get('click', 'https://wazuh.com')
config['md_e'] = config.get('general').get('markdown_emphasis', '') config['md_e']: str = config.get('general').get('markdown_emphasis', '')
config['excluded_days'] = config.get('python').get('excluded_days', '') config['excluded_days']: list = config.get('python').get('excluded_days', '')
config['excluded_hours'] = config.get('python').get('excluded_hours', '') config['excluded_hours']: list = config.get('python').get('excluded_hours', '')
config['test_mode'] = config.get('python').get('test_mode', False) config['test_mode']: bool = config.get('python').get('test_mode', False)
config['extended_logging'] = config.get('python').get('extended_logging', 0) config['extended_logging']: int = config.get('python').get('extended_logging', 0)
config['extended_print'] = config.get('python').get('extended_print', 0) config['extended_print']: int = config.get('python').get('extended_print', 0)
return config return config
# Show configuration settings from wazuh-notify-config.yaml # Show configuration settings from wazuh-notify-config.yaml
def view_config(): def view_config() -> None:
_, _, this_config_path, _ = set_environment() _, _, this_config_path, _ = set_environment()
try: try:
@ -142,13 +150,14 @@ def view_config():
# Get script arguments during execution. Params found here override config settings. # Get script arguments during execution. Params found here override config settings.
def get_arguments(): def get_arguments() -> dict:
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
# Retrieve the configuration information # Retrieve the configuration information
config: dict = get_config() config: dict = get_config()
logger(2, config, me, him, "Configuration retrieved to dictionary")
# Short options # Short options
options: str = "u:s:p:m:t:c:hv" options: str = "u:s:p:m:t:c:hv"
@ -205,10 +214,9 @@ def get_arguments():
return arguments return arguments
else: else:
try: try:
logger(2, config, me, him, "Parsing arguments")
# Parsing arguments # Parsing arguments
p_arguments, values = getopt.getopt(argument_list, options, long_options) p_arguments, values = getopt.getopt(argument_list, options, long_options)
logger(2, config, me, him, "Parsing arguments")
# Check each argument. Arguments that are present will override the defaults. # Check each argument. Arguments that are present will override the defaults.
for current_argument, current_value in p_arguments: for current_argument, current_value in p_arguments:
@ -244,11 +252,16 @@ def get_arguments():
return arguments return arguments
##############################################################################################
# Wazuh event handling #
##############################################################################################
# Receive and load message from Wazuh # Receive and load message from Wazuh
def load_message(): def load_message() -> dict:
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
config: dict = get_config() config: dict = get_config()
# get alert from stdin # get alert from stdin
@ -271,9 +284,9 @@ def load_message():
# Check if we are in test mode (test_mode setting in config yaml). If so, load test event instead of live event. # Check if we are in test mode (test_mode setting in config yaml). If so, load test event instead of live event.
def check_test_mode(config): def check_test_mode(config) -> dict:
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
if config.get('python').get('test_mode'): if config.get('python').get('test_mode'):
logger(1, config, me, him, "Running in test mode: using test message wazuh-notify-test-event.json") logger(1, config, me, him, "Running in test mode: using test message wazuh-notify-test-event.json")
@ -284,30 +297,32 @@ def check_test_mode(config):
else: else:
# We are running live. Load the data from the Wazuh process. # We are running live. Load the data from the Wazuh process.
logger(2, config, me, him, "Running in live mode: using live message") logger(2, config, me, him, "Running in live mode: using live message")
data = load_message() data: dict = load_message()
return data return data
# Check if there are reasons not to process this event. Check exclusions for rules, agents, days and hours. # Check if there are reasons not to process this event. Check exclusions for rules, agents, days and hours.
def exclusions_check(config, alert): def exclusions_check(config, alert) -> None:
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
# Set some environment # Set some environment
now_message, now_logging, now_weekday, now_time = set_time_format() now_message, now_logging, now_weekday, now_time = set_time_format()
logger(2, config, me, him, "Setting pretty datetime formats for notifications and logging.")
# Check the exclusion records from the configuration yaml. # Check the exclusion records from the configuration yaml.
logger(1, config, me, him, "Checking if we are outside of the exclusion rules: ") logger(1, config, me, him, "Checking if we are outside of the exclusion rules: ")
ex_hours: tuple = config.get('python').get('excluded_hours') ex_hours: tuple = config.get('python').get('excluded_hours')
# Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump. # Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump.
ex_hours = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours ex_hours: tuple = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours
# Get some more exclusion records from the config. # Get some more exclusion records from the config.
ex_days = config.get('python').get('excluded_days') ex_days: str = config.get('python').get('excluded_days')
ex_agents = config.get('general').get("excluded_agents") ex_agents: str = config.get('general').get("excluded_agents")
ex_rules = config.get('general').get("excluded_rules") ex_rules: str = config.get('general').get("excluded_rules")
# Check agent and rule from within the event. # Check agent and rule from within the event.
ev_agent = alert['agent']['id'] ev_agent = alert['agent']['id']
@ -323,7 +338,6 @@ def exclusions_check(config, alert):
elif now_weekday in ex_days: elif now_weekday in ex_days:
logger(2, config, me, him, "excluded: event inside excluded weekdays") logger(2, config, me, him, "excluded: event inside excluded weekdays")
ex_weekday_eval = False ex_weekday_eval = False
elif ev_rule in ex_rules: elif ev_rule in ex_rules:
logger(2, config, me, him, "excluded: event id inside exclusion list") logger(2, config, me, him, "excluded: event id inside exclusion list")
ev_rule_eval = False ev_rule_eval = False
@ -332,7 +346,7 @@ def exclusions_check(config, alert):
ev_rule_eval = False ev_rule_eval = False
notification_eval = True if (ex_hours_eval and ex_weekday_eval and ev_rule_eval and ev_agent_eval) else False notification_eval = True if (ex_hours_eval and ex_weekday_eval and ev_rule_eval and ev_agent_eval) else False
logger(1, config, me, him, "Exclusion rules evaluated. Final decision: " + str(notification_eval)) logger(1, config, me, him, "Exclusion rules evaluated. Process event is " + str(notification_eval))
if not notification_eval: if not notification_eval:
# The event was excluded by the exclusion rules in the configuration. # The event was excluded by the exclusion rules in the configuration.
@ -341,52 +355,67 @@ def exclusions_check(config, alert):
else: else:
# The event was not excluded by the exclusion rules in the configuration. Keep processing. # The event was not excluded by the exclusion rules in the configuration. Keep processing.
logger(2, config, me, him, "Event NOT excluded, notification will be sent") logger(2, config, me, him, "Event NOT excluded, notification will be sent")
return return
# Map the event threat level to the appropriate 5-level priority scale and color for use in the notification platforms. # Map the event threat level to the appropriate 5-level priority scale and color for use in the notification platforms.
def threat_mapping(config, threat_level, fired_times): def threat_mapping(config, threat_level, fired_times):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
# Map threat level to priority. Enters Homeland Security :-). # Map threat level to priority. Enters Homeland Security :-).
p_map = config.get('priority_map') p_map = config.get('priority_map')
logger(2, config, me, him, "Prio map: " + str(p_map)) logger(2, config, me, him, "Threat mapping: priority mapped to: " + str(p_map))
for i in range(len(p_map)): for i in range(len(p_map)):
logger(2, config, me, him, "Loop: " + str(i)) logger(2, config, me, him, "Threat mapping: list loop counter: " + str(i))
logger(2, config, me, him, "Level: " + str(threat_level)) logger(2, config, me, him, "Threat mapping: threat level found: " + str(threat_level))
if threat_level in p_map[i]["threat_map"]: if threat_level in p_map[i]["threat_map"]:
color_mapping = p_map[i]["color"] color_mapping = p_map[i]["color"]
priority_mapping = 5 - i priority_mapping = 5 - i
logger(2, config, me, him, "Prio: " + str(priority_mapping)) logger(2, config, me, him, "Threat mapping: priority: " + str(priority_mapping))
logger(2, config, me, him, "Color: " + str(color_mapping)) logger(2, config, me, him, "Threat mapping: color: " + str(color_mapping))
if fired_times >= p_map[i]["notify_threshold"]:
logger(2, config, me, him, "The notification_threshold prevents this message from sending")
exit(0)
if fired_times >= p_map[i]["mention_threshold"]: if fired_times >= p_map[i]["mention_threshold"]:
# When this flag is set, Discord recipients get a stronger message (DM). # When this flag is set, Discord!! recipients get a stronger message (DM).
mention_flag = "@here" mention_flag = "@here"
logger(2, config, me, him, "Threat mapping: mention flag: " + str(mention_flag))
else: else:
mention_flag = "" mention_flag = ""
logger(2, config, me, him, "Threat level mapped as: " + logger(2, config, me, him, "Threat level mapped as: " + "priority:" + str(priority_mapping) +
"prio:" + str(priority_mapping) + " color: " + str(color_mapping) + " mention: " + mention_flag) " color: " + str(color_mapping) + " mention: " + str(mention_flag))
return priority_mapping, color_mapping, mention_flag return priority_mapping, color_mapping, mention_flag
logger(0, config, me, him, "Threat level mapping failed! Returning garbage (99, 99, 99)") logger(0, config, me, him, "Threat level mapping failed! Returning garbage (99, 99, 99)")
return 99, 99, "99" return 99, 99, "99"
##############################################################################################
# Common notification preparation #
##############################################################################################
# Construct the message that will be sent to the notifier platforms. # Construct the message that will be sent to the notifier platforms.
def construct_message_body(caller, config, arguments, data: dict) -> str: def construct_message_body(caller, config, arguments, alert) -> str:
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(2, config, me, him, caller + ": Constructing message body")
# Include a specific control sequence for markdown bold parameter names. # Include a specific control sequence for markdown bold parameter names.
# todo To be fixed # todo To be fixed
md_map = config.get('markdown_emphasis', '') md_map = config.get('markdown_emphasis', '')
md_e = md_map[caller] md_e = md_map[caller]
logger(2, config, me, him, caller + "Emphasis string used: " + md_e)
# If the --message (-m) argument was fulfilled, use this message to be sent. # If the --message (-m) argument was fulfilled, use this message to be sent.
if arguments['message']: if arguments['message']:
@ -396,30 +425,29 @@ def construct_message_body(caller, config, arguments, data: dict) -> str:
message_body: str = \ message_body: str = \
( (
md_e + "Timestamp:" + md_e + " " + timestamp + "\n" + md_e + "Timestamp:" + md_e + " " + timestamp + "\n" +
md_e + "Agent:" + md_e + " " + data["agent"]["name"] + " (" + data["agent"]["id"] + ")" + "\n" + md_e + "Agent:" + md_e + " " + alert["agent"]["name"] + " (" + alert["agent"]["id"] + ")" + "\n" +
md_e + "Rule id:" + md_e + " " + data["rule"]["id"] + "\n" + md_e + "Rule id:" + md_e + " " + alert["rule"]["id"] + "\n" +
md_e + "Rule:" + md_e + " " + data["rule"]["description"] + "\n" + md_e + "Rule:" + md_e + " " + alert["rule"]["description"] + "\n" +
md_e + "Description:" + md_e + " " + data["full_log"] + "\n" + md_e + "Description:" + md_e + " " + alert["full_log"] + "\n" +
md_e + "Threat level:" + md_e + " " + str(data["rule"]["level"]) + "\n" + md_e + "Threat level:" + md_e + " " + str(alert["rule"]["level"]) + "\n" +
md_e + "Times fired:" + md_e + " " + str(data["rule"]["firedtimes"]) + "\n" md_e + "Times fired:" + md_e + " " + str(alert["rule"]["firedtimes"]) + "\n"
) )
logger(2, config, me, him, caller + " basic message constructed. Returning: " + message_body)
logger(2, config, me, him, caller + " basic message constructed.")
return message_body return message_body
# Construct the notification (message + additional information) that will be sent to the notifier platforms. # Construct the notification (message + additional information) that will be sent to the notifier platforms.
def prepare_payload(caller, config, arguments, message_body, alert, priority): def prepare_payload(caller, config, arguments, message_body, alert, priority):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(2, config, me, him, "Notification being constructed.") logger(2, config, me, him, "Payload being constructed.")
md_map = config.get('markdown_emphasis', '') md_map = config.get('markdown_emphasis', '')
md_e = md_map[caller] md_e = md_map[caller]
logger(2, config, me, him, caller + "Emphasis string used: " + md_e)
click: str = config.get('general').get('click', 'https://wazuh.com')
priority: str = str(priority) priority: str = str(priority)
tags = (str(alert['rule']['groups']).replace("[", "") tags = (str(alert['rule']['groups']).replace("[", "")
.replace("]", "") .replace("]", "")
@ -464,58 +492,138 @@ def prepare_payload(caller, config, arguments, message_body, alert, priority):
return notification_body, click, sender return notification_body, click, sender
def build_discord_notification(caller, config, notification_body, color, mention, sender): ##############################################################################################
# Platform specific notification creation and handling #
##############################################################################################
# Build the notification for this specific platform.
def build_discord_notification(config, notification_body, color, mention, sender):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(2, config, me, him, caller + " payload created") logger(2, config, me, him, "Discord payload created")
payload_json = {"username": sender, payload_json = {"username": sender,
"content": mention, "content": mention,
"embeds": [{"description": notification_body, "embeds": [{"description": notification_body,
"color": color, "color": color,
"title": sender}]} "title": sender}]}
logger(2, config, me, him, caller + " notification built") logger(2, config, me, him, "Discord notification built")
return "", "", payload_json return "", "", payload_json
def build_ntfy_notification(caller, config, notification_body, priority, click, sender): # Build the notification for this specific platform.
def build_ntfy_notification(config, notification_body, priority, click, sender):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
logger(2, config, me, him, caller + " payloads created") logger(2, config, me, him, "Ntfy payloads created")
# if caller == "ntfy":
# todo Check this out
# basic_msg = "&nbsp;\n" + basic_msg
payload_data = notification_body payload_data = notification_body
payload_headers = {"Markdown": "yes", payload_headers = {"Markdown": "yes",
"Title": sender, "Title": sender,
"Priority": str(priority), "Priority": str(priority),
"Click": click} "Click": click}
logger(2, config, me, him, caller + " notification built") logger(2, config, me, him, "Ntfy notification built")
return payload_headers, payload_data, "" return payload_headers, payload_data, ""
def build_slack_notification(caller, config, arguments, notification_body, priority, color, mention, click, sender): # Build the notification for this specific platform.
def build_slack_notification(config, notification_body, color, mention, sender):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging. # The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me = frame(0).f_code.co_name me: str = frame(0).f_code.co_name
him = frame(1).f_code.co_name him: str = frame(1).f_code.co_name
sender: str = config.get('general').get('sender', 'Wazuh (IDS)') logger(2, config, me, him, "Slack payload created")
sender = arguments['sender'] if arguments['sender'] else sender
logger(2, config, me, him, caller + " payloads created")
# todo Need some investigation.
payload_json = {"text": notification_body}
# payload_json = {"username": sender,
# "content": mention,
# "embeds": [{"description": notification,
# "color": color,
# "title": sender}]}
logger(2, config, me, him, caller + " notification built")
payload_json = {"username": sender,
"content": mention,
"embeds": [{"description": notification_body,
"color": color,
"title": sender}]}
logger(2, config, me, him, "Slack notification built")
return "", "", payload_json return "", "", payload_json
# Handle the complete notification generation for this specific platform.
def handle_discord_notification(config, arguments, alert, color, priority, mention):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name
logger(1, config, me, him, "Process discord notification: start")
# Load the url/webhook from the configuration.
discord_url, _, _ = get_env()
discord_url = arguments['url'] if arguments['url'] else discord_url
# Build the basic message content.
message_body: str = construct_message_body(caller='discord', config=config, arguments=arguments, alert=alert)
# Common preparation of the notification.
notification_body, click, sender = prepare_payload(caller='discord', config=config, arguments=arguments,
message_body=message_body, alert=alert, priority=priority)
# Build the payload(s) for the POST request.
_, _, payload_json = build_discord_notification(config=config, notification_body=notification_body, color=color,
mention=mention, sender=sender)
logger(1, config, me, him, "Process discord notification: done")
return payload_json, discord_url
# Handle the complete notification generation for this specific platform.
def handle_ntfy_notification(config, arguments, alert, priority):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name
logger(1, config, me, him, "Process ntfy notification: start")
# Load the url/webhook from the configuration.
_, ntfy_url, _ = get_env()
ntfy_url = arguments['url'] if arguments['url'] else ntfy_url
# Build the basic message content.
message_body: str = construct_message_body(caller='ntfy', config=config, arguments=arguments, alert=alert)
# Special blank line after the title of the message.
message_body = "&nbsp;\n" + message_body
# Common preparation of the notification.
notification_body, click, sender = prepare_payload(caller='ntfy', config=config, arguments=arguments,
message_body=message_body, alert=alert, priority=priority)
# Build the payload(s) for the POST request.
payload_headers, payload_data, _ = build_ntfy_notification(config=config, notification_body=notification_body,
priority=priority, click=click, sender=sender)
logger(1, config, me, him, "Process ntfy notification: done")
return payload_data, payload_headers, ntfy_url
# Handle the complete notification generation for this specific platform.
def handle_slack_notification(config, arguments, alert, color, priority, mention):
# The 'me' variable sets the called function (current function), the 'him' the calling function. Used for logging.
me: str = frame(0).f_code.co_name
him: str = frame(1).f_code.co_name
logger(1, config, me, him, "Process slack notification: start")
# Load the url/webhook from the configuration.
_, _, slack_url = get_env()
slack_url = arguments['url'] if arguments['url'] else slack_url
# Build the basic message content.
message_body: str = construct_message_body(caller='slack', config=config, arguments=arguments, alert=alert)
# Common preparation of the notification.
notification_body, click, sender = prepare_payload(caller='slack', config=config, arguments=arguments,
message_body=message_body, alert=alert, priority=priority)
# Build the payload(s) for the POST request.
_, _, payload_json = build_slack_notification(config=config, notification_body=notification_body, color=color,
mention=mention, sender=sender)
logger(1, config, me, him, "Process slack notification: done")
return payload_json, slack_url