Using mitmproxy inside python script
Asked Answered
A

5

12

I new in mitmproxy. But I can't figure out how to used that in python script.

I want to put mitmproxy into my python script just like a library and also specific everything like port or host and do some modify with Request or Response in my python script. So when I start my script like this

python sample.py

Everything will run automatic without run mitmproxy from commandline like this

mitmproxy -s sample.py

Thanks for reading.

Airtoair answered 17/8, 2018 at 10:51 Comment(0)
P
13

You can use something like this. This code was taken from an issue posted on the mithproxy github found here

from mitmproxy import proxy, options
from mitmproxy.tools.dump import DumpMaster
from mitmproxy.addons import core


class AddHeader:
    def __init__(self):
        self.num = 0

    def response(self, flow):
        self.num = self.num + 1
        print(self.num)
        flow.response.headers["count"] = str(self.num)


addons = [
    AddHeader()
]

opts = options.Options(listen_host='127.0.0.1', listen_port=8080)
pconf = proxy.config.ProxyConfig(opts)

m = DumpMaster(None)
m.server = proxy.server.ProxyServer(pconf)
# print(m.addons)
m.addons.add(addons)
print(m.addons)
# m.addons.add(core.Core())

try:
    m.run()
except KeyboardInterrupt:
    m.shutdown()
Proline answered 1/9, 2018 at 16:42 Comment(3)
Thanks for your example! Only for me the AddHeader addon never got triggered, because if you .add() the array, you're essentially adding another array to the current 'addons' array, which will result in the AddHeader() addon not being read. Eitherway for other people experiencing the same issue; just do do this: m.addons.add(AddHeader()) instead of m.addons.add(addons) – Ellga
I think you can use list.extend python method. – Disputant
@Ellga Thanks very much, the answer should be modified with your addition. It wasn't working for me either until I did what you wrote. – Christner
T
5

Start mitmproxy in the background programmatically to integrate it into an existing app:

from mitmproxy.options import Options
from mitmproxy.proxy.config import ProxyConfig
from mitmproxy.proxy.server import ProxyServer
from mitmproxy.tools.dump import DumpMaster

import threading
import asyncio
import time

class Addon(object):
    def __init__(self):
        self.num = 1

    def request(self, flow):
        flow.request.headers["count"] = str(self.num)

    def response(self, flow):
        self.num = self.num + 1
        flow.response.headers["count"] = str(self.num)
        print(self.num)


# see source mitmproxy/master.py for details
def loop_in_thread(loop, m):
    asyncio.set_event_loop(loop)  # This is the key.
    m.run_loop(loop.run_forever)


if __name__ == "__main__":
    options = Options(listen_host='0.0.0.0', listen_port=8080, http2=True)
    m = DumpMaster(options, with_termlog=False, with_dumper=False)
    config = ProxyConfig(options)
    m.server = ProxyServer(config)
    m.addons.add(Addon())

    # run mitmproxy in backgroud, especially integrated with other server
    loop = asyncio.get_event_loop()
    t = threading.Thread( target=loop_in_thread, args=(loop,m) )
    t.start()

    # Other servers, such as a web server, might be started then.
    time.sleep(20)
    print('going to shutdown mitmproxy')
    m.shutdown()

from my gist

Torpedoman answered 14/7, 2020 at 17:16 Comment(6)
generally speaking, I would remove the extra thread wrapping the event loop + sleep(20) since most cases would not require that and it's confusing 😁 – Mexicali
in this case how do you implement the flow request interception? e.g. def request(flow: http.HTTPFlow) -> None: print(flow.request.headers) – Conjoint
how would I send an option to the Addon? I've already got the options defined in my load() method on my object, and can fill them with the --set command line argument, but not sure how to do that in a python script. – Perianth
@Perianth see github.com/mitmproxy/mitmproxy/blob/v5.3.0/examples/addons/… – Torpedoman
@Sully, thanks but that's actually still running from a script. I'm trying to run in inside a python script. I just passed my variables to the init and set them to a object level var and it worked. Thanks though! – Perianth
Hi @Iceberg, I want to be able to interrupt or restart mitmproxy in the thread. I used m.shutdown() to interrupt mitmproxy, and then I got an error when restarting the thread: RuntimeError: Event loop is closed This means that when I use asyncio.get_event_loop(), I get the same loop, so when I execute shutdown() to close the loop, I can’t start it again. I tried to use asyncio.new_event_loop() to get a new loop, but this seems to conflict with the loop inside mitmproxy. – Dossier
J
0
from mitmproxy.tools.main import mitmdump

mitmdump(args=["-s", "myaddon.py"])
Jehial answered 14/6, 2023 at 4:25 Comment(0)
C
0
  • Mitmproxy works with asyncio now.
  • Your addon has to be inserted in its right place among the builtin addons.
  • Some options are only available when all builtin addons are initialized.
import asyncio
import threading
from typing import Any, Callable, Self

from mitmproxy import http
from mitmproxy.addons import default_addons, script
from mitmproxy.master import Master
from mitmproxy.options import Options


class Addon:
    def __init__(self) -> None:
        self.n_reponse = 0

    def response(self, flow: http.HTTPFlow) -> None:
        if flow.response:
            self.n_reponse += 1
            print(f"reponse {self.n_reponse}")


