106 lines
3.8 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# This script is adapted version of the Python active response script sample, provided by Wazuh, in the documentation:
# https://documentation.wazuh.com/current/user-manual/capabilities/active-response/custom-active-response-scripts.html
# It is provided under the below copyright statement:
#
# Copyright (C) 2015-2022, Wazuh Inc.
# All rights reserved.
#
# This program is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public
# License (version 2) as published by the FSF - Free Software
# Foundation.
#
# This adapted version is free software. Rudi Klein, april 2024
import json
import sys
import requests
from wazuh_notify_module import build_notification
from wazuh_notify_module import construct_basic_message
from wazuh_notify_module import exclusions_check
from wazuh_notify_module import get_arguments
from wazuh_notify_module import get_config
from wazuh_notify_module import get_env
from wazuh_notify_module import load_message
from wazuh_notify_module import logger
from wazuh_notify_module import set_environment
from wazuh_notify_module import threat_mapping
def main(argv):
# Load the YAML config
config: dict = get_config()
# Path variables assignments
wazuh_path, ar_path, config_path = set_environment()
# Get the arguments used with running the script
arg_url, arg_sender, arg_destination, arg_message, arg_priority, arg_tags, arg_click = get_arguments()
# Check if we are in test mode (test_mode setting in config yaml). If so, load test event instead of live event.
if config.get("test_mode"):
logger(config, "In test mode: using test message wazuh-notify-test-event.json")
with (open('wazuh-notify-test-event.json') as event_file):
data: dict = json.loads(event_file.read())
else:
logger(config, "In live mode: using live message")
data = load_message()
# Extract the 'alert' section of the (JSON) event
alert = data["parameters"]["alert"]
# Check the config for any exclusion rules
fire_notification = exclusions_check(config, alert)
if not fire_notification:
logger(config, "Event excluded, no notification sent. Exiting")
exit()
# Include a specific control sequence for Discord bold text
if "discord" in config["targets"]:
accent: str = "**"
else:
accent: str = ""
# Create the notification text to be sent.
notification: str = construct_basic_message(accent, alert)
logger(config, "Notification constructed")
# todo Not used?
# Get the mapping from event threat level to priority (Discord/ntfy), color (Discord) and mention_flag (Discord)
priority, color, mention = threat_mapping(config, alert['rule']['level'],
alert['rule']['firedtimes'])
result = ""
# Prepare the messaging platform specific request and execute
if "discord" in config["targets"]:
caller = "discord"
discord_url, _, _ = get_env()
payload = build_notification(caller, config, notification, alert, priority, color, mention)
result = requests.post(discord_url, json=payload)
exit(result)
if "ntfy" in config["targets"]:
caller = "ntfy"
ntfy_url, _, _ = get_env()
payload = build_notification(caller, config, notification, alert, priority, color, mention)
result = requests.post(ntfy_url, json=payload)
exit(result)
if "slack" in config["targets"]:
caller = "slack"
slack_url, _, _ = get_env()
payload = build_notification(caller, config, notification, alert, priority, color, mention)
result = requests.post(slack_url, json=payload)
exit(result)
if __name__ == "__main__":
main(sys.argv)