I have struggled a lot to get my setup working, below is an example of working solution using modern mitmproxy module, that addresses following issues:
- Allows running of multiple mitmproxy instances on different ports, which is not possible with threads, multiprocessing only! (Reference)
- Allows specifying of upstream proxy that can be changed on the fly from the main process using shared value through multiprocessing manager
- Addresses authentication for upstream proxy, if needed
class ProxyAddOn:
def __init__(self, proxy_url: ValueProxy[str], hosts_to_proxy: list[str] | None = None):
self.proxy_url = proxy_url
self.hosts_to_proxy = hosts_to_proxy
self.proxy_parsed_url: urllib.parse.ParseResult | None = None
self.proxy_server_spec: mitm_server_spec.ServerSpec | None = None
def process_proxy_change(self):
if self.proxy_parsed_url and self.proxy_url.value == self.proxy_parsed_url.geturl():
return
logging.getLogger("mitmproxy").warning(
f"Changing proxy from {self.proxy_parsed_url.geturl() if self.proxy_parsed_url else 'None'} "
f"to {self.proxy_url.value}")
self.proxy_parsed_url = urllib.parse.urlparse(self.proxy_url.value)
server_spec_url = f"{self.proxy_parsed_url.scheme}://{self.proxy_parsed_url.hostname}"
if self.proxy_parsed_url.port:
server_spec_url += f":{self.proxy_parsed_url.port}"
self.proxy_server_spec = mitm_server_spec.parse(server_spec_url, None)
def http_connect_upstream(self, flow: http.HTTPFlow):
if self.proxy_parsed_url and (self.proxy_parsed_url.username or self.proxy_parsed_url.password):
credentials = f"{self.proxy_parsed_url.username}:{self.proxy_parsed_url.password}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
flow.request.headers["proxy-authorization"] = f"Basic {encoded_credentials}"
def request(self, flow: http.HTTPFlow):
if self.proxy_url.value:
self.process_proxy_change()
should_not_proxy = (
not self.proxy_server_spec or
not self.hosts_to_proxy or
all(key not in flow.request.pretty_host for key in self.hosts_to_proxy)
)
if should_not_proxy:
logging.getLogger("mitmproxy").warning(f"Skipping proxy for {flow.request.pretty_url}")
return
logging.getLogger("mitmproxy").warning(f"Proxying {flow.request.pretty_url} to {self.proxy_parsed_url.geturl()}")
has_proxy_changed = self.proxy_server_spec != flow.server_conn.via
server_connection_already_open = flow.server_conn.timestamp_start is not None
if has_proxy_changed and server_connection_already_open:
# server_conn already refers to an existing connection (which cannot be modified),
# so we need to replace it with a new server connection object.
# pylint: disable=no-value-for-parameter
flow.server_conn = mitmproxy.connection.Server(address=flow.server_conn.address)
# pylint: enable=no-value-for-parameter
flow.server_conn.via = mitm_server_spec.ServerSpec(self.proxy_server_spec)
class SubprocessedMitmProxy(mp.Process):
def __init__(self, *, listen_port: int, proxy_url: ValueProxy[str], hosts_to_proxy: list[str] | None = None):
super().__init__()
self.listen_port = listen_port
self.proxy_url = proxy_url
self.hosts_to_proxy = hosts_to_proxy
def run(self):
logger = logging.getLogger("mitmproxy")
logger.warning(f"Starting mitmproxy on port {self.listen_port}")
asyncio.run(self.asyncio_run())
async def asyncio_run(self):
opts = options.Options(listen_port=self.listen_port)
master = DumpMaster(opts, with_termlog=False, with_dumper=False)
master.addons.add(ProxyAddOn(self.proxy_url, self.hosts_to_proxy))
await master.run()