Merge remote-tracking branch 'origin/master'

This commit is contained in:
Rudi klein 2024-05-23 17:33:25 +02:00
commit e36c1851bd
4 changed files with 395 additions and 183 deletions

View File

@ -1,10 +1,8 @@
#!/bin/bash
echo "Going live!"
sudo cp /home/rudi/pycharm/wazuh-notifier.py /var/ossec/active-response/bin
sudo cp /home/rudi/pycharm/wazuh_notifier_module.py /var/ossec/active-response/bin
sudo cp /home/rudi/pycharm/wazuh-notify-python/wazuh-notify.py /var/ossec/active-response/bin
sudo cp /home/rudi/pycharm/wazuh-notify-python/wazuh_notify_module.py /var/ossec/active-response/bin
sudo cp /home/rudi/pycharm/etc/wazuh-notify-config.yaml /var/ossec/etc
sudo cp /home/rudi/pycharm/wazuh-test-event.json /var/ossec/etc
echo copied: wazuh-notifier.py, wazuh_notifier_module.py
sudo cp /home/rudi/pycharm/wazuh-notify-test-event.json /var/ossec/etc
echo "Live!"

View File

@ -3,8 +3,8 @@
# This is the yaml config file for wazuh-active-response (for both the Python and Go version)
targets: "discord" # Platforms in this string with comma seperated values are triggered.
full_message: "ntfy, slack" # Platforms in this string will enable the sending of the full event information.
targets: "slack, ntfy, discord" # Platforms in this string with comma seperated values are triggered.
full_message: "" # Platforms in this string will enable the sending of the full event information.
# Exclude rule events that are enabled in the ossec.conf active response definition.
# These settings provide an easier way to disable events from firing the notifiers.
@ -14,8 +14,7 @@ excluded_agents: "99999" # Enter as a string with comma seperated v
# Priority mapping from 0-15 (Wazuh threat levels) to 1-5 (in notifications) and their respective colors (Discord)
# https://documentation.wazuh.com/current/user-manual/ruleset/rules-classification.html
# Enter as lists of integers.
# Enter threat_map as lists of integers, mention_threshold as integer and color as Hex integer
priority_map:
- threat_map: [ 15,14,13,12 ]
@ -38,21 +37,29 @@ priority_map:
sender: "Wazuh (IDS)"
click: "https://documentation.wazuh.com/"
# From here on the settings are ONLY used by the Python version of wazuh-active-response.
###########################################################################################
# From here on the settings are ONLY used by the Python version of wazuh-active-response. #
###########################################################################################
# Below settings provide for a window that enable/disables events from firing the notifiers.
excluded_days: "" # Enter as a string with comma seperated values. Be aware of your regional settings.
excluded_hours: [ "23:59", "00:00" ] # Enter as a tuple of string values. Be aware of your regional settings.
# Following parameter defines the markdown characters to emphasise the parameter names in the notification messages
markdown_emphasis:
slack: "*"
ntfy: "**"
discord: "**"
# The next settings are used for testing. Test mode will add the example event in wazuh-notify-test-event.json instead of the
# message received through wazuh. This enables testing for particular events when the test event is customized.
test_mode: True
test_mode: False
# Enabling this parameter provides more logging to the wazuh-notifier log.
extended_logging: True
extended_logging: 2
# Enabling this parameter provides extended logging to the console.
extended_print: True
extended_print: 0
# End of wazuh notifier configuration yaml
...

View File

