Python案例如何实现邮件解析?

wen python案例 1

本文目录导读:

Python案例如何实现邮件解析?

  1. 基础邮件解析
  2. 获取邮件基本信息
  3. 解析邮件正文
  4. 提取邮件附件
  5. 完整邮件解析示例
  6. 处理不同编码的邮件
  7. 实战:批量解析邮件文件
  8. 注意事项

我将为您介绍Python实现邮件解析的完整方法,包括解析邮件文件和处理常见编码问题。

基础邮件解析

安装所需库

# email 是 Python 3 标准库的一部分,无需安装,直接 import email 即可
# (不要执行 pip install email:PyPI 上的同名包是过时的旧版本,并非本文所需)

解析邮件文件(.eml)

import email
from email import policy
from email.parser import BytesParser
def parse_email_file(file_path):
    """Read an ``.eml`` file and return the parsed message.

    The file is opened in binary mode and parsed with ``policy.default``.
    """
    eml_parser = BytesParser(policy=policy.default)
    with open(file_path, 'rb') as eml_stream:
        return eml_parser.parse(eml_stream)
# Usage example: parse 'example.eml' and keep the result in `msg`.
msg = parse_email_file('example.eml')

获取邮件基本信息

def get_email_info(msg):
    """Collect the common envelope headers of *msg* into a dict.

    The returned keys are ``from``, ``to``, ``cc``, ``subject``, ``date`` and
    ``message_id``; each value is whatever ``msg[<header name>]`` yields.
    """
    key_to_header = (
        ('from', 'From'),
        ('to', 'To'),
        ('cc', 'CC'),
        ('subject', 'Subject'),
        ('date', 'Date'),
        ('message_id', 'Message-ID'),
    )
    return {key: msg[header_name] for key, header_name in key_to_header}
# Usage example: print the sender and the subject of the parsed message.
info = get_email_info(msg)
print(f"发件人: {info['from']}")
print(f"主题: {info['subject']}")

解析邮件正文

def _decode_text_part(part):
    """Return the text carried by a ``text/*`` MIME part as ``str``.

    The transfer encoding (base64 / quoted-printable) is undone with
    ``get_payload(decode=True)``; the resulting bytes are decoded with the
    charset declared on the part, falling back to UTF-8 and then GBK when the
    charset is missing or not a codec Python knows.
    """
    raw = part.get_payload(decode=True)
    if raw is None:
        # Container parts carry no byte payload of their own.
        return ""
    charset = part.get_content_charset()
    if charset:
        try:
            return raw.decode(charset, errors='ignore')
        except LookupError:
            # Unknown charset label: fall through to the guesses below.
            pass
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        return raw.decode('gbk', errors='ignore')


def get_email_body(msg):
    """Extract the plain-text and HTML bodies of *msg*.

    For a multipart message every part is visited; parts whose
    Content-Disposition is ``attachment`` are skipped. If several
    ``text/plain`` (or ``text/html``) parts exist, the last one visited wins.
    For a non-multipart message the content returned by ``msg.get_content()``
    is used as the plain body and the HTML body stays empty.

    Returns:
        A ``(body, html_body)`` tuple; a body that was not found is ``""``.
    """
    body = ""
    html_body = ""
    if msg.is_multipart():
        for part in msg.walk():
            # Compare the parsed disposition rather than searching the raw
            # header, so an inline part whose *filename* happens to contain
            # "attachment" is not skipped by mistake.
            if part.get_content_disposition() == 'attachment':
                continue
            content_type = part.get_content_type()
            if content_type == "text/plain":
                body = _decode_text_part(part)
            elif content_type == "text/html":
                html_body = _decode_text_part(part)
    else:
        # Single-part message
        body = msg.get_content()
    return body, html_body
# Usage example: extract both bodies and preview the plain-text one.
text_body, html_body = get_email_body(msg)
print(f"纯文本内容: {text_body[:200]}...")  # show only the first 200 characters

提取邮件附件

import os
def _decode_attachment_name(filename):
    """Decode RFC 2047 encoded words in *filename* and return a ``str``.

    Every decoded chunk is used (not just the first one). Bytes chunks are
    decoded with their declared charset, or UTF-8 when none is declared, and
    with GBK as a last resort.
    """
    from email.header import decode_header

    pieces = []
    for chunk, charset in decode_header(filename):
        if isinstance(chunk, bytes):
            try:
                chunk = chunk.decode(charset or 'utf-8')
            except (LookupError, UnicodeDecodeError):
                chunk = chunk.decode('gbk', errors='ignore')
        pieces.append(chunk)
    return ''.join(pieces)


