_dirmon.py
11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
###############################################################################
# Name: _dirmon.py #
# Purpose: Directory monitor object. #
# Author: Cody Precord <cprecord@editra.org> #
# Copyright: (c) 2011 Cody Precord <staff@editra.org> #
# Licence: wxWindows Licence #
###############################################################################
"""
Editra Business Model Library: DirectoryMonitor
"""
__author__ = "Cody Precord <cprecord@editra.org>"
__cvsid__ = "$Id: _dirmon.py 73166 2012-12-12 04:31:53Z CJP $"
__revision__ = "$Revision: 73166 $"
__all__ = ['DirectoryMonitor',]
#-----------------------------------------------------------------------------#
# Imports
import wx
import os
import time
import threading
# Local imports
import fileutil
#-----------------------------------------------------------------------------#
class DirectoryMonitor(object):
"""Object to manage monitoring file system changes"""
def __init__(self, checkFreq=1000.0):
"""@keyword checkFreq: check frequency in milliseconds"""
super(DirectoryMonitor, self).__init__()
# Attributes
self._watcher = WatcherThread(self._ThreadNotifier, checkFreq=checkFreq)
self._callbacks = list()
self._cbackLock = threading.Lock()
self._running = False
def __del__(self):
if self._running:
self._watcher.Shutdown()
self._watcher.join()
def _ThreadNotifier(self, added, deleted, modified):
"""Notifier callback from background L{WatcherThread}
to call notifiers on main thread.
@note: this method is invoked from a background thread and
is not safe to make direct UI calls from.
"""
with self._cbackLock:
for cback in self._callbacks:
wx.CallAfter(cback, added, deleted, modified)
#---- Properties ----#
# Is the monitor currently watching any directories
Monitoring = property(lambda self: self._running)
Frequency = property(lambda self: self._watcher.GetFrequency(),
lambda self, freq: self._watcher.SetFrequency(freq))
#---- End Properties ----#
def AddDirectory(self, dname):
"""Add a directory to the monitor
@param dname: directory path
@return: bool - True if added, False if failed to add
"""
return self._watcher.AddWatchDirectory(dname)
def SubscribeCallback(self, callback):
"""Subscribe a callback method to be called when changes are
detected in one of the watched directories.
@param callback: callable([added,], [deleted,], [modified,])
"""
with self._cbackLock:
if callback not in self._callbacks:
self._callbacks.append(callback)
def UnsubscribeCallback(self, callback):
"""Remove a callback method from the monitor"""
with self._cbackLock:
if callback in self._callbacks:
self._callbacks.remove(callback)
def RemoveDirectory(self, dname):
"""Remove a directory from the watch list
@param dname: directory path
"""
self._watcher.RemoveWatchDirectory(dname)
def StartMonitoring(self):
"""Start monitoring the directories in the watch list and
notifying target of changes.
"""
self._running = True
self._watcher.start()
def Suspend(self, pause=True):
"""Suspend background processing
@keyword pause: True (suspend) False (resume)
"""
if pause:
self._watcher.Suspend()
else:
self._watcher.Continue()
def Refresh(self, paths=None):
"""Force a recheck of the monitored directories. This method
is useful for doing manual control of the refresh cycle. It is
ignored and does nothing when WatcherThread is set up for automatic
refresh cycles.
@keyword paths: specific paths to refresh or None for all.
"""
self._watcher.Refresh(paths)
#-----------------------------------------------------------------------------#
class WatcherThread(threading.Thread):
"""Background thread to monitor a directory"""
def __init__(self, notifier, checkFreq=1000.0):
"""Create the WatcherThread. Provide a callback notifier method
that will be called when changes are detected in the directory.
The notifier will be called in the context of this thread. Notifier
will be called with three lists of ebmlib.File objects to indicate
the changes that have occurred.
@param notifier: callable([added,], [deleted,], [modified,])
@keyword checkFreq: check frequency in milliseconds. If value is set
to zero or less update checks must be manually
controlled via the Refresh interface.
"""
super(WatcherThread, self).__init__()
# Attributes
assert callable(notifier)
self._notifier = notifier
self._dirs = list() # Directories being monitored
self._refreshDirs = None
self._freq = checkFreq # Monitoring frequency in milliseconds
self._continue = True
self._changePending = False
self._lock = threading.Lock()
self._suspend = False
self._suspendcond = threading.Condition()
self._listEmptyCond = threading.Condition()
self._refreshCond = threading.Condition()
def run(self):
"""Run the watcher"""
while self._continue:
deleted = list()
added = list()
modified = list()
# Watch is empty so wait on things to monitor before continuing
if not self._dirs:
with self._listEmptyCond:
self._listEmptyCond.wait()
# Suspend processing if requested
if self._suspend:
with self._suspendcond:
self._suspendcond.wait()
with self._lock:
for dobj in self._PendingRefresh:
if not self._continue:
return
elif self._changePending:
break
# Check if a watched directory has been deleted
if not os.path.exists(dobj.Path):
deleted.append(dobj)
self._dirs.remove(dobj)
continue
snapshot = fileutil.GetDirectoryObject(dobj.Path,
False, True)
# Check for deletions
dobjFiles = dobj.Files # optimization
dobjIndex = dobjFiles.index # optimization
snapFiles = snapshot.Files # optimization
for tobj in dobjFiles:
if not self._continue:
return
elif self._changePending:
break
if tobj not in snapFiles:
deleted.append(tobj)
dobjFiles.remove(tobj)
# Check for additions and modifications
for tobj in snapFiles:
if not self._continue:
return
elif self._changePending:
break
if tobj not in dobjFiles:
# new object was added
added.append(tobj)
dobjFiles.append(tobj)
else:
idx = dobjIndex(tobj)
existing = dobjFiles[idx]
# object was modified
if existing.ModTime < tobj.ModTime:
modified.append(tobj)
existing.ModTime = tobj.ModTime
# Call Notifier if anything changed
if any((added, deleted, modified)):
self._notifier(added, deleted, modified)
# Wait till next check
if self._freq > 0:
# Automatic updates
time.sleep(self._freq / 1000.0)
else:
# Manually controlled updates
with self._refreshCond:
self._refreshDirs = None
self._refreshCond.wait()
#---- Implementation ----#
@property
def _PendingRefresh(self):
"""Get the list of directories pending refresh"""
if self._refreshDirs is None:
return self._dirs
else:
return self._refreshDirs
def AddWatchDirectory(self, dpath):
"""Add a directory to the watch list
@param dpath: directory path (unicode)
@return: bool - True means watch was added, False means unable to list directory
"""
assert os.path.isdir(dpath)
dobj = fileutil.Directory(dpath)
self._changePending = True
with self._lock:
if dobj not in self._dirs and os.access(dobj.Path, os.R_OK):
# Get current snapshot of the directory
try:
dobj = fileutil.GetDirectoryObject(dpath, False, True)
except OSError:
self._changePending = False
return False
self._dirs.append(dobj)
with self._listEmptyCond:
self._listEmptyCond.notify()
self._changePending = False
return True
def RemoveWatchDirectory(self, dpath):
"""Remove a directory from the watch
@param dpath: directory path to remove (unicode)
"""
dobj = fileutil.Directory(dpath)
self._changePending = True
with self._lock:
if dobj in self._dirs:
self._dirs.remove(dobj)
# Also remove any subpaths of dpath
toremove = list()
for d in self._dirs:
if fileutil.IsSubPath(d.Path, dpath):
toremove.append(d)
for todel in toremove:
self._dirs.remove(todel)
self._changePending = False
def GetFrequency(self):
"""Get the update frequency
@return: int (milliseconds)
"""
return self._freq
def SetFrequency(self, milli):
"""Set the update frequency
@param milli: int (milliseconds)
"""
self._freq = float(milli)
def Refresh(self, paths=None):
"""Recheck the monitored directories
only useful when manually controlling refresh cycle of the monitor.
@keyword paths: if None refresh all, else list of specific directories
"""
with self._refreshCond:
if paths is not None:
self._refreshDirs = list()
for dobj in paths:
self._refreshDirs.append(dobj)
self._refreshCond.notify()
def Shutdown(self):
"""Shut the thread down"""
self._continue = False
def Suspend(self):
"""Suspend the thread"""
self._suspend = True
def Continue(self):
"""Continue the thread"""
self._suspend = False
with self._suspendcond:
self._suspendcond.notify()