热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

查找恶意的TOR中继节点

        众所周知,TOR是一种能够保护世界各地不同中继节点之间通信调度数据包的“软件”,由志愿者建立的网络进行运营。在过去的几年中,TOR已经被物理和网络攻击者广泛用于掩盖恶意操作。TOR也被

http://p8.qhimg.com/t01321d0c64ac56fd8e.jpg

        众所周知,TOR是一种能够保护世界各地不同中继节点之间通信调度数据包的“软件”,由志愿者建立的网络进行运营。在过去的几年中,TOR已经被物理和网络攻击者广泛用于掩盖恶意操作。TOR也被视为最主要的进入暗网的方法,在暗网中,“恶意”的人可以通过黑市非法买卖东西。

        网络里每个中继节点都能够根据自己的配置状态,决定是作为一个ExitPoint(在以下图片中用最后一台接触“Bob”的机器表示)或者只是一个中继节点(在下图中,TOR节点用“绿十字”突出显示)。如果中继节点决定成为一个ExitNode,就会向公众公开自己的IP地址,这通常是一个引起当地警方警觉的好办法。

http://p7.qhimg.com/t01cee0385d08698c9c.png

        过去一年中,电视节目,广播电台,youtube频道,Facebook这一类大众媒体披露出,许多黑市故意将人们引向DarkWeb,从而将他们暴露在许多新的攻击场景下。事实上新的攻击者会为了监视他们的通信流而设置退出节点或继电器节点。这种攻击可能会在许多单一的方式下发生,但最常用的今天写的这三个:

        1.DNS中毒:这个技术主要在于将与知名网站有关的DNS调用重定向到一个假的页面上, 这个假的页面包含开发工具包,可以使用户的浏览器妥协。

        2.文件修补:这个技术是改变请求的文件,在其到达目的地之前在其中添加恶意内容.这些步骤会在发给原始请求者之前直接在ExitPoint /中继节点上进行。

        3.替换证明(SSL – MITM):这个技术是用假的证书替换网站真实的证书,这样就可以解密通信流,拦截凭证和参数。

        致力于网络安全就意味着要及时意识到这样的攻击的存在,也要能够决定将在何时通过TOR中继节点。请注意,TOR不是DarkWeb中唯一的匿名网络.

        我的目标是弄清楚我的TOR流通过恶意中继节点的时间。出于这样一个原因,我决定写一个能够对DNS Poison,文件修补和SSL-MITM做出一些快速检查的python脚本。这个脚本已经存在了2年, 直到现在仍是保密状态。自从科学研究应用了我的FindMalExit.py的高级版本之后,我决定公开这个脚本。

 

        想法

        实际上这是一个非常简单的想法:“让我们在不通过TOR网络(或通过可靠电路)的情况下拿到证书,IP地址和文件,然后重复这个过程,直到通过所有可用的中继节点。比较结果并进行检查。”

 

        实现

          以下请查收我的代码。请记住这是一个不是最终的代码 (这段代码只是一个更大的项目中的第一个版本,现在由Yoroi维护)。我决定发布HTML格式的代码。


#!/usr/bin/env python2
#========================================================================#
#               THIS IS NOT A PRODUCTION RELEASED SOFTWARE               #
#========================================================================#
# Purpose of finMaliciousRelayPoints is to proof the way it's possible to#
# discover TOR malicious Relays Points. Please do not use it in          #
# any production  environment                                            #
# Author: Marco Ramilli                                                  #
# eMail: XXXXXXXX                                                        #
# WebSite: marcoramilli.blogspot.com                                     #
# Use it at your own                                                     #
#========================================================================#
#==============================Disclaimer: ==============================#
#THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR      #
#IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED          #
#WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE  #
#DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,      #
#INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES      #
#(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
#SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)      #
#HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,     #
#STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING   #
#IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE      #
#POSSIBILITY OF SUCH DAMAGE.                                             #
#========================================================================#
#-------------------------------------------------------------------------
#------------------- GENERAL SECTION -------------------------------------
#-------------------------------------------------------------------------
import StringIO
import tempfile
import time
import hashlib
import traceback
from   geoip         import  geolite2
import stem.control
TRUSTED_HOP_FINGERPRINT = '379FB450010D17078B3766C2273303C358C3A442' 
#trusted hop
SOCKS_PORT              = 9050
CONNECTION_TIMEOUT      = 30  # timeout before we give up on a circuit
#-------------------------------------------------------------------------
#---------------- File Patching Section ----------------------------------
#-------------------------------------------------------------------------
import pycurl
check_files               = {
                             "http://live.sysinternals.com/psexec.exe",
                             "http://live.sysinternals.com/psexec.exe",
                             "http://live.sysinternals.com/psping.exe", }
