热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Wazuh:如何对异构数据进行关联告警

这并不是什么高精尖的架构与技术。这只是我个人在工作中结合目前手头的资源进行了一些整合 。当然实现这些需求的方法有很多,有钱的可以考虑Splunk,没钱的有研发的团队的可以上Flink、Esper 。由于攻防对抗的升级,通过单一的数据源很难直接断言攻击是否成功。因此,我们需要结合多个数据源进行安全事件的关联, 从中提炼出可靠性较高的安全告警进行人工排查。例如:针对WebShell上传类的,可以通过虽

写在前面

这并不是什么高精尖的架构与技术。这只是我个人在工作中结合目前手头的资源进行了一些整合 。当然实现这些需求的方法有很多, 有钱的可以考虑Splunk, 没钱的有研发的团队的可以上Flink、Esper 。 

需求

由于攻防对抗的升级, 通过单一的数据源很难直接断言攻击是否成功。因此, 我们需要结合多个数据源进行安全事件的关联, 从中提炼出可靠性较高的安全告警进行人工排查。例如: 针对WebShell上传类的, 可以通过 网络流量 + 终端 进行关联; 针对Web攻击类, 可以通过 WAF + NIDS 的事件关联, 得到Bypass WAF 的安全告警。

解决思路

虽然Wazuh本身具备安全事件的关联能力, 但在传统的部署架构中, 通常是由Wazuh Agent将安全事件发送到Wazuh  M anager , 通过Manager进行安全事件的关联告警。由于缺少了对数据进行ETL, 使得Wazuh Manager很难对异构数据进行关联。因此, 我们需要 通过Logstash实现对数据的标准化, 并将标准化后的数据通过Syslog的形式发送到Wazuh Manager, 从而进行异构数据的关联。

坑点:

1. 本次改造采用了Syslog的形式将数据发送到Wazuh Manager端进行数据关联。由于Syslog 默认采用了UDP协议进行数据传输, 当数据发送过大时将会导致数据截断的报错。针对此问题, 需改用TCP的形式进行数据发送的方式。

2. Wazuh Manager 部分告警缺少” 必要 “关联字段的现象。如: Wazuh syscheck告警事件, 默认不会携带srcip的字段, 对于此问题可以通过在Manager上编写一个预处理脚本来解决。

改造

改造前: Suricata (Wazuh Agent) —(Agent: UDP 1514)—> Wazuh Manager

改造后: 所有的标准化都由Logstash来进行, Filebeat只需要做’无脑‘转发即可。

workflow

Wazuh:如何对异构数据进行关联告警

Filebeat配置

1. filebeat.yaml

#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  paths:
    - "/var/log/suricata/alert-*.json"
  fields_under_root: true
  fields: { application: suricata }
  json.keys_under_root: true
  json.overwrite_keys: true
  json.message_key: log
  tail_files: false
  scan_frequency: 1s
  harvester_buffer_size: 104857600
  backoff: 1s
  max_backoff: 10s
  close_timeout: 30m
  close_inactive: 10m
  clean_inactive: 72h
  ignore_older: 70h
  registry_file: /etc/filebeat/registry/wazuh/
#================================ Processors ==================================
processors:
- drop_fields:
    fields: ["ecs.version", "agent.ephemeral_id", "agent.version", "agent.type", "agent.id", "agent.ephemeral_id", "input.type"]
#================================ Outputs =====================================
output.logstash:
  hosts: ["logstash:5010"]
  loadbalance: true
  worker: 4
  compression_level: 3
  bulk_max_size: 4096

Logstash配置

1. 00_input.conf

input {
  beats {
    port => 5010
    codec => "json_lines"
    tags => ["beats"]
  }
}

2. 50_suricata.conf

filter {
  if [application] == "suricata" {
    date {
      match => [ "timestamp", "ISO8601" ]
      target => "timestamp"
    }
  }
}

3. mapping.json

{
    "common_mapping": {
        "src_ip": "srcip",
        "dest_ip": "dstip",
        "src_port": "srcport",
        "dest_port": "dstport"
    }
}

4. 70_normalized-suricata.conf