@ -1,105 +1,149 @@
#!/usr/bin/env python3
# This script is adapted version of the Python active response script sample, provided by Wazuh, in the documentation:
# https://documentation.wazuh.com/current/user-manual/capabilities/active-response/custom-active-response-scripts.html
# It is provided under the below copyright statement:
#
# Copyright (C) 2015-2022, Wazuh Inc.
# All rights reserved.
#
# This program is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public
# License (version 2) as published by the FSF - Free Software
# Foundation.
#
# This adapted version is free software. Rudi Klein, april 2024
# Rudi Klein, april 2024
import json
import sys
import requests
from wazuh_notify_module import build_notification
from wazuh_notify_module import construct_basic_message
from wazuh_notify_module import exclusions_check
from wazuh_notify_module import get_arguments
from wazuh_notify_module import get_config
from wazuh_notify_module import get_env
from wazuh_notify_module import load_message
from wazuh_notify_module import logger
from wazuh_notify_module import set_environment
from wazuh_notify_module import threat_mapping
from wazuh_notify_module import *
def main(argv):
# Load the YAML config
def main():
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
# Load the YAML config.
config: dict = get_config()
# Path variables assignments
wazuh_path, ar_path, config_path = set_environment()
logger(0, config, me, him, "############ Processing event ###############################")
logger(2, config, me, him, "Loading yaml configuration")
# Get the arguments used with running the script
arg_url, arg_sender, arg_destination, arg_message, arg_priority, arg_tags, arg_click = get_arguments()
# Get the arguments used with running the script.
arguments = get_arguments()
# Check if we are in test mode (test_mode setting in config yaml). If so, load test event instead of live event.
if config.get("test_mode"):
logger(config, "In test mode: using test message wazuh-notify-test-event.json")
logger(1, config, me, him, "Running in test mode: using test message wazuh-notify-test-event.json")
with (open('wazuh-notify-test-event.json') as event_file):
# Load the test event data
home_path, _, _ = set_environment()
with (open(home_path + '/etc/wazuh-notify-test-event.json') as event_file):
data: dict = json.loads(event_file.read())
else:
logger(config, "In live mode: using live message")
# We are running live. Load the data from the Wazuh process.
logger(2, config, me, him, "Running in live mode: using live message")
data = load_message()
# Extract the 'alert' section of the (JSON) event
alert = data["parameters"]["alert"]
logger(2, config, me, him, "Extracting data from the event")
# Check the config for any exclusion rules
fire_notification = exclusions_check(config, alert)
logger(1, config, me, him, "Checking if we are outside of the exclusion rules: " + str(fire_notification))
if not fire_notification:
logger(config, "Event excluded, no notification sent. Exiting")
# The event was excluded by the exclusion rules in the configuration.
logger(1, config, me, him, "Event excluded, no notification sent. Exiting")
exit()
# Include a specific control sequence for Discord bold text
if "discord" in config["targets"]:
accent: str = "**"
else:
accent: str = ""
# The event was not excluded by the exclusion rules in the configuration. Keep processing.
logger(2, config, me, him, "Event NOT excluded, notification will be sent")
# Create the notification text to be sent.
notification: str = construct_basic_message(accent, alert)
logger(config, "Notification constructed")
# Get the mapping from event threat level to priority, color and mention_flag.
priority, color, mention = threat_mapping(config, alert['rule']['level'], alert['rule']['firedtimes'])
logger(2, config, me, him, "Threat mapping done: " + "p:" + str(priority) + " c:" + str(color) + " m:" + mention)
# todo Not used?
# Get the mapping from event threat level to priority (Discord/ntfy), color (Discord) and mention_flag (Discord)
priority, color, mention = threat_mapping(config, alert['rule']['level'],
alert['rule']['firedtimes'])
# If the target argument was used with the script, we'll use that instead of the configuration parameter.
config["targets"] = arguments['targets'] if arguments['targets'] != "" else config["targets"]
result = ""
# Prepare the messaging platform specific request and execute
if "discord" in config["targets"]:
caller = "discord"
# Load the url/webhook from the configuration.
discord_url, _, _ = get_env()
payload = build_notification(caller, config, notification, alert, priority, color, mention)
result = requests.post(discord_url, json=payload)
exit(result)
discord_url = arguments['url'] if arguments['url'] else discord_url
# Build the basic notification message content.
notification: str = construct_basic_message(config, arguments, caller, alert)
logger(2, config, me, him, caller + " basic message constructed")
# Build the payload(s) for the POST request.
_, _, payload_json = build_notification(caller,
config,
arguments,
notification,
alert,
priority,
color,
mention
)
# POST the notification through requests.
result = requests.post(discord_url, json=payload_json)
logger(1, config, me, him, caller + " notification constructed and HTTPS request done: " + str(result))
if "ntfy" in config["targets"]:
caller = "ntfy"
ntfy_url, _, _ = get_env()
payload = build_notification(caller, config, notification, alert, priority, color, mention)
result = requests.post(ntfy_url, json=payload)
exit(result)
# Load the url/webhook from the configuration.
_, ntfy_url, _ = get_env()
# Build the basic notification message content.
notification: str = construct_basic_message(config, arguments, caller, alert)
logger(2, config, me, him, caller + " basic message constructed")
# Build the payload(s) for the POST request.
payload_headers, payload_data, _ = build_notification(caller,
config,
arguments,
notification,
alert,
priority,
color,
mention
)
# POST the notification through requests.
result = requests.post(ntfy_url, data=payload_data, headers=payload_headers)
logger(1, config, me, him, caller + " notification constructed and request done: " + str(result))
if "slack" in config["targets"]:
caller = "slack"
slack_url, _, _ = get_env()
payload = build_notification(caller, config, notification, alert, priority, color, mention)
result = requests.post(slack_url, json=payload)
exit(result)
# Load the url/webhook from the configuration.
_, _, slack_url = get_env()
# Build the basic notification message content.
notification: str = construct_basic_message(config, arguments, caller, alert)
logger(2, config, me, him, caller + " basic message constructed")
# Build the payload(s) for the POST request.
_, _, payload_json = build_notification(caller,
config,
arguments,
notification,
alert,
priority,
color,
mention
)
# POST the notification through requests.
result = requests.post(slack_url, headers={'Content-Type': 'application/json'}, json=payload_json)
logger(1, config, me, him, caller + " notification constructed and request done: " + str(result))
logger(0, config, me, him, "############ Event processed ################################")
exit(0)
if __name__ == "__main__":
main(sys.argv)
main()

