本文目录导读:

我将为您介绍Python实现邮件解析的完整方法,包括解析邮件文件和处理常见编码问题。
基础邮件解析
安装所需库
# email 模块是 Python 3 标准库的一部分,无需额外安装(请不要执行 pip install email,PyPI 上的同名包与标准库无关)
解析邮件文件(.eml)
import email
from email import policy
from email.parser import BytesParser
def parse_email_file(file_path):
    """Read an ``.eml`` file from disk and return the parsed message.

    Args:
        file_path: Path of the ``.eml`` file; it is opened in binary mode.

    Returns:
        The message object produced by ``BytesParser`` configured with
        ``policy.default``.
    """
    eml_parser = BytesParser(policy=policy.default)
    with open(file_path, 'rb') as stream:
        return eml_parser.parse(stream)
# Usage example: parse the sample file 'example.eml' and keep the result in `msg`.
msg = parse_email_file('example.eml')
获取邮件基本信息
def get_email_info(msg):
    """Collect the basic headers of a message into a plain dict.

    Args:
        msg: A message object supporting ``msg[header_name]`` lookups.

    Returns:
        A dict with the keys ``from``, ``to``, ``cc``, ``subject``, ``date``
        and ``message_id``, each holding whatever ``msg[...]`` returned for
        the corresponding header.
    """
    # (result key, header name) pairs, in the order the keys appear in the result.
    wanted = (
        ('from', 'From'),
        ('to', 'To'),
        ('cc', 'CC'),
        ('subject', 'Subject'),
        ('date', 'Date'),
        ('message_id', 'Message-ID'),
    )
    return {key: msg[header_name] for key, header_name in wanted}
# Usage example: print the sender and subject of the parsed message.
info = get_email_info(msg)
print(f"发件人: {info['from']}")
print(f"主题: {info['subject']}")
解析邮件正文
def _decode_text_part(part):
    """Return the text of one non-multipart message part as ``str``."""
    raw = part.get_payload(decode=True)
    if raw is None:
        # No decodable payload (e.g. an empty part): nothing to show.
        return ""
    charset = part.get_content_charset()
    if charset:
        try:
            return raw.decode(charset, errors='ignore')
        except LookupError:
            # The declared charset label is unknown to Python; fall through
            # to the guesses below instead of failing the whole message.
            pass
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        return raw.decode('gbk', errors='ignore')


def get_email_body(msg):
    """Extract the plain-text and HTML bodies of a message.

    For multipart messages every part is visited with ``walk()``; parts whose
    ``Content-Disposition`` mentions ``attachment`` are skipped. If several
    ``text/plain`` (or ``text/html``) parts exist, the last one visited wins.
    For a single-part message the result of ``msg.get_content()`` is returned
    as the first element and the HTML element stays empty.

    The payload bytes are obtained with ``get_payload(decode=True)``:
    ``get_content()`` does not accept a ``decode`` keyword, so the previous
    ``get_content(decode=True)`` call raised ``TypeError`` on every multipart
    message containing a text part.

    Args:
        msg: A parsed message object.

    Returns:
        A ``(body, html_body)`` tuple; an element is ``""`` when the
        corresponding part is absent.
    """
    body = ""
    html_body = ""
    if not msg.is_multipart():
        # Single-part message
        body = msg.get_content()
        return body, html_body

    for part in msg.walk():
        content_disposition = str(part.get("Content-Disposition"))
        # Only look at parts that are not attachments
        if "attachment" in content_disposition:
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            body = _decode_text_part(part)
        elif content_type == "text/html":
            html_body = _decode_text_part(part)
    return body, html_body