filter {
  clone {
    clOnes=> [ "siem_events" ]
  }
}
filter {
  if [type] == "siem_events" {
    mutate {
      remove_field => [ "application", "type", "agent", "@version", "@timestamp"]
      add_field => {
        "provider" => "Suricata"
        "product" => "Intrusion Detection System"
      }
    }
    ruby {
      init => "
        require 'json'
        mapping_json = File.read('/etc/logstash/mappings/wazuh/mapping.json')
        mapping = JSON.parse(mapping_json)
        @common_mapping = mapping['common_mapping']
      "
      code => "
        keys = event.to_hash.keys
        keys.each do |key|
            if @common_mapping.include? key then
                value = event.get(key)
                event.remove(key)
                new_key = @common_mapping[key]
                event.set(new_key, value)
            end
        end
        sensor = event.get('[host][name]')
        event.set('sensor', sensor)
      "
    }
  }
}

5. 99_output-elasticsearch.conf

output {
  if [event_type] == "alert" {
    elasticsearch {
      cacert => "/etc/logstash/certs/ca/ca.crt"
      user => "elastic"
      password => "Hello World!"
      hosts => ["https://elasticsearch:9200"]
      index => "suricata-%{+YYYY.MM.dd}"
      template => "/etc/logstash/index-template.d/suricata-template.json"
      template_name => "suricata"
      template_overwrite => true
    }
  }
}

6. 99_output-wazuh.conf

output {
  if [provider] == "Suricata" {
    syslog {
        host => "wazuh"
        protocol => "tcp"
        port => 514
        codec => "json"
        sourcehost => "logstash"
        appname => "NORMALIZED"
    }
    #stdout {
        #codec => rubydebug
    #}
  }
}

Wazuh配置

1. custom-syscheck.py

Wazuh Manager 新增预处理脚本对指定的安全事件进行预处理。如: syscheck事件增加srcip字段。

import json
import sys
import time
import os
from datetime import datetime, timedelta, timezone

# ossec.conf configuration:
#  
#      custom-syscheck
#      554
#      syscheck
#      json
#  

# Global vars
debug_enabled = False
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
wazuh_server = "192.168.199.97"

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
syscheck_file = '{0}/logs/syscheck.json'.format(pwd)

def iso8601(hours=8):
    td = timedelta(hours=hours)
    tz = timezone(td)
    return datetime.now(tz=tz).isoformat()

def main(args):
    debug("# Starting")

    # Read args
    alert_file_location = args[1]

    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    alert = normalized_data(json_alert)
    with open(syscheck_file, 'a') as f:
        msg = json.dumps(alert)
        f.write(msg + '\n')

def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        with open(log_file, "a") as f:
            f.write(msg)

def normalized_data(alert):
    if alert['agent']['id'] == '000':
        alert['srcip'] = wazuh_server
    elif alert['agent'].get('ip'):
        alert['srcip'] = alert['agent']['ip']
        alert['dstip'] = alert['agent']['ip']
    alert['integration'] = 'custom-syscheck'
    alert['create_timestamp'] = iso8601()
    debug(alert)
    return(alert)

if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call
        with open(log_file, 'a') as f:
            f.write(msg + '\n')

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)

    except Exception as e:
        debug(str(e))
        raise

2. ossec.conf

a. 配置sysloig选用 TCP 协议;

b. 加载预处理脚本;

c. 加载脚本输出日志;


  
    syslog
    514
    tcp 
    192.168.199.0/24
  
  
  
  
    custom-syscheck
    554
    syscheck
    json
  
  
  
  
    json
    /var/ossec/logs/syscheck.json
  

3. local_decoder_normalized.xml



    NORMALIZED[-]: 
    JSON_Decoder

4. 0901-local_raw.xml

a. 默认引用的解码器为json, 这里需要修改为刚才新增的nta_json;

b. 通过overwrite=yes覆盖原始规则;



    
    
    

    
        nta_json
        \.+
        \.+
        Suricata messages.
        no_full_log
    

5. 0905-local_syscheck.xml

为预处理脚本生成的日志进行解析


    
        json
        custom-syscheck
        syscheck integration messages.
        no_full_log
    