check_files_patch_results = []
class File_Check_Results:
    """
    Analysis Results against File Patching
    """
    def __init__(self, url, filen, filepath, exitnode, found_hash):
        self.url           = url
        self.filename      = filen
        self.filepath      = filepath
        self.exitnode      = exitnode
        self.filehash      = found_hash
#------------------------------------------------------------------------
#------------------- DNS Poison Section ---------------------------------
#------------------------------------------------------------------------
import dns.resolver
import socks
import socket
check_domain_poison_results = []
domains                     = {
                                 "www.youporn.com",
                                 "youporn.com",
                                 "www.torproject.org",
                                 "www.wikileaks.org",
                                 "www.i2p2.de",
                                 "torrentfreak.com",
                                 "blockchain.info",
}
class Domain_Poison_Check:
    """
    Analysis Results against Domain Poison
    """
    def __init__(self, domain):
        self.domain  = domain
        self.address = []
        self.path    = []
    def pushAddr(self, add):
        self.address.append(add)
    def pushPath(self, path):
        self.path = path
#-----------------------------------------------------------------------
#------------------- SSL Sltrip Section --------------------------------
#-----------------------------------------------------------------------
import OpenSSL
import ssl
check_ssl_strip_results   = []
ssl_strip_monitored_urls = {
                            "www.google.com",
                            "www.microsoft.com",
                            "www.apple.com",
                            "www.bbc.com",
}
class SSL_Strip_Check:
    """
    Analysis Result against SSL Strip
    """
    def __init__(self, domain, public_key, serial_number):
        self.domain        = domain
        self.public_key    = public_key
        self.serial_number = serial_number
#----------------------------------------------------------------------
#----------------     Starting Coding   -------------------------------
#----------------------------------------------------------------------
def sslCheckOriginal():
    """
    Download the original Certificate without TOR connection
    """
    print('[+] Populating SSL for later check')
    for url in ssl_strip_monitored_urls:
        try:
            cert = ssl.get_server_certificate((str(url), 443))
            x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
            p_k  = x509.get_pubkey()
            s_n  = x509.get_serial_number()
            print('[+] Acquired Certificate: %s' % url)
            print('    |_________> serial_number %s' % s_n)
            print('    |_________> public_key %s' % p_k)
            check_ssl_strip_results.append(SSL_Strip_Check(url, p_k, s_n))
        except Exception as err:
            print('[-] Error While Acquiring certificats on setup phase !')
            traceback.print_exc()
    return time.time()
def fileCheckOriginal():
    """
    Downloading file ORIGINAL without TOR
    """
    print('[+] Populating File Hasing for later check')
    for url in check_files:
        try:
            data = query(url)
            file_name = url.split("/")[-1]
            _,tmp_file = tempfile.mkstemp(prefix="exitmap_%s_" % file_name)
            with open(tmp_file, "wb") as fd:
                fd.write(data)
                print('[+] Saving File  "%s".' % tmp_file)
                check_files_patch_results.append( File_Check_Results(url, file_name, tmp_file, "NO", sha512_file(tmp_file)) )
                print('[+] First Time we see the file..')
                print('    |_________> exitnode : None'       )
                print('    |_________> :url:  %s' % str(url)     )
                print('    |_________> :filePath:  %s' % str(tmp_file))
                print('    |_________> :file Hash: %s' % str(sha512_file(tmp_file)))
        except Exception as err:
                print('[-] Error ! %s' % err)
                traceback.print_exc()
                pass
    return time.time()
def resolveOriginalDomains():
    """
        Resolving DNS For original purposes
    """
    print('[+] Populating Domain Name Resolution for later check ')
    try:
        for domain in domains:
            response = dns.resolver.query(domain)
            d = Domain_Poison_Check(domain)
            print('[+] Domain: %s' % domain)
            for record in response:
                print(' |____> maps to %s.' % (record.address))
                d.pushAddr(record)
            check_domain_poison_results.append(d)
        return time.time()
    except Exception as err:
        print('[+] Exception: %s' % err)
        traceback.print_exc()
        return time.time()