class ThreadedMitmProxy(threading.Thread):
    def __init__(self, user_addon: Callable, **options: Any) -> None:
        self.loop = asyncio.get_event_loop()
        self.master = Master(Options(), event_loop=self.loop)
        # replace the ScriptLoader with the user addon
        self.master.addons.add(
            *(
                user_addon() if isinstance(addon, script.ScriptLoader) else addon
                for addon in default_addons()
            )
        )
        # set the options after the addons since some options depend on addons
        self.master.options.update(**options)
        super().__init__()

    def run(self) -> None:
        self.loop.run_until_complete(self.master.run())

    def __enter__(self) -> Self:
        self.start()
        return self

    def __exit__(self, *_) -> None:
        self.master.shutdown()
        self.join()


if __name__ == "__main__":
    with ThreadedMitmProxy(Addon, listen_host="127.0.0.1", listen_port=8080):
        # Other stuff might be started then
        input("hit <Enter> to quit")
        print("shutdown mitmproxy")
Cochise answered 21/3 at 0:46 Comment(0)
R
0

I have struggled a lot to get my setup working, below is an example of working solution using modern mitmproxy module, that addresses following issues:

  • Allows running of multiple mitmproxy instances on different ports, which is not possible with threads, multiprocessing only! (Reference)
  • Allows specifying of upstream proxy that can be changed on the fly from the main process using shared value through multiprocessing manager
  • Addresses authentication for upstream proxy, if needed
class ProxyAddOn:
    def __init__(self, proxy_url: ValueProxy[str], hosts_to_proxy: list[str] | None = None):
        self.proxy_url = proxy_url
        self.hosts_to_proxy = hosts_to_proxy
        self.proxy_parsed_url: urllib.parse.ParseResult | None = None
        self.proxy_server_spec: mitm_server_spec.ServerSpec | None = None

    def process_proxy_change(self):
        if self.proxy_parsed_url and self.proxy_url.value == self.proxy_parsed_url.geturl():
            return

        logging.getLogger("mitmproxy").warning(
            f"Changing proxy from {self.proxy_parsed_url.geturl() if self.proxy_parsed_url else 'None'} "
            f"to {self.proxy_url.value}")

        self.proxy_parsed_url = urllib.parse.urlparse(self.proxy_url.value)

        server_spec_url = f"{self.proxy_parsed_url.scheme}://{self.proxy_parsed_url.hostname}"
        if self.proxy_parsed_url.port:
            server_spec_url += f":{self.proxy_parsed_url.port}"

        self.proxy_server_spec = mitm_server_spec.parse(server_spec_url, None)

    def http_connect_upstream(self, flow: http.HTTPFlow):
        if self.proxy_parsed_url and (self.proxy_parsed_url.username or self.proxy_parsed_url.password):
            credentials = f"{self.proxy_parsed_url.username}:{self.proxy_parsed_url.password}"
            encoded_credentials = base64.b64encode(credentials.encode()).decode()
            flow.request.headers["proxy-authorization"] = f"Basic {encoded_credentials}"

    def request(self, flow: http.HTTPFlow):
        if self.proxy_url.value:
            self.process_proxy_change()

        should_not_proxy = (
            not self.proxy_server_spec or
            not self.hosts_to_proxy or
            all(key not in flow.request.pretty_host for key in self.hosts_to_proxy)
        )

        if should_not_proxy:
            logging.getLogger("mitmproxy").warning(f"Skipping proxy for {flow.request.pretty_url}")
            return

        logging.getLogger("mitmproxy").warning(f"Proxying {flow.request.pretty_url} to {self.proxy_parsed_url.geturl()}")

        has_proxy_changed = self.proxy_server_spec != flow.server_conn.via
        server_connection_already_open = flow.server_conn.timestamp_start is not None

        if has_proxy_changed and server_connection_already_open:
            # server_conn already refers to an existing connection (which cannot be modified),
            # so we need to replace it with a new server connection object.
            # pylint: disable=no-value-for-parameter
            flow.server_conn = mitmproxy.connection.Server(address=flow.server_conn.address)
            # pylint: enable=no-value-for-parameter

        flow.server_conn.via = mitm_server_spec.ServerSpec(self.proxy_server_spec)


class SubprocessedMitmProxy(mp.Process):
    def __init__(self, *, listen_port: int, proxy_url: ValueProxy[str], hosts_to_proxy: list[str] | None = None):
        super().__init__()
        self.listen_port = listen_port
        self.proxy_url = proxy_url
        self.hosts_to_proxy = hosts_to_proxy

    def run(self):
        logger = logging.getLogger("mitmproxy")
        logger.warning(f"Starting mitmproxy on port {self.listen_port}")
        asyncio.run(self.asyncio_run())

    async def asyncio_run(self):
        opts = options.Options(listen_port=self.listen_port)
        master = DumpMaster(opts, with_termlog=False, with_dumper=False)
        master.addons.add(ProxyAddOn(self.proxy_url, self.hosts_to_proxy))

        await master.run()
Regulable answered 28/3 at 19:59 Comment(0)

© 2022 - 2024 β€” McMap. All rights reserved.