def extract_attachments(msg, output_dir='attachments'):
    """Save every attachment of *msg* into *output_dir*.

    A part is treated as an attachment when its Content-Disposition is
    ``attachment`` and it has a filename. Parts without a byte payload
    (container parts) are skipped.

    The filename comes from the message and is therefore untrusted: only its
    final path component is kept, so a name such as ``../../x`` cannot write
    outside *output_dir*.

    Args:
        msg: A parsed email message.
        output_dir: Directory to write into; created if it does not exist.

    Returns:
        A list of dicts with the keys ``filename``, ``path`` and ``size``.
    """
    os.makedirs(output_dir, exist_ok=True)
    attachments = []
    for part in msg.walk():
        if part.get_content_disposition() != 'attachment':
            continue
        filename = part.get_filename()
        if not filename:
            continue
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        filename = _decode_attachment_name(filename)
        # Drop any directory components (both separator styles) supplied by
        # the sender before building the destination path.
        filename = os.path.basename(filename.replace('\\', '/'))
        if filename in ('', '.', '..'):
            filename = f'attachment_{len(attachments) + 1}'
        filepath = os.path.join(output_dir, filename)
        with open(filepath, 'wb') as f:
            f.write(payload)
        attachments.append({
            'filename': filename,
            'path': filepath,
            'size': os.path.getsize(filepath)
        })
    return attachments
# Usage example: save the attachments and list each file name with its size.
attachments = extract_attachments(msg)
for att in attachments:
    print(f"附件: {att['filename']} ({att['size']} bytes)")

完整邮件解析示例

import email
from email import policy
from email.parser import BytesParser
import os
from email.header import decode_header
class EmailParser:
    """Parse one ``.eml`` file and expose its headers, bodies and attachments.

    The file is read and parsed once, in ``__init__``, with
    ``policy.default``; the parsed message is kept in ``self.msg``.
    """

    # Encodings tried, in order, when a part's declared charset is missing
    # or fails to decode the payload.
    _FALLBACK_ENCODINGS = ('utf-8', 'gbk', 'gb2312', 'big5')

    def __init__(self, file_path):
        """Remember *file_path* and parse it immediately."""
        self.file_path = file_path
        self.msg = None
        self._parse()

    def _parse(self):
        """Read ``self.file_path`` in binary mode and parse it into ``self.msg``."""
        with open(self.file_path, 'rb') as f:
            self.msg = BytesParser(policy=policy.default).parse(f)

    def decode_header_value(self, header_value):
        """Decode RFC 2047 encoded words in a header value.

        Returns ``''`` for ``None``. Bytes chunks are decoded with their
        declared charset (falling back to UTF-8 with undecodable bytes
        dropped); chunks without a charset are tried as UTF-8, then GBK.
        """
        if header_value is None:
            return ''
        decoded_parts = []
        for part, encoding in decode_header(header_value):
            if not isinstance(part, bytes):
                decoded_parts.append(part)
            elif encoding:
                try:
                    decoded_parts.append(part.decode(encoding))
                except (LookupError, UnicodeDecodeError):
                    decoded_parts.append(part.decode('utf-8', errors='ignore'))
            else:
                try:
                    decoded_parts.append(part.decode('utf-8'))
                except UnicodeDecodeError:
                    decoded_parts.append(part.decode('gbk', errors='ignore'))
        # decode_header() keeps the whitespace that separated the chunks, so
        # they are concatenated as-is; joining with ' ' would double it.
        return ''.join(decoded_parts)

    def get_headers(self):
        """Return From/To/CC/Subject/Date/Message-ID as a dict.

        Missing or empty headers map to ``''``. Only ``Subject`` is passed
        through ``decode_header_value``; the others are returned as read.
        """
        headers = {}
        for key in ['From', 'To', 'CC', 'Subject', 'Date', 'Message-ID']:
            value = self.msg[key]
            if not value:
                headers[key] = ''
            elif key == 'Subject':
                headers[key] = self.decode_header_value(value)
            else:
                headers[key] = value
        return headers

    def get_body(self):
        """Return ``(text_body, html_body)``.

        For multipart messages, parts whose Content-Disposition is
        ``attachment`` are skipped and the last ``text/plain`` / ``text/html``
        part visited wins. A non-multipart message's content is returned as
        the text body.
        """
        text_body = ''
        html_body = ''
        if self.msg.is_multipart():
            for part in self.msg.walk():
                # Use the parsed disposition so that an inline part whose
                # filename contains "attachment" is still treated as body.
                if part.get_content_disposition() == 'attachment':
                    continue
                content_type = part.get_content_type()
                if content_type == "text/plain":
                    text_body = self._get_decoded_content(part)
                elif content_type == "text/html":
                    html_body = self._get_decoded_content(part)
        else:
            text_body = self.msg.get_content()
        return text_body, html_body

    def _get_decoded_content(self, part):
        """Decode the payload of *part* to ``str``.

        The declared charset is tried first, then each of
        ``_FALLBACK_ENCODINGS``; as a last resort the payload is decoded as
        UTF-8 with undecodable bytes dropped. A part without a byte payload
        yields ``''``.
        """
        content = part.get_payload(decode=True)
        if content is None:
            return ''
        charset = part.get_content_charset()
        candidates = [charset] if charset else []
        candidates.extend(self._FALLBACK_ENCODINGS)
        for encoding in candidates:
            try:
                return content.decode(encoding)
            except (LookupError, UnicodeDecodeError):
                continue
        return content.decode('utf-8', errors='ignore')

    def get_attachments(self):
        """Return the attachments as dicts with ``filename``, ``data``, ``content_type``.

        A part counts as an attachment when it has a filename and its
        Content-Disposition is ``attachment``. ``data`` is the decoded payload.
        """
        attachments = []
        for part in self.msg.walk():
            filename = part.get_filename()
            if filename and part.get_content_disposition() == 'attachment':
                attachments.append({
                    'filename': self.decode_header_value(filename),
                    'data': part.get_payload(decode=True),
                    'content_type': part.get_content_type()
                })
        return attachments

    def get_all_info(self):
        """Return headers, both bodies and the attachment list in one dict."""
        headers = self.get_headers()
        text_body, html_body = self.get_body()
        attachments = self.get_attachments()
        return {
            'headers': headers,
            'text_body': text_body,
            'html_body': html_body,
            'attachments': attachments
        }