def query(url):
  """
  Uses pycurl to fetch a site using the proxy on the SOCKS_PORT.
  """
  output = StringIO.StringIO()
  query = pycurl.Curl()
  query.setopt(pycurl.URL, url)
  query.setopt(pycurl.PROXY, 'localhost')
  query.setopt(pycurl.PROXYPORT, SOCKS_PORT)
  query.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5_HOSTNAME)
  query.setopt(pycurl.CONNECTTIMEOUT, CONNECTION_TIMEOUT)
  query.setopt(pycurl.WRITEFUNCTION, output.write)
  try:
    query.perform()
    return output.getvalue()
  except pycurl.error as exc:
    raise ValueError("Unable to reach %s (%s)" % (url, exc))
def scan(controller, path):
  """
  Scan Tor Relays Point to find File Patching
  """
  def attach_stream(stream):
    if stream.status == 'NEW':
      try:
        controller.attach_stream(stream.id, circuit_id)
        #print('[+] New Circuit id (%s) attached and ready to be used!' % circuit_id)
      except Exception as err:
        controller.remove_event_listener(attach_stream)
        controller.reset_conf('__LeaveStreamsUnattached')
  try:
    print('[+] Creating a New TOR circuit based on path: %s' % path)
    circuit_id = controller.new_circuit(path, await_build = True)
    controller.add_event_listener(attach_stream, stem.control.EventType.STREAM)
    controller.set_conf('__LeaveStreamsUnattached', '1')  # leave stream management to us
    start_time = time.time()
    socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9050)
    socket.socket = socks.socket
    ip = query('http://ip.42.pl/raw')
    if ip is not None:
        country  = geolite2.lookup( str(ip) ).country
        print('n n')
        print('[+] Performing FilePatch,  DNS Spoofing and Certificate Checking
              passing through --> %s (%s) n n' % (str(ip), str(country))  )
    time_FileCheck = fileCheck(path)
    print('[+] FileCheck took: %0.2f seconds'  % ( time_FileCheck - start_time))
    #time_CertsCheck  = certsCheck(path)
    #print('[+] CertsCheck took: %0.2f seconds' % ( time_DNSCheck - start_time))
    time_DNSCheck  = dnsCheck(path)
    print('[+] DNSCheck took: %0.2f seconds'   % ( time_DNSCheck - start_time))
  except Exception as  err:
    print('[-] Circuit creation error: %s' % path)
  return time.time() - start_time
def certsCheck(path):
    """
    SSL Strip detection
    TODO: It's still a weak control. Need to collect and to compare public_key()
    """
    print('[+] Checking Certificates')
    try:
        socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9050)
        socket.socket = socks.socket
        for url in ssl_strip_monitored_urls:
            cert = ssl.get_server_certificate((str(url), 443))
            x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
            p_k  = x509.get_pubkey()
            s_n  = x509.get_serial_number()
            for stored_cert in check_ssl_strip_results:
                if str(url) == str(stored_cert.domain):
                    if str(stored_cert.serial_number) != str(s_n):
                        print('[+] ALERT Found SSL Strip on uri (%s) through path %s ' % (url, path))
                        break
                    else:
                        print('[+] Certificate Check seems to be OK for %s' % url)
    except Exception as err:
        print('[-] Error: %s' % err)
        traceback.print_exc()
    socket.close()
    return time.time()
def dnsCheck(path):
    """
    DNS Poisoning Check
    """
    try:
        socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9050)
        socket.socket = socks.socket
        print('[+] Checking DNS ')
        for domain in domains:
            ipv4 = socket.gethostbyname(domain)
            for p_d in check_domain_poison_results:
                if str(p_d.domain) == str(domain):
                    found = False
                    for d_ip in p_d.address:
                        if str(ipv4) == str(d_ip):
                            found = True
                            break
                    if found == False:
                        print('[+] ALERT:DNS SPOOFING FOUND: name: %s ip: %s  (path: %s )' % (domain, ipv4, path) )
                    else:
                        print('[+] Check DNS (%s) seems to be OK' % domain)
    except Exception as err:
        print('[-] Error: %s' % err)
        traceback.print_exc()
    socket.close()
    return time.time()