View File

@ -1,90 +1,117 @@
#!/usr/bin/env python3
import datetime
import getopt
import json
import os
import sys
import time
from os.path import join, dirname
from pathlib import PureWindowsPath, PurePosixPath
from sys import _getframe as frame
import yaml
from dotenv import load_dotenv
# Define paths: wazuh_path = wazuh root directory
# ar_path = active-responses.log path,
# log_path = wazuh-notify.log path,
# config_path = wazuh-notify-config.yaml
#
def set_environment() -> tuple:
# todo fix reference when running manually/in process
set_wazuh_path = "/home/rudi/pycharm"
# set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../.."))
set_ar_path = '{0}/logs/wazuh-notifier.log'.format(set_wazuh_path)
def set_environment() -> tuple:
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
# set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../"))
set_wazuh_path = os.path.abspath(os.path.join(__file__, "../../.."))
set_log_path = '{0}/logs/wazuh-notify.log'.format(set_wazuh_path)
set_config_path = '{0}/etc/wazuh-notify-config.yaml'.format(set_wazuh_path)
return set_wazuh_path, set_ar_path, set_config_path
return set_wazuh_path, set_log_path, set_config_path
# Set paths for use in this module
wazuh_path, ar_path, config_path = set_environment()
wazuh_path, log_path, config_path = set_environment()
# Set structured timestamps for logging and notifications.
# Set structured timestamps for notifications.
def set_time_format():
now_message = time.strftime('%A, %d %b %Y %H:%M:%S')
now_logging = time.strftime('%Y/%m/%d %H:%M:%S')
now_logging = time.strftime('%Y-%m-%d %H:%M:%S')
now_time = time.strftime('%H:%M')
now_weekday = time.strftime('%A')
return now_message, now_logging, now_weekday, now_time
# Logger
def logger(config, message):
# todo fix logging
_, log_path, _ = set_environment()
if config.get('extended_print', True):
print(time.strftime('%Y/%m/%d %H:%M:%S'), "|", message)
if config.get("extended_logging"):
with open(ar_path, mode="a") as log_file:
ar_name_posix = str(PurePosixPath(PureWindowsPath(log_path[log_path.find("active-response"):])))
log_file.write(
str(datetime.datetime.now().strftime(
'%Y/%m/%d %H:%M:%S')) + " " + ar_name_posix + ": " + message + "\n")
else:
pass
# Logger: print to console and/or log to file
# Get the content of the .env file
def logger(level, config, me, him, message):
logger_wazuh_path = os.path.abspath(os.path.join(__file__, "../../.."))
# logger_wazuh_path = os.path.abspath(os.path.join(__file__, "../.."))
logger_log_path = '{0}/logs/wazuh-notify.log'.format(logger_wazuh_path)
him = 'main' if him == '<module>' else him
time_stamp = time.strftime('%Y-%m-%d %H:%M:%S')
log_line = f'{time_stamp} | {level} | {me: <23} | {him: <15} | {message}'
# Compare the console log level in the configuration to the log level of the message
if config.get('extended_print') >= level:
print(log_line)
try:
# Compare the file logging, log level in the configuration to the log level of the message
if config.get("extended_logging") >= level:
with open(logger_log_path, mode="a") as log_file:
log_file.write(log_line + "\n")
except (FileNotFoundError, PermissionError, OSError):
# Special message to console when logging to file fails
log_line = f'{time_stamp} | {level} | {me: <23} | {him: <15} | error opening log file: {logger_log_path}'
print(log_line)
# Get the content of the .env file (url's and/or webhooks)
def get_env():
# The 'me' variable sets the calling function, the 'him' the called function. Used for logging.
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
config: dict = get_config()
try:
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
if not os.path.isfile(dotenv_path):
logger(config, ".env not found")
logger(0, config, me, him, dotenv_path + " not found")
raise Exception(dotenv_path, "file not found")
# Retrieve url from .env
discord_url = os.getenv("DISCORD_URL")
ntfy_url = os.getenv("NTFY_URL")
slack_url = os.getenv("SLACK_URL")
except Exception as err:
# output error, and return with an error code
logger(config, str(Exception(err.args)))
logger(0, config, me, him, 'Error reading ' + str(err))
exit(err)
logger(2, config, me, him, dotenv_path + " loaded")
return discord_url, ntfy_url, slack_url
@ -92,19 +119,23 @@ def get_env():
def get_config():
# DO NOT USE logger() IN THIS FUNCTION. IT WILL CREATE A RECURSION LOOP!
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
this_config_path: str = ""
config: dict = {}
try:
_, _, this_config_path = set_environment()
with open(this_config_path, 'r') as ntfier_config:
config: dict = yaml.safe_load(ntfier_config)
except (FileNotFoundError, PermissionError, OSError):
print(time.strftime('%Y/%m/%d %H:%M:%S') + " | " + this_config_path + " missing")
print(time.strftime('%Y/%m/%d %H:%M:%S') + " | " + "reading config: " + this_config_path)
except (FileNotFoundError, PermissionError, OSError):
logger(2, config, me, him, "Error accessing configuration file: " + this_config_path)
logger(2, config, me, him, "Reading configuration file: " + this_config_path)
config['targets'] = config.get('targets', 'ntfy, discord')
config['excluded_rules'] = config.get('excluded_rules', '')
config['excluded_agents'] = config.get('excluded_agents', '')
@ -113,9 +144,9 @@ def get_config():
config['test_mode'] = config.get('test_mode', True)
config['extended_logging'] = config.get('extended_logging', True)
config['extended_print'] = config.get('extended_print', True)
config['sender'] = 'Wazuh (IDS)'
config['click'] = 'https://wazuh.org'
config['sender'] = config.get('sender', 'Wazuh (IDS)')
config['click'] = config.get('click', 'https://wazuh.org')
config['md_e'] = config.get('markdown_emphasis', '')
return config
@ -124,6 +155,7 @@ def get_config():
def view_config():
_, _, this_config_path, _ = set_environment()
try:
@ -133,22 +165,39 @@ def view_config():
print(this_config_path + " does not exist or is not accessible")
return
# Get params during execution. Params found here override config settings.
# Get script arguments during execution. Params found here override config settings.
def get_arguments():
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
# Retrieve the configuration information
config: dict = get_config()
# Short options
options: str = "u:s:p:m:t:c:hv"
# Long options
long_options: list = ["url=", "sender=", "destination=", "priority=", "message=", "tags=", "click=", "help",
"view"]
long_options: list = ["url=",
"sender=",
"targets=",
"priority=",
"message=",
"tags=",
"click=",
"help",
"view"
]
help_text: str = """
-u, --url is the url for the server, ending with a "/".
-s, --sender is the sender of the message, either an app name or a person.
-d, --destination is the NTFY subscription or Discord title, to send the message to.
-d, --targets is the list of platforms to send a message to (slack, ntfy, discord)
-p, --priority is the priority of the message, ranging from 1 (lowest), to 5 (highest).
-m, --message is the text of the message to be sent.
-t, --tags is an arbitrary strings of tags (keywords), seperated by a "," (comma).
@ -157,33 +206,43 @@ def get_arguments():
-v, --view show config.
"""
url: str
sender: str
destination: str
message: str
priority: int
tags: str
click: str
url, sender, destination, message, priority, tags, click = "", "", "", "", 0, "", ""
# Initialize some variables.
url: str = ""
sender: str = ""
targets: str = ""
message: str = ""
priority: int = 0
tags: str = ""
click: str = ""
# Fetch the arguments from the command line, skipping the first argument (name of the script).
argument_list: list = sys.argv[1:]
logger(2, config, me, him, "Found arguments:" + str(argument_list))
if not argument_list:
logger(config, 'No argument list found (no arguments provided with script execution')
return url, sender, destination, message, priority, tags, click
logger(1, config, me, him, 'No argument list found (no arguments provided with script execution')
arguments: dict = {'url': url, 'sender': sender, 'targets': targets, 'message': message,
'priority': priority, 'tags': tags, 'click': click}
return arguments
else:
try:
logger(config, "Parsing arguments")
logger(2, config, me, him, "Parsing arguments")
# Parsing arguments
arguments, values = getopt.getopt(argument_list, options, long_options)
# checking each argument
for current_argument, current_value in arguments:
p_arguments, values = getopt.getopt(argument_list, options, long_options)
# Check each argument
for current_argument, current_value in p_arguments:
if current_argument in ("-h", "--help"):
print(help_text)
@ -199,8 +258,8 @@ def get_arguments():
elif current_argument in ("-s", "--sender"):
sender: str = current_value
elif current_argument in ("-d", "--destination"):
destination: str = current_value
elif current_argument in ("-d", "--targets"):
targets: str = current_value
elif current_argument in ("-p", "--priority"):
priority: int = int(current_value)
@ -215,22 +274,31 @@ def get_arguments():
click: str = current_value
except getopt.error as err:
# output error, and return with an error code
logger(config, str(err))
logger(0, config, me, him, "Error during argument parsing:" + str(err))
return url, sender, destination, message, priority, tags, click
logger(2, config, me, him, "Arguments returned as dictionary")
arguments: dict = {'url': url, 'sender': sender, 'targets': targets, 'message': message,
'priority': priority, 'tags': tags, 'click': click}
return arguments
# Receive and load message from Wazuh
def load_message():
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
config: dict = get_config()
# get alert from stdin
logger(config, "Loading event message from STDIN")
logger(2, config, me, him, "Loading event message from STDIN")
input_str: str = ""
for line in sys.stdin:
@ -240,10 +308,12 @@ def load_message():
data: json = json.loads(input_str)
if data.get("command") == "add":
logger(config, "Relevant event data found")
logger(1, config, me, him, "Relevant event data found")
return data
else:
# todo fix error message
# Event came in, but wasn't processed. Shouldn't happen.
logger(0, config, me, him, "Event data not found")
sys.exit(1)
@ -251,42 +321,61 @@ def load_message():
def exclusions_check(config, alert):
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
# Set some environment
now_message, now_logging, now_weekday, now_time = set_time_format()
# Check the exclusion records from the configuration yaml
ex_hours: tuple = config.get('excluded_hours')
# Start hour may not be later than end hours. End hour may not exceed 00:00 midnight to avoid day jump
ex_hours = [ex_hours[0], "23:59"] if (ex_hours[1] >= '23:59' or ex_hours[1] < ex_hours[0]) else ex_hours
# Get some more exclusion records from the config
ex_days = config.get('excluded_days')
ex_agents = config.get("excluded_agents")
ex_rules = config.get("excluded_rules")
# Check agent and rule from within the event
ev_agent = alert['agent']['id']
ev_rule = alert['rule']['id']
# Let's assume all lights are green
# Let's assume all lights are green, until proven otherwise
ex_hours_eval, ex_weekday_eval, ev_rule_eval, ev_agent_eval = True, True, True, True
# Evaluate the conditions for sending a notification. Any False will cause the notification to be discarded.
if (now_time > ex_hours[0]) and (now_time < ex_hours[1]):
logger(config, "excluded: event inside exclusion time frame")
logger(2, config, me, him, "excluded: event inside exclusion time frame")
ex_hours_eval = False
elif now_weekday in ex_days:
logger(config, "excluded: event inside excluded weekdays")
logger(2, config, me, him, "excluded: event inside excluded weekdays")
ex_weekday_eval = False
elif ev_rule in ex_rules:
logger(config, "excluded: event id inside exclusion list")
ev_rule_eval = False
elif ev_agent in ex_agents:
logger(config, "excluded: event agent inside exclusion list")
logger(2, config, me, him, "excluded: event id inside exclusion list")
ev_rule_eval = False
notification_eval = ex_hours_eval and ex_weekday_eval and ev_rule_eval and ev_agent
elif ev_agent in ex_agents:
logger(2, config, me, him, "excluded: event agent inside exclusion list")
ev_rule_eval = False
notification_eval = True if (ex_hours_eval and ex_weekday_eval and ev_rule_eval and ev_agent_eval) else False
logger(1, config, me, him, "Exclusion rules evaluated. Final decision: " + str(notification_eval))
return notification_eval
@ -295,43 +384,94 @@ def exclusions_check(config, alert):
def threat_mapping(config, threat_level, fired_times):
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
# Map threat level v/s priority
p_map = config.get('priority_map')
logger(2, config, me, him, "Prio map: " + str(p_map))
for i in range(len(p_map)):
logger(2, config, me, him, "Loop: " + str(i))
logger(2, config, me, him, "Level: " + str(threat_level))
if threat_level in p_map[i]["threat_map"]:
color_mapping = p_map[i]["color"]
priority_mapping = 5 - i
logger(2, config, me, him, "Prio: " + str(priority_mapping))
logger(2, config, me, him, "Color: " + str(color_mapping))
if fired_times >= p_map[i]["mention_threshold"]:
# When this flag is set, Discord recipients get a stronger message
mention_flag = "@here"
else:
mention_flag = ""
logger(2, config, me, him, "Threat level mapped as: " +
"p:" + str(priority_mapping) + " c: " + str(color_mapping) + " m: " + mention_flag)
return priority_mapping, color_mapping, mention_flag
else:
return 0, 0, 0
logger(0, config, me, him, "Threat level mapping failed! Returning garbage (99, 99, 99)")
return 99, 99, 99
# Construct the message that will be sent to the notifier platforms
def construct_basic_message(accent: str, data: dict) -> str:
# Adding the BOLD text string for Discord use
def construct_basic_message(config, arguments, caller: str, data: dict) -> str:
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
basic_msg: str = (accent +
"Agent:" + " " + accent + data["agent"]["name"] + " (" + data["agent"][
"id"] + ")" + "\n" + accent +
"Rule id: " + accent + data["rule"]["id"] + "\n" + accent +
"Rule: " + accent + data["rule"]["description"] + "\n" + accent +
"Description: " + accent + data["full_log"] + "\n" + accent +
"Threat level: " + accent + str(data["rule"]["level"]) + "\n" + accent +
"Times fired: " + accent + str(data["rule"]["firedtimes"]) + "\n")
# Include a specific control sequence for markdown bold parameters names
md_map = config.get('markdown_emphasis')
md_e = md_map[caller]
if arguments['message']:
basic_msg = arguments['message']
else:
_, timestamp, _, _ = set_time_format()
basic_msg: str = \
(
md_e + "Timestamp:" + md_e + " " + timestamp + "\n" +
md_e + "Agent:" + md_e + " " + data["agent"]["name"] + " (" + data["agent"]["id"] + ")" + "\n" +
md_e + "Rule id:" + md_e + " " + data["rule"]["id"] + "\n" +
md_e + "Rule:" + md_e + " " + data["rule"]["description"] + "\n" +
md_e + "Description:" + md_e + " " + data["full_log"] + "\n" +
md_e + "Threat level:" + md_e + " " + str(data["rule"]["level"]) + "\n" +
md_e + "Times fired:" + md_e + " " + str(data["rule"]["firedtimes"]) + "\n")
if caller == "ntfy":
basic_msg = "&nbsp;\n" + basic_msg
logger(2, config, me, him, caller + " basic message constructed")
return basic_msg
def build_notification(caller, config, notification, alert, priority, color, mention):
# Construct the notification (message + additional information) that will be sent to the notifier platforms.
def build_notification(caller, config, arguments, notification, alert, priority, color, mention):
me = frame(0).f_code.co_name
him = frame(1).f_code.co_name
logger(2, config, me, him, caller + " notification being constructed")
md_map = config.get('markdown_emphasis')
md_e = md_map[caller]
click: str = config.get('click')
sender: str = config.get('sender')
priority: str = str(priority)
@ -340,6 +480,8 @@ def build_notification(caller, config, notification, alert, priority, color, men
.replace("'", "")
.replace(",", ", ")
)
logger(2, config, me, him, caller + " full event formatted")
full_event: str = str(json.dumps(alert, indent=4)
.replace('"', '')
.replace('{', '')
@ -348,44 +490,65 @@ def build_notification(caller, config, notification, alert, priority, color, men
.replace(']', '')
.replace(',', ' ')
)
click = arguments['click'] if arguments['click'] else click
priority = arguments['priority'] if arguments['priority'] else priority
sender = arguments['sender'] if arguments['sender'] else sender
tags = arguments['tags'] if arguments['tags'] else tags
# Add the full alert data to the notification
if caller in config["full_message"]:
logger(2, config, me, him, caller + "Full alert data will be sent")
notification: str = ("\n\n" + notification + "\n" +
"**" + "__Full event__" + "**" + "\n" + "```\n" + full_event + "```")
md_e + "__Full event__" + md_e + "\n" + "```\n" + full_event + "```")
# Add Priority & tags to the notification
notification = (notification + "\n\n" + "Priority: " + priority + "\n" +
"Tags: " + tags + "\n\n" + click)
logger(2, config, me, him, caller + " adding priority and tags")
notification = (notification + "\n\n" + md_e + "Priority:" + md_e + " " + str(priority) + "\n" +
md_e + "Tags:" + md_e + " " + tags + "\n\n" + click)
config["targets"] = arguments['targets'] if arguments['targets'] != "" else config["targets"]
# Prepare the messaging platform specific notification and execute
if "discord" in config["targets"]:
url, _, _ = get_env()
payload = {"username": "sender",
if caller == "discord":
logger(2, config, me, him, caller + " payload created")
payload_json = {"username": sender,
"content": mention,
"embeds": [{"description": notification,
"color": color,
"title": sender}]}
return payload
if "ntfy" in config["targets"]:
caller = "ntfy"
ntfy_url, _, _ = get_env()
logger(2, config, me, him, caller + " notification built")
payload = {"username": "sender",
"content": mention,
"embeds": [{"description": notification,
"color": color,
"title": sender}]}
return payload
return "", "", payload_json
if "slack" in config["targets"]:
caller = "slack"
slack_url, _, _ = get_env()
if caller == "ntfy":
logger(2, config, me, him, caller + " payloads created")
payload = {"username": "sender",
"content": mention,
"embeds": [{"description": notification,
"color": color,
"title": sender}]}
return payload
payload_data = notification
payload_headers = {"Markdown": "yes",
"Title": sender,
"Priority": str(priority),
"Click": click}
logger(2, config, me, him, caller + " notification built")
return payload_headers, payload_data, ""
if caller == "slack":
logger(2, config, me, him, caller + " payloads created")
payload_json = {"text": notification}
# payload_json = {"username": sender,
# "content": mention,
# "embeds": [{"description": notification,
# "color": color,
# "title": sender}]}
logger(2, config, me, him, caller + " notification built")
return "", "", payload_json