import_emails.py
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""Import emails from a mailman storage to the django database."""
import os
import re
import sys
import mailbox
from optparse import make_option
from django.db import transaction
from django.template.defaultfilters import slugify
from django.core.management.base import BaseCommand, CommandError
from super_archives.models import MailingList, Message, Thread, EmailAddress
from super_archives.management.commands.message import Message as CustomMessage
class Command(BaseCommand, object):
"""Get emails from mailman archives and import them in the django db. """
help = __doc__
default_archives_path = '/var/lib/mailman/archives/private'
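    # Strips reply/forward prefixes (re:, fwd:, enc:, ...) and bracketed
    # tags (e.g. "[listname]") from subjects before they are used for
    # threading.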
    RE_SUBJECT_CLEAN = re.compile(r'((re|res|fw|fwd|en|enc):)|\[.*?\]',
                                  re.IGNORECASE)
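    # In-memory caches (subject slug -> {mailing list id -> Thread} and
    # address -> EmailAddress) to avoid repeated get_or_create() queries
    # during a single import run.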
THREAD_CACHE = {}
EMAIL_ADDR_CACHE = {}
    # Command line options: the archives path to import from, mailing
    # lists to exclude, and whether to (re)import every message.
option_list = BaseCommand.option_list + (
make_option('--archives_path',
dest='archives_path',
help='Path of email archives to be imported. (default: %s)' %
default_archives_path,
default=default_archives_path),
make_option('--exclude-list',
dest='exclude_lists',
help=("Mailing list that won't be imported. It can be used many"
"times for more than one list."),
action='append',
default=None),
make_option('--all',
dest='all',
help='Import all messages (default: False)',
action="store_true",
default=False),
)
def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs)
def log(self, msg, error=False):
"""Log message helper."""
output = self.stdout
if error:
output = self.stderr
output.write(msg)
output.write('\n')
def parse_emails(self, email_filename, index=0):
"""Generator function that parse and extract emails from the file
`email_filename` starting from the position `index`.
Yield: An instance of `mailbox.mboxMessage` for each email in the
file.
"""
self.log("Parsing email dump: %s." % email_filename)
mbox = mailbox.mbox(email_filename, factory=CustomMessage)
        # Get each email from the mbox file
        #
        # The following implementation is used because the mbox object
        # does not support slicing. Converting the object to a tuple
        # (as in the commented code below) would also work, but its
        # performance was too poor.
        #
        #for message in tuple(mbox)[index:]:
        #    yield message
        #
        key = index
        while mbox.has_key(key):
            yield key, mbox[key]
            key += 1
def get_emails(self, mailinglist_dir, all, exclude_lists):
"""Generator function that get the emails from each mailing
list dump dirctory. If `all` is set to True all the emails in the
mbox will be imported if not it will just resume from the last
message previously imported. The lists set in `exclude_lists`
won't be imported.
Yield: A tuple in the form: (mailing list name, email message).
"""
self.log("Getting emails dumps from: %s" % mailinglist_dir)
# Get the list of directories ending with .mbox
mailing_lists_mboxes = (mbox for mbox in os.listdir(mailinglist_dir)
if mbox.endswith('.mbox'))
# Get messages from each mbox
for mbox in mailing_lists_mboxes:
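            # The mbox file lives inside a directory of the same name
            # (<list>.mbox/<list>.mbox), hence the repeated name below.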
mbox_path = os.path.join(mailinglist_dir, mbox, mbox)
mailinglist_name = mbox.split('.')[0]
            # Skip mailing lists that were explicitly excluded
if exclude_lists and mailinglist_name in exclude_lists:
continue
# Find the index of the last imported message
if all:
n_msgs = 0
else:
try:
mailinglist = MailingList.objects.get(
name=mailinglist_name)
n_msgs = mailinglist.last_imported_index
except MailingList.DoesNotExist:
n_msgs = 0
for index, msg in self.parse_emails(mbox_path, n_msgs):
yield mailinglist_name, msg, index
def get_thread(self, email, mailinglist):
"""Group messages by thread looking for similar subjects"""
subject_slug = slugify(email.subject_clean)
thread = self.THREAD_CACHE.get(subject_slug, {}).get(mailinglist.id)
if thread is None:
thread = Thread.objects.get_or_create(
mailinglist=mailinglist,
subject_token=subject_slug
)[0]
if self.THREAD_CACHE.get(subject_slug) is None:
self.THREAD_CACHE[subject_slug] = dict()
self.THREAD_CACHE[subject_slug][mailinglist.id] = thread
thread.latest_message = email
thread.save()
return thread
def save_email(self, list_name, email_msg, index):
"""Save email message into the database."""
        # Update the index of the last imported message in the DB
mailinglist, created = MailingList.objects.get_or_create(name=list_name)
mailinglist.last_imported_index = index
if created:
            # If the mailing list was just created, the message cannot
            # be in the DB yet.
self.create_email(mailinglist, email_msg)
else:
            # If the message is already in the database, don't do anything
            try:
                Message.objects.get(
message_id=email_msg.get('Message-ID'),
thread__mailinglist=mailinglist
)
except Message.DoesNotExist:
self.create_email(mailinglist, email_msg)
mailinglist.save()
def create_email(self, mailinglist, email_msg):
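        """Create the Message for `email_msg` in `mailinglist`, creating its
        EmailAddress and Thread along the way if they don't exist yet."""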
real_name, from_ = email_msg.get_from_addr()
email_addr = self.EMAIL_ADDR_CACHE.get(from_)
if email_addr is None:
email_addr = EmailAddress.objects.get_or_create(
address=from_)[0]
self.EMAIL_ADDR_CACHE[from_] = email_addr
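        # Fill in the sender's display name only if it is still empty,
        # truncating to 64 chars (presumably the model field's max_length).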
if not email_addr.real_name and real_name:
email_addr.real_name = real_name[:64]
email_addr.save()
subject = email_msg.get_subject()
email = Message.objects.create(
message_id=email_msg.get('Message-ID'),
from_address=email_addr,
subject=subject,
subject_clean=self.RE_SUBJECT_CLEAN.sub('', subject).strip(),
body=email_msg.get_body(),
received_time=email_msg.get_received_datetime(),
)
email.thread = self.get_thread(email, mailinglist)
email.save()
@transaction.commit_manually
def import_emails(self, archives_path, all, exclude_lists=None):
"""Get emails from the filesystem from the `archives_path`
and store them into the database. If `all` is set to True all
the filesystem storage will be imported otherwise the
importation will resume from the last message previously
imported. The lists set in `exclude_lists` won't be imported.
"""
count = 0
email_generator = self.get_emails(archives_path, all, exclude_lists)
for mailinglist_name, msg, index in email_generator:
try:
self.save_email(mailinglist_name, msg, index)
except:
                # This anti-pattern (bare except) is needed to keep the
                # transaction from getting stuck in case of errors.
transaction.rollback()
raise
count += 1
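            # Commit in batches of 1000 messages so the manually managed
            # transaction doesn't grow unbounded.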
if count % 1000 == 0:
transaction.commit()
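        # Final commit for the remaining messages of the last partial batch.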
transaction.commit()
def handle(self, *args, **options):
"""Main command method."""
lock_file = '/var/lock/colab/import_emails.lock'
# Already running, so quit
if os.path.exists(lock_file):
self.log(("This script is already running. (If your are sure it's "
"not please delete the lock file in %s')") % lock_file)
sys.exit(0)
if not os.path.exists(os.path.dirname(lock_file)):
os.mkdir(os.path.dirname(lock_file), 0755)
        run_lock = open(lock_file, 'w')
run_lock.close()
archives_path = options.get('archives_path')
        self.log('Using archives_path `%s`' % archives_path)
if not os.path.exists(archives_path):
raise CommandError('archives_path (%s) does not exist' %
archives_path)
self.import_emails(archives_path,
options.get('all'), options.get('exclude_lists'))
os.remove(lock_file)