这并不是什么高精尖的架构与技术。这只是我个人在工作中结合目前手头的资源进行了一些整合 。当然实现这些需求的方法有很多, 有钱的可以考虑Splunk, 没钱的有研发的团队的可以上Flink、Esper 。
由于攻防对抗的升级, 通过单一的数据源很难直接断言攻击是否成功。因此, 我们需要结合多个数据源进行安全事件的关联, 从中提炼出可靠性较高的安全告警进行人工排查。例如: 针对WebShell上传类的, 可以通过 网络流量 + 终端 进行关联; 针对Web攻击类, 可以通过 WAF + NIDS 的事件关联, 得到Bypass WAF 的安全告警。
虽然Wazuh本身具备安全事件的关联能力, 但在传统的部署架构中, 通常是由Wazuh Agent将安全事件发送到Wazuh M anager , 通过Manager进行安全事件的关联告警。由于缺少了对数据进行ETL, 使得Wazuh Manager很难对异构数据进行关联。因此, 我们需要 通过Logstash实现对数据的标准化, 并将标准化后的数据通过Syslog的形式发送到Wazuh Manager, 从而进行异构数据的关联。
1. 本次改造采用了Syslog的形式将数据发送到Wazuh Manager端进行数据关联。由于Syslog 默认采用了UDP协议进行数据传输, 当数据发送过大时将会导致数据截断的报错。针对此问题, 需改用TCP的形式进行数据发送的方式。
2. Wazuh Manager 部分告警缺少” 必要 “关联字段的现象。如: Wazuh syscheck告警事件, 默认不会携带srcip的字段, 对于此问题可以通过在Manager上编写一个预处理脚本来解决。
改造前: Suricata (Wazuh Agent) —(Agent: UDP 1514)—> Wazuh Manager
改造后: 所有的标准化都由Logstash来进行, Filebeat只需要做’无脑‘转发即可。
Filebeat配置
1. filebeat.yaml
#=========================== Filebeat inputs ============================= filebeat.inputs: - type: log paths: - "/var/log/suricata/alert-*.json" fields_under_root: true fields: { application: suricata } json.keys_under_root: true json.overwrite_keys: true json.message_key: log tail_files: false scan_frequency: 1s harvester_buffer_size: 104857600 backoff: 1s max_backoff: 10s close_timeout: 30m close_inactive: 10m clean_inactive: 72h ignore_older: 70h registry_file: /etc/filebeat/registry/wazuh/ #================================ Processors ================================== processors: - drop_fields: fields: ["ecs.version", "agent.ephemeral_id", "agent.version", "agent.type", "agent.id", "agent.ephemeral_id", "input.type"] #================================ Outputs ===================================== output.logstash: hosts: ["logstash:5010"] loadbalance: true worker: 4 compression_level: 3 bulk_max_size: 4096
Logstash配置
1. 00_input.conf
input { beats { port => 5010 codec => "json_lines" tags => ["beats"] } }
2. 50_suricata.conf
filter { if [application] == "suricata" { date { match => [ "timestamp", "ISO8601" ] target => "timestamp" } } }
3. mapping.json
{ "common_mapping": { "src_ip": "srcip", "dest_ip": "dstip", "src_port": "srcport", "dest_port": "dstport" } }
4. 70_normalized-suricata.conf
filter { clone { clOnes=> [ "siem_events" ] } } filter { if [type] == "siem_events" { mutate { remove_field => [ "application", "type", "agent", "@version", "@timestamp"] add_field => { "provider" => "Suricata" "product" => "Intrusion Detection System" } } ruby { init => " require 'json' mapping_json = File.read('/etc/logstash/mappings/wazuh/mapping.json') mapping = JSON.parse(mapping_json) @common_mapping = mapping['common_mapping'] " code => " keys = event.to_hash.keys keys.each do |key| if @common_mapping.include? key then value = event.get(key) event.remove(key) new_key = @common_mapping[key] event.set(new_key, value) end end sensor = event.get('[host][name]') event.set('sensor', sensor) " } } }
5. 99_output-elasticsearch.conf
output { if [event_type] == "alert" { elasticsearch { cacert => "/etc/logstash/certs/ca/ca.crt" user => "elastic" password => "Hello World!" hosts => ["https://elasticsearch:9200"] index => "suricata-%{+YYYY.MM.dd}" template => "/etc/logstash/index-template.d/suricata-template.json" template_name => "suricata" template_overwrite => true } } }
6. 99_output-wazuh.conf
output { if [provider] == "Suricata" { syslog { host => "wazuh" protocol => "tcp" port => 514 codec => "json" sourcehost => "logstash" appname => "NORMALIZED" } #stdout { #codec => rubydebug #} } }
Wazuh配置
1. custom-syscheck.py
Wazuh Manager 新增预处理脚本对指定的安全事件进行预处理。如: syscheck事件增加srcip字段。
import json import sys import time import os from datetime import datetime, timedelta, timezone # ossec.conf configuration: ## # Global vars debug_enabled = False pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) json_alert = {} now = time.strftime("%a %b %d %H:%M:%S %Z %Y") wazuh_server = "192.168.199.97" # Set paths log_file = '{0}/logs/integrations.log'.format(pwd) syscheck_file = '{0}/logs/syscheck.json'.format(pwd) def iso8601(hours=8): td = timedelta(hours=hours) tz = timezone(td) return datetime.now(tz=tz).isoformat() def main(args): debug("# Starting") # Read args alert_file_location = args[1] debug("# File location") debug(alert_file_location) # Load alert. Parse JSON object. with open(alert_file_location) as alert_file: json_alert = json.load(alert_file) debug("# Processing alert") debug(json_alert) alert = normalized_data(json_alert) with open(syscheck_file, 'a') as f: msg = json.dumps(alert) f.write(msg + '\n') def debug(msg): if debug_enabled: msg = "{0}: {1}\n".format(now, msg) with open(log_file, "a") as f: f.write(msg) def normalized_data(alert): if alert['agent']['id'] == '000': alert['srcip'] = wazuh_server elif alert['agent'].get('ip'): alert['srcip'] = alert['agent']['ip'] alert['dstip'] = alert['agent']['ip'] alert['integration'] = 'custom-syscheck' alert['create_timestamp'] = iso8601() debug(alert) return(alert) if __name__ == "__main__": try: # Read arguments bad_arguments = False if len(sys.argv) >= 4: msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '') else: msg = '{0} Wrong arguments'.format(now) bad_arguments = True # Logging the call with open(log_file, 'a') as f: f.write(msg + '\n') if bad_arguments: debug("# Exiting: Bad arguments.") sys.exit(1) # Main function main(sys.argv) except Exception as e: debug(str(e)) raisecustom-syscheck #554 #syscheck #json #
2. ossec.conf
a. 配置sysloig选用 TCP 协议;
b. 加载预处理脚本;
c. 加载脚本输出日志;
syslog 514 tcp 192.168.199.0/24 custom-syscheck 554 syscheck json json /var/ossec/logs/syscheck.json
3. local_decoder_normalized.xml
NORMALIZED[-]: JSON_Decoder
4. 0901-local_raw.xml
a. 默认引用的解码器为json, 这里需要修改为刚才新增的nta_json;
b. 通过overwrite=yes覆盖原始规则;
nta_json \.+ \.+ Suricata messages. no_full_log
5. 0905-local_syscheck.xml
为预处理脚本生成的日志进行解析
json custom-syscheck syscheck integration messages. no_full_log
6. 9999-local_composite.xml
101000 187100 Phase 3: 检测到服务器:$(srcip), 被上传WebShell. no_full_log 88801 ids Phase 3: Alarm - Same ip Bypass WAF of within 600 seconds. $(srcip) -> $(http.hostname) -> $(alert.signature) -> $(alert.signature_id). no_full_log
1. 对于 WebShell 关联检测,目前采用的是同源IP以及时序的关联, 最为靠谱的应该是通过Hash的比对。这里要吐槽一下Suricata默认的fileinfo, 没办法自定义输出, 只要开启可被还原的协议都会输出fileinfo的事件。正因如此, 数据量一大Wazuh的引擎压力会很大。我尝试通过 Lua 来自定义一个文件审计类的事件, 貌似也同样没办法区分协议更别说针对http过滤条件进行自定义的过滤输出了。
2. 由于关联规则本身是通过底层多个安全事件进行维度的关联提升告警的 可靠性 。因此, 底层安全事件不够准确同样会让上层的关联规则带来大量误报。对于底层安全事件的优化也是需要持续进行的。
3. Wazuh v3.11.4 采用Syslog接收大日志时, 会触发memory violation导致ossec-remoted进程重启, 该问题已向社区反馈下个版本中会解决。
How to forward Android syslog to Wazuh
How to configure Rsyslog client to send events to Wazuh
How to integrate external software using Integrator
*本文原创作者:Shell.,本文属于FreeBuf原创奖励计划,未经许可禁止转载
以上所述就是小编给大家介绍的《Wazuh:如何对异构数据进行关联告警》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 我们 的支持!