For those who still need it, here is a working solution using Python 3 and PySocks:
# -*- coding: utf-8 -*-
import logging
import os
import re
import smtplib
import socket
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from urllib.parse import urlsplit
from urllib.request import getproxies

import socks
# ============================================================= #
# Module-wide proxy settings, used as the default `proxy` argument elsewhere in this file.
# Keys: "useproxy" (on/off switch), "server" / "port" (proxy endpoint),
# "type" ("HTTP" here; the Proxifier class also recognises "SOCKS4" / "SOCKS5"),
# "username" / "password" (proxy credentials, if any).
PROXY = {
    "useproxy": True,
    "server": None,
    "port": None,
    "type": "HTTP",
    "username": None,
    "password": None,
}
# ============================================================= #
class Proxifier:
    """
    Helper class to configure proxy settings. Exposes the `get_socket()` method that returns
    a proxified connection (socket).

    Instance attributes: `proxy_type` (PySocks proxy type constant), `proxy_server`,
    `proxy_port`, `proxy_username`, `proxy_password`.
    """

    def __init__(self, proxy_server=None, proxy_port=None, proxy_type='HTTP', proxy_username=None, proxy_password=None):
        """
        Stores the proxy settings; if host or port is missing, tries the system proxy settings.

        `proxy_type` is one of 'HTTP', 'SOCKS4' or 'SOCKS5' (case-insensitive); anything else,
        including `None`, falls back to HTTP.
        """
        # proxy type: HTTP, SOCKS4 or SOCKS5 (default = HTTP); normalised so that 'socks5' is not
        # silently treated as HTTP
        type_name = str(proxy_type or 'HTTP').upper()
        self.proxy_type = {'HTTP': socks.HTTP, 'SOCKS4': socks.SOCKS4, 'SOCKS5': socks.SOCKS5}.get(type_name, socks.HTTP)
        # proxy auth if required
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        # Always define host and port first: `_get_sysproxy()` falls back to these attributes
        # when the system provides no value, and would fail if they did not exist yet.
        self.proxy_server = proxy_server
        self.proxy_port = proxy_port
        # if host or port not set, attempt to retrieve from system
        if not proxy_server or not proxy_port:
            self._get_sysproxy()

    def _get_sysproxy(self, setvars=True):
        """
        Retrieves system proxy settings as reported by `urllib.request.getproxies()`
        (HTTP_PROXY, HTTPS_PROXY etc.). The first usable entry whose key starts with
        'http' or 'socks' wins; entries that cannot be parsed are skipped.

        If `setvars` == `True`, sets the member variables as well: every value found in the
        system settings replaces the current one, values not found are left untouched.

        Returns the tuple `(proxy_server, proxy_port)` found in the system settings
        (each `None` if not found).
        """
        proxy_server = proxy_port = proxy_username = proxy_password = None
        try:
            sys_proxy = getproxies()
        except Exception as err:
            # best effort: a failing lookup must not prevent construction
            logging.exception(err)
            sys_proxy = {}
        for scheme, url in sys_proxy.items():
            if not scheme.lower().startswith(('http', 'socks')):
                continue
            try:
                # urlsplit only recognises the host part after '//', so add it for bare 'host:port'
                parts = urlsplit(url if '//' in url else '//' + url)
                host = parts.hostname
                port = parts.port  # raises ValueError on a non-numeric / out-of-range port
            except ValueError as err:
                # the URL itself is not logged: it may contain credentials
                logging.warning("Ignoring malformed system proxy entry %r: %s", scheme, err)
                continue
            if not host:
                continue
            proxy_server = host
            proxy_port = port or None
            proxy_username = parts.username or None
            proxy_password = parts.password or None
            break
        if setvars:
            self.proxy_server = proxy_server or self.proxy_server
            self.proxy_port = proxy_port or self.proxy_port
            self.proxy_username = proxy_username or self.proxy_username
            self.proxy_password = proxy_password or self.proxy_password
        return (proxy_server, proxy_port)

    def get_socket(self, source_address, host, port, timeout=None):
        """
        Applies proxy settings to PySocks `create_connection()` method to
        create a proxified connection (socket) to `(host, port)` which can be used by other
        interfaces to establish connection.
        """
        return socks.create_connection((host, port), timeout, source_address,
                                       proxy_type=self.proxy_type, proxy_addr=self.proxy_server,
                                       proxy_port=self.proxy_port,
                                       proxy_username=self.proxy_username, proxy_password=self.proxy_password)

    @staticmethod
    def get_proxifier(proxy=PROXY):
        """
        Factory returns a `Proxifier` object given proxy settings in a dictionary
        (keys: 'useproxy', 'server', 'port', 'type', 'username', 'password').

        Returns `None` when `proxy` is empty or its 'useproxy' entry is missing or falsy.
        """
        if not proxy or not proxy.get('useproxy', False):
            return None
        return Proxifier(proxy.get('server', None), proxy.get('port', None), proxy.get('type', None),
                         proxy.get('username', None), proxy.get('password', None))