def fileCheck(path):
    """
    Downloading file through TOR circuits doing the hashing
    """
    print('[+] Checking For File patching ')
    for url in check_files:
        try:
            #File Rereive
            data = query(url)
            file_name = url.split("/")[-1]
            _,tmp_file = tempfile.mkstemp(prefix="exitmap_%s_" % file_name)
            with open(tmp_file, "wb") as fd:
                fd.write(data)
                for i in check_files_patch_results:
                    if str(i.url) == str(url):
                        if str(i.filehash) != str(sha512_file(tmp_file)):
                            print('[+] ALERT File Patch FOUND !')
                            print('    | exitnode : %s' % str(i.exitnode)      )
                            print('    |_________> url: %s' % str(i.url)        )
                            print('    |_________> filePath: %s' % str(i.filepath)   )
                            print('    |_________> fileHash: %s' % str(i.filehash)   )
                            #check_files_patch_results.append( File_Check_Results(url, file_name, tmp_file, path, sha512_file(tmp_file)) )
                        else :
                            print('[+] File (%s) seems to be ok' % i.url)
                        break
        except Exception as err:
                print('[-] Error ! %s' % err)
                traceback.print_exc()
                pass
    return time.time()
def sha512_file(file_name):
    """
    Calculate SHA512 over the given file.
    """
    hash_func = hashlib.sha256()
    with open(file_name, "rb") as fd:
        hash_func.update(fd.read())
    return hash_func.hexdigest()
if __name__ == '__main__':
    start_analysis = time.time()
    print("""
  |=====================================================================|
  | Find Malicious Relay Nodes is a python script made for checking 3   |
  | unique kind of frauds such as:                                      |
  | (1) File Patching                                                   |
  | (2) DNS Poison                                                      |
  | (3) SSL Stripping (MITM SSL)                                        |
  |=====================================================================|
         """)
    print("""
  |=====================================================================|
  |                 Initialization Phase                                |
  |=====================================================================|
       """)
    dns_setup_time             = resolveOriginalDomains()
    print('[+] DNS Setup Finished: %0.2f' % (dns_setup_time - start_analysis))
    file_check_original_time   = fileCheckOriginal()
    print('[+] File Setup Finished: %0.2f' % (file_check_original_time - start_analysis))
    ssl_checking_original_time = sslCheckOriginal()
    print('[+] Acquiring Certificates  Setup Finished: %0.2f' % (ssl_checking_original_time - start_analysis))
    print("""
  |=====================================================================|
  |                 Analysis  Phase                                     |
  |=====================================================================|
          """)
    print('[+] Connecting and Fetching possible Relays ...')
    with stem.control.Controller.from_port() as controller:
      controller.authenticate()
      net_status = controller.get_network_statuses()
      for descriptor in net_status:
        try:
          fingerprint = descriptor.fingerprint
          print('[+] Selecting a New Exit Point:')
          print('[+] |_________> FingerPrint: %s ' % fingerprint)
          print('[+] |_________> Flags: %s ' % descriptor.flags)
          print('[+] |_________> Exit_Policies: %s ' % descriptor.exit_policy)
          if 'EXIT' in (flag.upper() for flag in descriptor.flags):
              print('[+] Found Exit Point. Performing Scan through EXIT: %s' % fingerprint)
              if None == descriptor.exit_policy:
                  print('[+] No Exit Policies found ... no certs checking')
                  time_taken = scan(controller, [TRUSTED_HOP_FINGERPRINT, fingerprint])
          else:
              #print('[+] Not Exit Point found. Using it as Relay passing to TRUST Exit Point')
              pass
              #time_taken = scan(controller, [fingerprint, TRUSTED_HOP_FINGERPRINT])
          #print('[+] Finished Analysis for %s finished  => %0.2f seconds' % (fingerprint, time_taken))
        except Exception as exc:
            print('[-] Exception on  FingerPrint: %s => %s' % (fingerprint, exc))
            traceback.print_exc()
            pass