# Usage example: extract both bodies and preview the plain-text one.
text_body, html_body = get_email_body(msg)
print(f"纯文本内容: {text_body[:200]}...") # show only the first 200 characters
提取邮件附件
import os
def _decode_attachment_name(filename):
    """Decode RFC 2047 encoded words in an attachment file name, if any."""
    from email.errors import HeaderParseError
    from email.header import decode_header

    try:
        fragments = decode_header(filename)
    except HeaderParseError:
        # Malformed encoded word: keep the name as it appears in the message.
        return filename
    decoded = []
    for text, charset in fragments:
        if isinstance(text, bytes):
            try:
                text = text.decode(charset or 'utf-8')
            except (LookupError, UnicodeDecodeError):
                text = text.decode('gbk', errors='ignore')
        decoded.append(text)
    # decode_header keeps the whitespace of unencoded fragments, so the
    # fragments are concatenated without an extra separator.
    return ''.join(decoded)


def _safe_attachment_name(filename):
    """Reduce a name taken from the message to a bare, non-empty file name."""
    # The name comes from the message and is untrusted: treat both separator
    # styles as path separators and keep only the last component, so that
    # names such as '../../x' cannot escape the output directory.
    name = os.path.basename(filename.replace('\\', '/'))
    name = name.replace('\x00', '').strip()
    if name in ('', '.', '..'):
        return 'attachment'
    return name


def extract_attachments(msg, output_dir='attachments'):
    """Save every attachment of a message into ``output_dir``.

    A part counts as an attachment when it has a file name and its
    ``Content-Disposition`` mentions ``attachment``. File names are decoded
    and then stripped of any directory component before being joined to
    ``output_dir``. Parts without a decodable payload are skipped. An
    existing file with the same name is overwritten.

    Args:
        msg: A parsed message object.
        output_dir: Directory to write into; created if it does not exist.

    Returns:
        A list of dicts with the keys ``filename`` (sanitised name),
        ``path`` (where the file was written) and ``size`` (bytes on disk).
    """
    os.makedirs(output_dir, exist_ok=True)
    attachments = []
    for part in msg.walk():
        content_disposition = str(part.get("Content-Disposition"))
        filename = part.get_filename()
        if not filename or "attachment" not in content_disposition:
            continue
        payload = part.get_payload(decode=True)
        if payload is None:
            # Container parts have no single byte payload to write.
            continue
        filename = _safe_attachment_name(_decode_attachment_name(filename))
        # Save the attachment
        filepath = os.path.join(output_dir, filename)
        with open(filepath, 'wb') as f:
            f.write(payload)
        attachments.append({
            'filename': filename,
            'path': filepath,
            'size': os.path.getsize(filepath),
        })
    return attachments
# Usage example: save all attachments and list their names and sizes.
attachments = extract_attachments(msg)
for att in attachments:
    print(f"附件: {att['filename']} ({att['size']} bytes)")