# 使用示例
def main():
    """Parse ``example.eml`` and print its headers, a text preview and its attachments."""
    report = EmailParser('example.eml').get_all_info()

    # Header section
    print("=== 邮件基本信息 ===")
    for header_name, header_value in report['headers'].items():
        print(f"{header_name}: {header_value}")

    # Body section: only the plain-text body is shown, cut to 500 characters
    print("\n=== 邮件正文 ===")
    plain_text = report['text_body']
    if plain_text:
        print("纯文本内容:")
        print(plain_text[:500])

    # Attachment section: file name and payload size
    print("\n=== 附件 ===")
    for attachment in report['attachments']:
        name = attachment['filename']
        payload = attachment['data']
        print(f"附件: {name} ({len(payload)} bytes)")


if __name__ == "__main__":
    main()

处理不同编码的邮件

def _decode_mime_header(raw_value):
    """Decode RFC 2047 encoded words in *raw_value* and return a ``str``.

    Bytes chunks are decoded with their declared charset, or as UTF-8 when no
    charset is declared. Decoding errors propagate to the caller.
    """
    from email.header import decode_header

    pieces = []
    for part, charset in decode_header(raw_value):
        if isinstance(part, bytes):
            # decode_header() returns bytes even for the unencoded chunks of
            # a header containing encoded words; str(part) would leak the
            # "b'...'" repr into the result.
            pieces.append(part.decode(charset or 'utf-8'))
        else:
            pieces.append(str(part))
    return ''.join(pieces)


def handle_encoding_issues(msg):
    """Decode the Subject and From headers of *msg*.

    Returns:
        A dict that contains ``'subject'`` and/or ``'from'`` for each header
        present and non-empty in *msg*. When a header cannot be decoded
        (unknown charset, invalid bytes, malformed encoded word) the header
        value is stored as read from *msg* instead.
    """
    from email.errors import HeaderParseError

    result = {}
    for key, header_name in (('subject', 'Subject'), ('from', 'From')):
        raw_value = msg[header_name]
        if not raw_value:
            continue
        try:
            result[key] = _decode_mime_header(raw_value)
        except (LookupError, UnicodeError, HeaderParseError):
            # Best effort: keep the undecoded header rather than failing.
            result[key] = raw_value
    return result

实战:批量解析邮件文件

import glob
def batch_parse_emails(directory):
    """Parse every ``*.eml`` file directly inside *directory*.

    Each file is parsed with ``EmailParser``; a file that raises is reported
    on stdout and left out of the result.

    Returns:
        A list of ``{'file': path, 'info': parsed_info}`` dicts.
    """
    pattern = os.path.join(directory, '*.eml')
    parsed = []
    for path in glob.glob(pattern):
        print(f"解析: {path}")
        try:
            record = {
                'file': path,
                'info': EmailParser(path).get_all_info(),
            }
        except Exception as exc:
            print(f"解析失败 {path}: {exc}")
            continue
        parsed.append(record)
    return parsed
# 使用示例
# results = batch_parse_emails('./emails_directory')

注意事项

  1. 编码处理:中文字符需要特殊处理,使用decode_header解码
  2. 多部分邮件:使用walk()遍历所有部分
  3. 附件处理:注意附件的大小和类型
  4. 异常处理:解析过程中可能遇到各种编码错误
  5. 性能考虑:大型邮件文件可能需要流式处理

这个实现涵盖了大多数邮件解析场景,可以作为您邮件处理的基础框架。

抱歉,评论功能暂时关闭!