# ============================================================= #
class SMTP_Proxy(smtplib.SMTP):
    """
    Descendant of SMTP with optional proxy wrapping.

    Takes the same arguments as `smtplib.SMTP` plus `proxifier`: an object exposing
    `get_socket(source_address, host, port, timeout)` (a `Proxifier`), or `None` for a
    direct connection.
    """

    # The timeout default must be the very sentinel object the socket module compares by
    # identity; any other `object()` would be handed to `settimeout()` on the direct path.
    def __init__(self, host='', port=0, local_hostname=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
                 source_address=None, proxifier: 'Proxifier' = None):
        # `Proxifier` object if proxy is required; must be set before the base constructor
        # runs, because that one connects right away when `host` is given
        self._proxifier = proxifier
        super().__init__(host, port, local_hostname, timeout, source_address)

    def _get_socket(self, host, port, timeout):
        """
        Overridden method of base class to allow for proxified connection.

        Raises `ValueError` for `timeout=0` when a proxifier is set.
        """
        if not self._proxifier:
            # no proxy: use base class implementation
            return super()._get_socket(host, port, timeout)
        if timeout is not None and not timeout:
            raise ValueError('Non-blocking socket (timeout=0) is not supported')
        if self.debuglevel > 0:
            self._print_debug('connect: to', (host, port), self.source_address)
        # proxy: use proxifier connection
        return self._proxifier.get_socket(self.source_address, host, port, timeout)
# ============================================================= #
class SMTP_SSL_Proxy(smtplib.SMTP_SSL):
    """
    Descendant of SMTP_SSL with optional proxy wrapping.

    Takes the same arguments as `smtplib.SMTP_SSL` plus `proxifier`: an object exposing
    `get_socket(source_address, host, port, timeout)` (a `Proxifier`), or `None` for a
    direct connection.
    """

    # The timeout default must be the very sentinel object the socket module compares by
    # identity; any other `object()` would be handed to `settimeout()` on the direct path.
    def __init__(self, host='', port=0, local_hostname=None, keyfile=None, certfile=None,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None, context=None,
                 proxifier: 'Proxifier' = None):
        # `Proxifier` object if proxy is required; must be set before the base constructor
        # runs, because that one connects right away when `host` is given
        self._proxifier = proxifier
        # Pass everything after `local_hostname` by keyword and forward `keyfile` / `certfile`
        # only when supplied: Python 3.12 removed both parameters from `SMTP_SSL`, so the
        # positional call failed there even when they were left at `None`.
        base_kwargs = {'timeout': timeout, 'source_address': source_address, 'context': context}
        if keyfile is not None:
            base_kwargs['keyfile'] = keyfile
        if certfile is not None:
            base_kwargs['certfile'] = certfile
        super().__init__(host, port, local_hostname, **base_kwargs)

    def _get_socket(self, host, port, timeout):
        """
        Overridden method of base class to allow for proxified connection.

        The proxified socket is wrapped with `self.context` before it is returned.
        Raises `ValueError` for `timeout=0` when a proxifier is set.
        """
        if not self._proxifier:
            # no proxy: use base class implementation
            return super()._get_socket(host, port, timeout)
        if timeout is not None and not timeout:
            raise ValueError('Non-blocking socket (timeout=0) is not supported')
        if self.debuglevel > 0:
            self._print_debug('connect: to', (host, port), self.source_address)
        # proxy: use proxifier connection
        newsocket = self._proxifier.get_socket(self.source_address, host, port, timeout)
        return self.context.wrap_socket(newsocket, server_hostname=self._host)
# ============================================================= #
def send_email(body, subject, sender, receivers, smtp, proxy=PROXY, sender_name='Appname', attachments=None):
    """
    Sends email with optional attachments and proxy settings.

    `receivers` is an iterable of address strings. `smtp` is a dictionary with the keys
    'protocol', 'server', 'port', 'login' and 'password'; protocol 'SSL' (case-insensitive)
    connects over SSL from the start, any other value connects in plain text and upgrades
    the connection with STARTTLS. `proxy` is a settings dictionary as accepted by
    `Proxifier.get_proxifier()`. `attachments` is an optional iterable of file paths;
    files that cannot be read are skipped with a warning.

    Errors while building or sending the message are logged, not raised; a missing
    'protocol' key raises `KeyError`.
    """
    is_ssl = smtp['protocol'].upper() == 'SSL'
    smtp_class = SMTP_SSL_Proxy if is_ssl else SMTP_Proxy
    try:
        msg = MIMEMultipart()
        msg['Subject'] = subject
        msg['To'] = ', '.join(receivers)
        msg['From'] = f'{sender_name} <{sender}>' if sender_name else sender
        msg.attach(MIMEText(body))
        for filepath in attachments or ():
            bname = os.path.basename(filepath)
            try:
                with open(filepath, 'rb') as file_:
                    part = MIMEApplication(file_.read(), Name=bname)
            except OSError as err:
                # best effort: an unreadable attachment must not block the email itself
                logging.warning("Skipping attachment %s: %s", filepath, err)
                continue
            part['Content-Disposition'] = f'attachment; filename="{bname}"'
            msg.attach(part)
        with smtp_class(smtp['server'], smtp['port'], proxifier=Proxifier.get_proxifier(proxy)) as emailer:
            # Upgrade to TLS *before* authenticating, otherwise the credentials travel over
            # the unencrypted channel (or the server refuses AUTH altogether).
            if not is_ssl:
                emailer.starttls()
            emailer.login(smtp['login'], smtp['password'])
            emailer.sendmail(sender, receivers, msg.as_string())
            logging.debug("--- Email sent to: %s", receivers)
    except Exception as err:
        # covers smtplib.SMTPException as well; sending is best effort and only logged
        logging.exception(err)