完整邮件解析示例
import email
from email import policy
from email.parser import BytesParser
import os
from email.header import decode_header
class EmailParser:
    """Parse one ``.eml`` file and expose its headers, bodies and attachments.

    The file is read and parsed once, in the constructor, with
    ``policy.default``; the parsed message is stored in ``self.msg``.
    """

    # Charsets tried, in order, when a part's declared charset is missing
    # or does not decode the payload.
    _FALLBACK_CHARSETS = ('utf-8', 'gbk', 'gb2312', 'big5')

    def __init__(self, file_path):
        """Remember ``file_path`` and parse it immediately.

        Raises:
            OSError: If the file cannot be opened.
        """
        self.file_path = file_path
        self.msg = None
        self._parse()

    def _parse(self):
        """Parse ``self.file_path`` into ``self.msg``."""
        with open(self.file_path, 'rb') as f:
            self.msg = BytesParser(policy=policy.default).parse(f)

    def decode_header_value(self, header_value):
        """Decode RFC 2047 encoded words in a header value.

        Args:
            header_value: The header value, or ``None``.

        Returns:
            The decoded text; ``''`` when ``header_value`` is ``None``.
        """
        if header_value is None:
            return ''
        decoded_parts = []
        for part, encoding in decode_header(header_value):
            if not isinstance(part, bytes):
                decoded_parts.append(part)
                continue
            if encoding:
                try:
                    decoded_parts.append(part.decode(encoding))
                except (LookupError, UnicodeDecodeError):
                    # Unknown or wrong charset label: salvage what we can.
                    decoded_parts.append(part.decode('utf-8', errors='ignore'))
            else:
                try:
                    decoded_parts.append(part.decode('utf-8'))
                except UnicodeDecodeError:
                    decoded_parts.append(part.decode('gbk', errors='ignore'))
        # decode_header keeps the whitespace of unencoded fragments and drops
        # the whitespace between adjacent encoded words, so joining with a
        # space would insert characters that are not part of the header.
        return ''.join(decoded_parts)

    def get_headers(self):
        """Return the main headers as a dict.

        Returns:
            A dict keyed by ``From``, ``To``, ``CC``, ``Subject``, ``Date``
            and ``Message-ID``. Missing or empty headers map to ``''``; the
            subject is passed through :meth:`decode_header_value`.
        """
        headers = {}
        for key in ['From', 'To', 'CC', 'Subject', 'Date', 'Message-ID']:
            value = self.msg[key]
            if not value:
                headers[key] = ''
            elif key == 'Subject':
                headers[key] = self.decode_header_value(value)
            else:
                headers[key] = value
        return headers

    def get_body(self):
        """Return the message bodies as ``(text_body, html_body)``.

        For multipart messages, parts whose ``Content-Disposition`` mentions
        ``attachment`` are skipped and the last ``text/plain`` / ``text/html``
        part visited wins. For a single-part message the result of
        ``get_content()`` is returned as ``text_body``.
        """
        text_body = ''
        html_body = ''
        if not self.msg.is_multipart():
            text_body = self.msg.get_content()
            return text_body, html_body

        for part in self.msg.walk():
            content_disposition = str(part.get("Content-Disposition"))
            if "attachment" in content_disposition:
                continue
            content_type = part.get_content_type()
            if content_type == "text/plain":
                text_body = self._get_decoded_content(part)
            elif content_type == "text/html":
                html_body = self._get_decoded_content(part)
        return text_body, html_body

    def _get_decoded_content(self, part):
        """Decode the payload of a text part to ``str``.

        The declared charset is tried first, then each entry of
        ``_FALLBACK_CHARSETS``; as a last resort the payload is decoded as
        UTF-8 with undecodable bytes dropped.
        """
        content = part.get_payload(decode=True)
        if content is None:
            # No decodable payload: treat it as empty text.
            return ''
        candidates = []
        charset = part.get_content_charset()
        if charset:
            candidates.append(charset)
        candidates.extend(self._FALLBACK_CHARSETS)
        for encoding in candidates:
            try:
                return content.decode(encoding)
            except (LookupError, UnicodeDecodeError):
                continue
        return content.decode('utf-8', errors='ignore')

    def get_attachments(self):
        """Return the attachments of the message.

        A part counts as an attachment when it has a file name and its
        ``Content-Disposition`` mentions ``attachment``.

        Returns:
            A list of dicts with the keys ``filename`` (decoded name as found
            in the message, not sanitised for use as a path), ``data``
            (bytes) and ``content_type``.
        """
        attachments = []
        for part in self.msg.walk():
            filename = part.get_filename()
            content_disposition = str(part.get("Content-Disposition"))
            if not filename or "attachment" not in content_disposition:
                continue
            data = part.get_payload(decode=True)
            if data is None:
                # Container parts (such as an attached message) have no single
                # byte payload; serialise the nested parts so 'data' is
                # always bytes.
                nested = part.get_payload()
                if isinstance(nested, list):
                    data = b''.join(sub.as_bytes() for sub in nested)
                else:
                    data = b''
            attachments.append({
                'filename': self.decode_header_value(filename),
                'data': data,
                'content_type': part.get_content_type(),
            })
        return attachments

    def get_all_info(self):
        """Return headers, both bodies and attachments in a single dict.

        Returns:
            A dict with the keys ``headers``, ``text_body``, ``html_body``
            and ``attachments``.
        """
        headers = self.get_headers()
        text_body, html_body = self.get_body()
        attachments = self.get_attachments()
        return {
            'headers': headers,
            'text_body': text_body,
            'html_body': html_body,
            'attachments': attachments,
        }