推荐阅读
  • 在Linux系统中,网络配置是至关重要的任务之一。本文详细解析了Firewalld和Netfilter机制,并探讨了iptables的应用。通过使用`ip addr show`命令来查看网卡IP地址(需要安装`iproute`包),当网卡未分配IP地址或处于关闭状态时,可以通过`ip link set`命令进行配置和激活。此外,文章还介绍了如何利用Firewalld和iptables实现网络流量控制和安全策略管理,为系统管理员提供了实用的操作指南。 ... [详细]
  • 本文深入探讨了 hCalendar 微格式在事件与时间、地点相关活动标记中的应用。作为微格式系列文章的第四篇,前文已分别介绍了 rel 属性用于定义链接关系、XFN 微格式增强链接的人际关系描述以及 hCard 微格式对个人和组织信息的描述。本次将重点解析 hCalendar 如何通过结构化数据标记,提高事件信息的可读性和互操作性。 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • 在CentOS 7环境中安装配置Redis及使用Redis Desktop Manager连接时的注意事项与技巧
    在 CentOS 7 环境中安装和配置 Redis 时,需要注意一些关键步骤和最佳实践。本文详细介绍了从安装 Redis 到配置其基本参数的全过程,并提供了使用 Redis Desktop Manager 连接 Redis 服务器的技巧和注意事项。此外,还探讨了如何优化性能和确保数据安全,帮助用户在生产环境中高效地管理和使用 Redis。 ... [详细]
  • 本文是Java并发编程系列的开篇之作,将详细解析Java 1.5及以上版本中提供的并发工具。文章假设读者已经具备同步和易失性关键字的基本知识,重点介绍信号量机制的内部工作原理及其在实际开发中的应用。 ... [详细]
  • 您的数据库配置是否安全?DBSAT工具助您一臂之力!
    本文探讨了Oracle提供的免费工具DBSAT,该工具能够有效协助用户检测和优化数据库配置的安全性。通过全面的分析和报告,DBSAT帮助用户识别潜在的安全漏洞,并提供针对性的改进建议,确保数据库系统的稳定性和安全性。 ... [详细]
  • 浏览器作为我们日常不可或缺的软件工具,其背后的运作机制却鲜为人知。本文将深入探讨浏览器内核及其版本的演变历程,帮助读者更好地理解这一关键技术组件,揭示其内部运作的奥秘。 ... [详细]
  • 在HTML布局中,即使将 `top: 0%` 和 `left: 0%` 设置为元素的定位属性,浏览器中仍然会出现空白填充。这个问题通常与默认的浏览器样式、盒模型或父元素的定位方式有关。为了消除这些空白,可以考虑重置浏览器的默认样式,确保父元素的定位方式正确,并检查是否有其他CSS规则影响了元素的位置。 ... [详细]
  • 该大学网站采用PHP和MySQL技术,在校内可免费访问某些外部收费资料数据库。为了方便学生校外访问,建议通过学校账号登录实现免费访问。具体方案可包括利用学校服务器作为代理,结合身份验证机制,确保合法用户在校外也能享受免费资源。 ... [详细]
  • 在Ubuntu系统中安装Android SDK的详细步骤及解决“Failed to fetch URL https://dlssl.google.com/”错误的方法
    在Ubuntu 11.10 x64系统中安装Android SDK的详细步骤,包括配置环境变量和解决“Failed to fetch URL https://dlssl.google.com/”错误的方法。本文详细介绍了如何在该系统上顺利安装并配置Android SDK,确保开发环境的稳定性和高效性。此外,还提供了解决网络连接问题的实用技巧,帮助用户克服常见的安装障碍。 ... [详细]
  • 本文详细介绍了在Linux系统上编译安装MySQL 5.5源码的步骤。首先,通过Yum安装必要的依赖软件包,如GCC、GCC-C++等,确保编译环境的完备。接着,下载并解压MySQL 5.5的源码包,配置编译选项,进行编译和安装。最后,完成安装后,进行基本的配置和启动测试,确保MySQL服务正常运行。 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • TypeScript 实战分享:Google 工程师深度解析 TypeScript 开发经验与心得
    TypeScript 实战分享:Google 工程师深度解析 TypeScript 开发经验与心得 ... [详细]
  • 本文详细探讨了几种常用的Java后端开发框架组合及其具体应用场景。通过对比分析Spring Boot、MyBatis、Hibernate等框架的特点和优势,结合实际项目需求,为开发者提供了选择合适框架组合的参考依据。同时,文章还介绍了这些框架在微服务架构中的应用,帮助读者更好地理解和运用这些技术。 ... [详细]
  • 基于Linux系统的Kickstart自动化服务器部署方案
    本文针对企业需求,提出了一种基于Linux系统的Kickstart自动化服务器部署方案。该方案旨在通过无盘批量安装操作系统,提高企业IT基础设施的部署效率。Kickstart是一种利用Anaconda工具实现服务器自动化安装的技术,能够显著简化和加速操作系统的安装过程。通过详细的实施规划,本文介绍了Kickstart的工作原理及其在实际部署中的应用,为企业提供了高效的自动化部署解决方案。 ... [详细]
author-avatar
kyijhx
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有