6. 9999-local_composite.xml



    

    
        101000 
        187100
        	
        Phase 3: 检测到服务器:$(srcip), 被上传WebShell.
        no_full_log
    

    
        88801 
        ids
        
        Phase 3: Alarm - Same ip Bypass WAF of within 600 seconds. $(srcip) -> $(http.hostname) -> $(alert.signature) -> $(alert.signature_id).
        no_full_log
    
  

总结

1. 对于 WebShell 关联检测,目前采用的是同源IP以及时序的关联, 最为靠谱的应该是通过Hash的比对。这里要吐槽一下Suricata默认的fileinfo, 没办法自定义输出, 只要开启可被还原的协议都会输出fileinfo的事件。正因如此, 数据量一大Wazuh的引擎压力会很大。我尝试通过 Lua 来自定义一个文件审计类的事件, 貌似也同样没办法区分协议更别说针对http过滤条件进行自定义的过滤输出了。

2. 由于关联规则本身是通过底层多个安全事件进行维度的关联提升告警的 可靠性 。因此, 底层安全事件不够准确同样会让上层的关联规则带来大量误报。对于底层安全事件的优化也是需要持续进行的。

3. Wazuh v3.11.4 采用Syslog接收大日志时, 会触发memory violation导致ossec-remoted进程重启, 该问题已向社区反馈下个版本中会解决。

参考

How to forward Android syslog to Wazuh

How to configure Rsyslog client to send events to Wazuh

How to integrate external software using Integrator

*本文原创作者:Shell.,本文属于FreeBuf原创奖励计划,未经许可禁止转载


以上所述就是小编给大家介绍的《Wazuh:如何对异构数据进行关联告警》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 我们 的支持!


推荐阅读
  • 兆芯X86 CPU架构的演进与现状(国产CPU系列)
    本文详细介绍了兆芯X86 CPU架构的发展历程,从公司成立背景到关键技术授权,再到具体芯片架构的演进,全面解析了兆芯在国产CPU领域的贡献与挑战。 ... [详细]
  • 用阿里云的免费 SSL 证书让网站从 HTTP 换成 HTTPS
    HTTP协议是不加密传输数据的,也就是用户跟你的网站之间传递数据有可能在途中被截获,破解传递的真实内容,所以使用不加密的HTTP的网站是不 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • 本文深入解析了WCF Binding模型中的绑定元素,详细介绍了信道、信道管理器、信道监听器和信道工厂的概念与作用。从对象创建的角度来看,信道管理器负责信道的生成。具体而言,客户端的信道通过信道工厂进行实例化,而服务端则通过信道监听器来接收请求。文章还探讨了这些组件之间的交互机制及其在WCF通信中的重要性。 ... [详细]
  • 本文详细介绍了 Java 网站开发的相关资源和步骤,包括常用网站、开发环境和框架选择。 ... [详细]
  • 一、Tomcat安装后本身提供了一个server,端口配置默认是8080,对应目录为:..\Tomcat8.0\webapps二、Tomcat8.0配置多个端口,其实也就是给T ... [详细]
  • HTTP(HyperTextTransferProtocol)是超文本传输协议的缩写,它用于传送www方式的数据。HTTP协议采用了请求响应模型。客服端向服务器发送一 ... [详细]
  • 为什么多数程序员难以成为架构师?
    探讨80%的程序员为何难以晋升为架构师,涉及技术深度、经验积累和综合能力等方面。本文将详细解析Tomcat的配置和服务组件,帮助读者理解其内部机制。 ... [详细]
  • DVWA学习笔记系列:深入理解CSRF攻击机制
    DVWA学习笔记系列:深入理解CSRF攻击机制 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案
    深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案 ... [详细]
  • 在Android平台中,播放音频的采样率通常固定为44.1kHz,而录音的采样率则固定为8kHz。为了确保音频设备的正常工作,底层驱动必须预先设定这些固定的采样率。当上层应用提供的采样率与这些预设值不匹配时,需要通过重采样(resample)技术来调整采样率,以保证音频数据的正确处理和传输。本文将详细探讨FFMpeg在音频处理中的基础理论及重采样技术的应用。 ... [详细]
author-avatar
dajiang
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有