# Usage example
def main():
    """Parse 'example.eml' and print its headers, text body and attachments."""
    # Parse the file and gather everything in one dict
    report = EmailParser('example.eml').get_all_info()

    # Header summary
    print("=== 邮件基本信息 ===")
    for name, value in report['headers'].items():
        print(f"{name}: {value}")

    # Body preview
    print("\n=== 邮件正文 ===")
    plain_text = report['text_body']
    if plain_text:
        print("纯文本内容:")
        print(plain_text[:500])  # only the first 500 characters

    # Attachment listing
    print("\n=== 附件 ===")
    for item in report['attachments']:
        size = len(item['data'])
        print(f"附件: {item['filename']} ({size} bytes)")


if __name__ == "__main__":
    main()
处理不同编码的邮件
def _decode_mime_words(raw_value):
    """Decode the RFC 2047 encoded words of one header value into ``str``."""
    pieces = []
    for part, charset in decode_header(raw_value):
        if isinstance(part, bytes):
            # Fragments without a charset are unencoded text returned as bytes
            # once the header contains any encoded word; decode them rather
            # than calling str(), which would yield the "b'...'" repr.
            part = part.decode(charset or 'utf-8')
        pieces.append(part)
    return ''.join(pieces)


def handle_encoding_issues(msg):
    """Decode the Subject and From headers of a message.

    Args:
        msg: A message object supporting ``msg[header_name]`` lookups.

    Returns:
        A dict that contains ``subject`` and/or ``from`` for each of those
        headers that is present and non-empty. When a header cannot be
        decoded (unknown charset, undecodable bytes, malformed encoded word)
        the value is the header exactly as returned by ``msg[...]``.
    """
    from email.errors import HeaderParseError

    result = {}
    for key, header_name in (('subject', 'Subject'), ('from', 'From')):
        raw_value = msg[header_name]
        if not raw_value:
            continue
        try:
            result[key] = _decode_mime_words(raw_value)
        except (LookupError, UnicodeDecodeError, HeaderParseError):
            # Best effort: keep the undecoded header instead of failing.
            result[key] = raw_value
    return result
实战:批量解析邮件文件
import glob
def batch_parse_emails(directory):
    """Parse every ``*.eml`` file found directly inside ``directory``.

    A progress line is printed for each file. A file that fails to parse is
    reported on standard output and left out of the result.

    Args:
        directory: Directory to search (not recursive).

    Returns:
        A list of ``{'file': path, 'info': parsed_info}`` dicts, one per
        successfully parsed file.
    """
    pattern = os.path.join(directory, '*.eml')
    parsed = []
    for path in glob.glob(pattern):
        print(f"解析: {path}")
        try:
            details = EmailParser(path).get_all_info()
        except Exception as exc:
            print(f"解析失败 {path}: {exc}")
            continue
        parsed.append({'file': path, 'info': details})
    return parsed
# 使用示例
# results = batch_parse_emails('./emails_directory')
注意事项
- 编码处理:中文字符需要特殊处理,使用 decode_header 解码
- 多部分邮件:使用 walk() 遍历所有部分
- 附件处理:注意附件的大小和类型
- 异常处理:解析过程中可能遇到各种编码错误
- 性能考虑:大型邮件文件可能需要流式处理
这个实现涵盖了大多数邮件解析场景,可以作为您邮件处理的基础框架。