| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com).
6 # All rights reserved.
7
8 # This file may be distributed and/or modified under the terms of
9 # the GNU General Public License version 2 as published by
10 # the Free Software Foundation.
11 # This file is distributed without any warranty; without even the implied
12 # warranty of merchantability or fitness for a particular purpose.
13 # See "LICENSE.GPL" in the source distribution for more information.
14
15 # Licensees having purchased or holding a valid Flumotion Advanced
16 # Streaming Server license may use this file in accordance with the
17 # Flumotion Advanced Streaming Server Commercial License Agreement.
18 # See "LICENSE.Flumotion" in the source distribution for more information.
19
20 # Headers in this file shall remain intact.
21
22 import errno
23 import os
24 import stat
25 import tempfile
26 import threading
27 import time
28
29 from twisted.internet import defer, reactor, threads, abstract
30
31 from flumotion.common import log, format, common, python
32 from flumotion.component.misc.httpserver import cachestats
33 from flumotion.component.misc.httpserver import cachemanager
34 from flumotion.component.misc.httpserver import fileprovider
35 from flumotion.component.misc.httpserver import localpath
36 from flumotion.component.misc.httpserver.fileprovider import FileClosedError
37 from flumotion.component.misc.httpserver.fileprovider import FileError
38 from flumotion.component.misc.httpserver.fileprovider import NotFoundError
39
40
41 SEEK_SET = 0 # os.SEEK_SET is not defined in python 2.4
42 FILE_COPY_BUFFER_SIZE = abstract.FileDescriptor.bufferSize
43 MAX_LOGNAME_SIZE = 30 # maximum number of characters to use for logging a path
44
45
46 LOG_CATEGORY = "fileprovider-localcached"
47
48
49 errnoLookup = {errno.ENOENT: fileprovider.NotFoundError,
50 errno.EISDIR: fileprovider.CannotOpenError,
51 errno.EACCES: fileprovider.AccessError}
52
53
55 """
56 @rtype: (file, statinfo)
57 """
58 try:
59 file = open(path, mode)
60 fd = file.fileno()
61 except IOError, e:
62 cls = errnoLookup.get(e.errno, fileprovider.FileError)
63 raise cls("Failed to open file '%s': %s" % (path, str(e)))
64 try:
65 info = os.fstat(fd)
66 except OSError, e:
67 file.close()
68 cls = errnoLookup.get(e.errno, fileprovider.FileError)
69 raise cls("Failed to stat file '%s': %s" % (path, str(e)))
70 return file, info
71
72
75 """
76
77 WARNING: Currently does not work properly in combination with rate-control.
78
79 I'm caching the files taken from a mounted
80 network file system to a shared local directory.
81 Multiple instances can share the same cache directory,
82 but it's recommended to use slightly different values
83 for the property cleanup-high-watermark.
84 I'm using the directory access time to know when
85 the cache usage changed and keep an estimation
86 of the cache usage for statistics.
87
88 I'm creating a unique thread to do the file copying block by block,
89 for all files to be copied to the cache.
90 Using a thread instead of a reactor.callLater 'loop' allows for
91 higher copy throughput and does not slow down the main loop when
92 lots of files are copied at the same time.
93 Simulations with real request logs show that using a thread
94 gives better results than the equivalent asynchronous implementation.
95 """
96
97 logCategory = LOG_CATEGORY
98
100 props = args['properties']
101 self._sourceDir = props.get('path')
102 cacheDir = props.get('cache-dir')
103 cacheSizeInMB = props.get('cache-size')
104 if cacheSizeInMB is not None:
105 cacheSize = cacheSizeInMB * 10 ** 6 # in bytes
106 else:
107 cacheSize = None
108 cleanupEnabled = props.get('cleanup-enabled')
109 cleanupHighWatermark = props.get('cleanup-high-watermark')
110 cleanupLowWatermark = props.get('cleanup-low-watermark')
111
112 self._sessions = {} # {CopySession: None}
113 self._index = {} # {path: CopySession}
114
115 self.stats = cachestats.CacheStatistics()
116
117 self.cache = cachemanager.CacheManager(self.stats,
118 cacheDir, cacheSize,
119 cleanupEnabled,
120 cleanupHighWatermark,
121 cleanupLowWatermark)
122
123 common.ensureDir(self._sourceDir, "source")
124
125 # Startup copy thread
126 self._thread = CopyThread(self)
127
129 self.debug('Starting cachedprovider plug for component %r', component)
130 d = self.cache.setUp()
131 d.addCallback(lambda x: self._thread.start())
132 return d
133
135 self.debug('Stopping cachedprovider plug for component %r', component)
136 self._thread.stop()
137 dl = []
138 for s in self._index.values():
139 d = s.close()
140 if d:
141 dl.append(d)
142 if len(dl) != 0:
143 return defer.DeferredList(dl)
144
146 #FIXME: This is temporary. Should be done with plug UI.
147 # Used for the UI to know which plug is used
148 updater.update("provider-name", "fileprovider-localcached")
149 self.stats.startUpdates(updater)
150
153
158
159
160 ## Protected Methods ##
161
def getLogName(self, path, id=None):
    """
    Returns a log name for a path, shortened to a maximum size
    specified by the global variable MAX_LOGNAME_SIZE.
    The log name will be the filename part of the path postfixed
    by the id in brackets if id is not None.

    The extension and the bracketed id are never shortened; only the
    base name is truncated, and a truncated base name ends with "*".
    When the postfix alone fills MAX_LOGNAME_SIZE, the base name is
    reduced to a single "*" marker.

    @param path: path of the file to build a log name for
    @type  path: str
    @param id:   optional identifier appended in brackets
    @rtype: str
    """
    filename = os.path.basename(path)
    basename, postfix = os.path.splitext(filename)
    if id is not None:
        postfix += "[%s]" % id
    # Always keep room for at least the "*" truncation marker: a zero or
    # negative budget would otherwise become a slice counted from the end
    # of the base name and drop the wrong characters.
    prefixMaxLen = max(MAX_LOGNAME_SIZE - len(postfix), 1)
    if len(basename) > prefixMaxLen:
        basename = basename[:prefixMaxLen - 1] + "*"
    return basename + postfix
177
180
182 # First outdate existing session for the path
183 self.outdateCopySession(path)
184 # Then create a new one
185 session = CopySession(self, path, file, info)
186 self._index[path] = session
187 return session
188
193
195 path = session.sourcePath
196 if path in self._index:
197 del self._index[path]
198 self.disableSession(session)
199
201 self.debug("Starting Copy Session '%s' (%d)",
202 session.logName, len(self._sessions))
203 if session in self._sessions:
204 return
205 self._sessions[session] = None
206 self._activateCopyLoop()
207
209 self.debug("Stopping Copy Session '%s' (%d)",
210 session.logName, len(self._sessions))
211 if session in self._sessions:
212 del self._sessions[session]
213 if not self._sessions:
214 self._disableCopyLoop()
215
217 self._thread.wakeup()
218
220 self._thread.sleep()
221
222
224
225 logCategory = LOG_CATEGORY
226
228 localpath.LocalPath.__init__(self, path)
229 self.logName = plug.getLogName(path)
230 self.plug = plug
231
235
239
240
241 ## Private Methods ##
242
251
252
254
255 logCategory = LOG_CATEGORY
256
258 threading.Thread.__init__(self)
259 self.plug = plug
260 self._running = True
261 self._event = threading.Event()
262
267
269 self._event.set()
270
272 self._event.clear()
273
275 while self._running:
276 sessions = self.plug._sessions.keys()
277 for session in sessions:
278 try:
279 session.doServe()
280 except Exception, e:
281 log.warning("Error during async file serving: %s",
282 log.getExceptionMessage(e))
283 try:
284 session.doCopy()
285 except Exception, e:
286 log.warning("Error during file copy: %s",
287 log.getExceptionMessage(e))
288 self._event.wait()
289
290
293
294
296 """
297 I'm serving a file at the same time I'm copying it
298 from the network file system to the cache.
299 If the client asks for data not yet copied, the source file
300 read operation is delegated to the copy thread as an asynchronous
301 operation because file seeking/reading is not thread safe.
302
303 The copy session has to open the temporary file twice,
304 once read-only and once write-only,
305 because closing a read/write file changes the modification time.
306 We want the modification time to be set to a known value
307 when the copy is finished, even while keeping read access to the file.
308
309 The session manages a reference counter to know how many TempFileDelegate
310 instances are using the session to delegate read operations.
311 This is done for two reasons:
312 - To avoid the circular references that would result from having
313 the session manage a list of delegate instances.
314 - If not cancelled, sessions should not be deleted
315 when no delegates reference them anymore. So weakref cannot be used.
316 """
317
318 logCategory = LOG_CATEGORY
319
321 self.plug = plug
322 self.logName = plug.getLogName(sourcePath, sourceFile.fileno())
323 self.copying = None # Not yet started
324 self.sourcePath = sourcePath
325 self.tempPath = plug.cache.getTempPath(sourcePath)
326 self.cachePath = plug.cache.getCachePath(sourcePath)
327 # The size and modification time is not supposed to change over time
328 self.mtime = sourceInfo[stat.ST_MTIME]
329 self.size = sourceInfo[stat.ST_SIZE]
330 self._sourceFile = sourceFile
331 self._cancelled = False # True when a session has been outdated
332 self._wTempFile = None
333 self._rTempFile = None
334 self._allocTag = None # Tag used to identify cache allocations
335 self._waitCancel = None
336 # List of the pending read from source file
337 self._pending = [] # [(position, size, defer),]
338 self._refCount = 0
339 self._copied = 0 # None when the file is fully copied
340 self._correction = 0 # Used to take into account copies data for stats
341 self._startCopyingDefer = self._startCopying()
342
346
348 # If the temporary file is open for reading
349 if self._rTempFile:
350 # And the needed data is already downloaded
351 # Safe to read because it's not used by the copy thread
352 if (self._copied is None) or ((position + size) <= self._copied):
353 try:
354 self._rTempFile.seek(position)
355 data = self._rTempFile.read(size)
356 # Adjust the cache/source values to take copy into account
357 size = len(data)
358 # It's safe to use and modify self._correction even if
359 # it's used by the copy thread because the copy thread
360 # only add and the main thread only subtract.
361 # The only thing that could happen is a less accurate
362 # correction...
363 diff = min(self._correction, size)
364 self._correction -= diff
365 stats.onBytesRead(0, size, diff)
366 return data
367 except Exception, e:
368 self.warning("Failed to read from temporary file: %s",
369 log.getExceptionMessage(e))
370 self._cancelSession()
371 # If the source file is not open anymore, we can't continue
372 if self._sourceFile is None:
373 raise FileError("File caching error, cannot proceed")
374 # Otherwise read the data directly from the source
375 try:
376 # It's safe to not use Lock, because simple type operations
377 # are thread safe, and even if the copying state change
378 # from True to False, _onCopyFinished will be called
379 # later in the same thread and will process pending reads.
380 if self.copying:
381 # If we are currently copying the source file,
382 # we defer the file read to the copying thread
383 # because we can't read a file from two threads.
384 d = defer.Deferred()
385
386 def updateStats(data):
387 stats.onBytesRead(len(data), 0, 0)
388 return data
389
390 d.addCallback(updateStats)
391 self._pending.append((position, size, d))
392 return d
393 # Not copying, it's safe to read directly
394 self._sourceFile.seek(position)
395 data = self._sourceFile.read(size)
396 stats.onBytesRead(len(data), 0, 0)
397 return data
398 except IOError, e:
399 cls = errnoLookup.get(e.errno, FileError)
400 raise cls("Failed to read source file: %s" % str(e))
401
404
406 self._refCount -= 1
407 # If there is only one client and the session has been cancelled,
408 # stop copying and serve the source file directly
409 if (self._refCount == 1) and self._cancelled:
410 # Cancel the copy and close the writing temporary file.
411 self._cancelCopy(False, True)
412 # We close if the copy is finished (if _copied is None)
413 if (self._refCount == 0) and (self._copied is None):
414 self.close()
415
417 self.log("Closing copy session")
418 # Cancel the copy, close the source file and the writing temp file.
419 self._cancelCopy(True, True)
420 self._closeReadTempFile()
421 self.plug.removeCopySession(self)
422 self.plug = None
423
425 if self._startCopyingDefer:
426 d = self._startCopyingDefer
427 self._startCopyingDefer = None
428 d.addCallback(lambda _: self._close())
429 return d
430
432 if not (self.copying and self._pending):
433 # Nothing to do anymore.
434 return False
435 # We have pending source file read operations
436 position, size, d = self._pending.pop(0)
437 self._sourceFile.seek(position)
438 data = self._sourceFile.read(size)
439 # Call the deferred in the main thread
440 reactor.callFromThread(d.callback, data)
441 return len(self._pending) > 0
442
444 # Called in the copy thread context.
445 if not self.copying:
446 # Nothing to do anymore.
447 return False
448 # Copy a buffer from the source file to the temporary writing file
449 cont = True
450 try:
451 # It's safe to use self._copied, because it's only set
452 # by the copy thread during copy.
453 self._sourceFile.seek(self._copied)
454 self._wTempFile.seek(self._copied)
455 data = self._sourceFile.read(FILE_COPY_BUFFER_SIZE)
456 self._wTempFile.write(data)
457 self._wTempFile.flush()
458 except IOError, e:
459 self.warning("Failed to copy source file: %s",
460 log.getExceptionMessage(e))
461 # Abort copy and cancel the session
462 self.copying = False
463 reactor.callFromThread(self.plug.disableSession, self)
464 reactor.callFromThread(self._cancelSession)
465 # Do not continue
466 cont = False
467 else:
468 size = len(data)
469 self._copied += size
470 self._correction += size
471 if size < FILE_COPY_BUFFER_SIZE:
472 # Stop copying
473 self.copying = False
474 reactor.callFromThread(self.plug.disableSession, self)
475 reactor.callFromThread(self._onCopyFinished)
476 cont = False
477 # Check for cancellation
478 if self._waitCancel and self.copying:
479 # Copy has been cancelled
480 self.copying = False
481 reactor.callFromThread(self.plug.disableSession, self)
482 reactor.callFromThread(self._onCopyCancelled, *self._waitCancel)
483 return False
484 return cont
485
486
487 ## Private Methods ##
488
490 # Retrieve a cache allocation tag, used to track the cache free space
491 return self.plug.cache.allocateCacheSpace(self.size)
492
494 if not (self._cancelled or self._allocTag is None):
495 self.plug.cache.releaseCacheSpace(self._allocTag)
496 self._allocTag = None
497
499 if not self._cancelled:
500 self.log("Canceling copy session")
501 # Not a valid copy session anymore
502 self._cancelled = True
503 # If there is no more than 1 client using the session,
504 # stop copying and serve the source file directly
505 if self._refCount <= 1:
506 # Cancel and close the temp write file.
507 self._cancelCopy(False, True)
508
510 self._allocTag = tag
511
512 if not tag:
513 # No free space, proxying source file directly
514 self._cancelSession()
515 return
516 self.plug.stats.onCopyStarted()
517 # Then open a transient temporary files
518 try:
519 fd, transientPath = tempfile.mkstemp(".tmp", LOG_CATEGORY)
520 self.log("Created transient file '%s'", transientPath)
521 self._wTempFile = os.fdopen(fd, "wb")
522 self.log("Opened temporary file for writing [fd %d]",
523 self._wTempFile.fileno())
524 self._rTempFile = file(transientPath, "rb")
525 self.log("Opened temporary file for reading [fd %d]",
526 self._rTempFile.fileno())
527 except IOError, e:
528 self.warning("Failed to open temporary file: %s",
529 log.getExceptionMessage(e))
530 self._cancelSession()
531 return
532 # Truncate it to the source size
533 try:
534 self.log("Truncating temporary file to size %d", self.size)
535 self._wTempFile.truncate(self.size)
536 except IOError, e:
537 self.warning("Failed to truncate temporary file: %s",
538 log.getExceptionMessage(e))
539 self._cancelSession()
540 return
541 # And move it to the real temporary file path
542 try:
543 self.log("Renaming transient file to '%s'", self.tempPath)
544 os.rename(transientPath, self.tempPath)
545 except IOError, e:
546 self.warning("Failed to rename transient temporary file: %s",
547 log.getExceptionMessage(e))
548 # And start copying
549 self.debug("Start caching '%s' [fd %d]",
550 self.sourcePath, self._sourceFile.fileno())
551 # Activate the copy
552 self.copying = True
553 self.plug.activateSession(self)
554
556 self.log("Start copy session")
557 # First ensure there is not already a temporary file
558 self._removeTempFile()
559 # Reserve cache space, may trigger a cache cleanup
560 d = self._allocCacheSpace()
561 d.addCallback(self._gotCacheSpace)
562 return d
563
565 if self.copying:
566 self.log("Canceling file copy")
567 if self._waitCancel:
568 # Already waiting for cancellation.
569 return
570 self.debug("Cancel caching '%s' [fd %d]",
571 self.sourcePath, self._sourceFile.fileno())
572 # Disable the copy, we do not modify copying directly
573 # to let the copying thread terminate current operations.
574 # The file close operation are deferred.
575 self._waitCancel = (closeSource, closeTempWrite)
576 return
577 # No pending copy, we can close the files
578 if closeSource:
579 self._closeSourceFile()
580 if closeTempWrite:
581 self._closeWriteTempFile()
582
584 self.log("Copy session cancelled")
585 # Called when the copy thread really stopped to read/write
586 self._waitCancel = None
587 self.plug.stats.onCopyCancelled(self.size, self._copied)
588 # Resolve all pending source read operations
589 for position, size, d in self._pending:
590 if self._sourceFile is None:
591 d.errback(CopySessionCancelled())
592 else:
593 try:
594 self._sourceFile.seek(position)
595 data = self._sourceFile.read(size)
596 d.callback(data)
597 except Exception, e:
598 self.warning("Failed to read from source file: %s",
599 log.getExceptionMessage(e))
600 d.errback(e)
601 self._pending = []
602 # then we can safely close files
603 if closeSource:
604 self._closeSourceFile()
605 if closeTempWrite:
606 self._closeWriteTempFile()
607
609 if self._sourceFile is None:
610 return
611 # Called when the copy thread really stopped to read/write
612 self.debug("Finished caching '%s' [fd %d]",
613 self.sourcePath, self._sourceFile.fileno())
614 self.plug.stats.onCopyFinished(self.size)
615 # Set the copy as finished to prevent the temporary file
616 # to be deleted when closed
617 self._copied = None
618 # Closing source and write files
619 self._closeSourceFile()
620 self._closeWriteTempFile()
621 # Setting the modification time on the temporary file
622 try:
623 mtime = self.mtime
624 atime = int(time.time())
625 self.log("Setting temporary file modification time to %d", mtime)
626 # FIXME: Should use futimes, but it's not wrapped by python
627 os.utime(self.tempPath, (atime, mtime))
628 except OSError, e:
629 if e.errno == errno.ENOENT:
630 # The file may have been deleted by another process
631 self._releaseCacheSpace()
632 else:
633 self.warning("Failed to update modification time of temporary "
634 "file: %s", log.getExceptionMessage(e))
635 self._cancelSession()
636 try:
637 self.log("Renaming temporary file to '%s'", self.cachePath)
638 os.rename(self.tempPath, self.cachePath)
639 except OSError, e:
640 if e.errno == errno.ENOENT:
641 self._releaseCacheSpace()
642 else:
643 self.warning("Failed to rename temporary file: %s",
644 log.getExceptionMessage(e))
645 self._cancelSession()
646 # Complete all pending source read operations with the temporary file.
647 for position, size, d in self._pending:
648 try:
649 self._rTempFile.seek(position)
650 data = self._rTempFile.read(size)
651 d.callback(data)
652 except Exception, e:
653 self.warning("Failed to read from temporary file: %s",
654 log.getExceptionMessage(e))
655 d.errback(e)
656 self._pending = []
657 if self._refCount == 0:
658 # We were waiting for the file to be copied to close it.
659 self.close()
660
662 try:
663 os.remove(self.tempPath)
664 self.log("Deleted temporary file '%s'", self.tempPath)
665 # Inform the plug that cache space has been released
666 self._releaseCacheSpace()
667 except OSError, e:
668 if e.errno == errno.ENOENT:
669 if self._wTempFile is not None:
670 # Already deleted but inform the plug anyway
671 self._releaseCacheSpace()
672 else:
673 self.warning("Error deleting temporary file: %s",
674 log.getExceptionMessage(e))
675
677 if self._sourceFile is not None:
678 self.log("Closing source file [fd %d]", self._sourceFile.fileno())
679 try:
680 try:
681 self._sourceFile.close()
682 finally:
683 self._sourceFile = None
684 except IOError, e:
685 self.warning("Failed to close source file: %s",
686 log.getExceptionMessage(e))
687
689 if self._rTempFile is not None:
690 self.log("Closing temporary file for reading [fd %d]",
691 self._rTempFile.fileno())
692 try:
693 try:
694 self._rTempFile.close()
695 finally:
696 self._rTempFile = None
697 except IOError, e:
698 self.warning("Failed to close temporary file for reading: %s",
699 log.getExceptionMessage(e))
700
702 if self._wTempFile is not None:
703 # If the copy is not finished, remove the temporary file
704 if not self._cancelled and self._copied is not None:
705 self._removeTempFile()
706 self.log("Closing temporary file for writing [fd %d]",
707 self._wTempFile.fileno())
708 try:
709 try:
710 self._wTempFile.close()
711 finally:
712 self._wTempFile = None
713 except Exception, e:
714 self.warning("Failed to close temporary file for writing: %s",
715 log.getExceptionMessage(e))
716
717
719
720 logCategory = LOG_CATEGORY
721
723 self.logName = plug.getLogName(session.sourcePath)
724 self.mtime = session.mtime
725 self.size = session.size
726 self._session = session
727 self._reading = False
728 self._position = 0
729 session.incRef()
730
733
736
738 assert not self._reading, "Simultaneous read not supported"
739 d = self._session.read(self._position, size, stats)
740 if isinstance(d, defer.Deferred):
741 self._reading = True
742 return d.addCallback(self._cbGotData)
743 self._position += len(d)
744 return d
745
750
751
752 ## Private Methods ##
753
758
759
761
762 logCategory = LOG_CATEGORY
763
764 # Default values
765 _file = None
766
768 self.logName = plug.getLogName(path, file.fileno())
769 self._file = file
770 # The size and modification time is not supposed to change over time
771 self.mtime = info[stat.ST_MTIME]
772 self.size = info[stat.ST_SIZE]
773
775 try:
776 return self._file.tell()
777 except IOError, e:
778 cls = errnoLookup.get(e.errno, FileError)
779 raise cls("Failed to tell position in file: %s" % str(e))
780
782 try:
783 self._file.seek(offset, SEEK_SET)
784 except IOError, e:
785 cls = errnoLookup.get(e.errno, FileError)
786 raise cls("Failed to seek in cached file: %s" % str(e))
787
789 try:
790 return self._file.read(size)
791 except IOError, e:
792 cls = errnoLookup.get(e.errno, FileError)
793 raise cls("Failed to read data from file: %s" % str(e))
794
805
806
808
810 data = DirectFileDelegate.read(self, size)
811 stats.onBytesRead(0, len(data), 0)
812 return data
813
818
819
821
822 logCategory = LOG_CATEGORY
823
824 # Overriding parent class properties to become attribute
825 mimeType = None
826
827 # Default values
828 _delegate = None
829
831 self.logName = plug.getLogName(path)
832 self.plug = plug
833 self._path = path
834 self.mimeType = mimeType
835 self.stats = cachestats.RequestStatistics(plug.stats)
836 self._delegate = None
837
839 # Opening source file in a separate thread, as it usually involves
840 # accessing a network filesystem (which would block the reactor)
841 d = threads.deferToThread(open_stat, self._path)
842 d.addCallbacks(self._selectDelegate, self._sourceOpenFailed)
843
844 def _setDelegate(delegate):
845 self._delegate = delegate
846 d.addCallback(_setDelegate)
847 d.addCallback(lambda _: self)
848 return d
849
851 failure.trap(NotFoundError)
852 self.debug("Source file %r not found", self._path)
853 self.plug.outdateCopySession(self._path)
854 cachedPath = self.plug.cache.getCachePath(self._path)
855 self._removeCachedFile(cachedPath)
856 raise failure
857
860
862 if self._delegate is None:
863 raise FileClosedError("File closed")
864 return self._delegate.mtime
865
867 if self._delegate is None:
868 raise FileClosedError("File closed")
869 return self._delegate.size
870
872 if self._delegate is None:
873 raise FileClosedError("File closed")
874 return self._delegate.tell()
875
877 if self._delegate is None:
878 raise FileClosedError("File closed")
879 return self._delegate.seek(offset)
880
882 if self._delegate is None:
883 raise FileClosedError("File closed")
884 try:
885 d = self._delegate.read(size, self.stats)
886 if isinstance(d, defer.Deferred):
887 return d
888 return defer.succeed(d)
889 except IOError, e:
890 cls = errnoLookup.get(e.errno, FileError)
891 return defer.fail(cls("Failed to read cached data: %s", str(e)))
892 except:
893 return defer.fail()
894
896 if self._delegate:
897 self.stats.onClosed()
898 self._delegate.close()
899 self._delegate = None
900
902 self.close()
903
906
907
908 ## Private Methods ##
909
911 self.log("Closing source file [fd %d]", sourceFile.fileno())
912 try:
913 sourceFile.close()
914 except Exception, e:
915 self.warning("Failed to close source file: %s",
916 log.getExceptionMessage(e))
917
919 sourcePath = self._path
920 self.log("Selecting delegate for source file %r [fd %d]",
921 sourcePath, sourceFile.fileno())
922 # Update the log name
923 self.logName = self.plug.getLogName(self._path, sourceFile.fileno())
924 # Opening cached file
925 cachedPath = self.plug.cache.getCachePath(sourcePath)
926 try:
927 cachedFile, cachedInfo = open_stat(cachedPath)
928 self.log("Opened cached file [fd %d]", cachedFile.fileno())
929 except NotFoundError:
930 self.debug("Did not find cached file '%s'", cachedPath)
931 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
932 except FileError, e:
933 self.debug("Failed to open cached file: %s", str(e))
934 self._removeCachedFile(cachedPath)
935 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
936 # Found a cached file, now check the modification time
937 self.debug("Found cached file '%s'", cachedPath)
938 sourceTime = sourceInfo[stat.ST_MTIME]
939 cacheTime = cachedInfo[stat.ST_MTIME]
940 if sourceTime != cacheTime:
941 # Source file changed, remove file and start caching again
942 self.debug("Cached file out-of-date (%d != %d)",
943 sourceTime, cacheTime)
944 self.stats.onCacheOutdated()
945 self.plug.outdateCopySession(sourcePath)
946 self._removeCachedFile(cachedPath)
947 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
948 self._closeSourceFile(sourceFile)
949 # We have a valid cached file, just delegate to it.
950 self.debug("Serving cached file '%s'", cachedPath)
951 delegate = CachedFileDelegate(self.plug, cachedPath,
952 cachedFile, cachedInfo)
953 self.stats.onStarted(delegate.size, cachestats.CACHE_HIT)
954 return delegate
955
957 try:
958 os.remove(cachePath)
959 self.debug("Deleted cached file '%s'", cachePath)
960 except OSError, e:
961 if e.errno != errno.ENOENT:
962 self.warning("Error deleting cached file: %s", str(e))
963
965 session = self.plug.getCopySession(sourcePath)
966 if session is None:
967 self.debug("No copy sessions found")
968 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
969 self.debug("Copy session found")
970 if sourceInfo[stat.ST_MTIME] != session.mtime:
971 self.debug("Copy session out-of-date (%d != %d)",
972 sourceInfo[stat.ST_MTIME], session.mtime)
973 self.stats.onCacheOutdated()
974 session.outdate()
975 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
976 self._closeSourceFile(sourceFile)
977 # We have a valid session, just delegate to it.
978 self.debug("Serving temporary file '%s'", session.tempPath)
979 delegate = TempFileDelegate(self.plug, session)
980 self.stats.onStarted(delegate.size, cachestats.TEMP_HIT)
981 return delegate
982
984 session = self.plug.createCopySession(sourcePath, sourceFile,
985 sourceInfo)
986 self.debug("Serving temporary file '%s'", session.tempPath)
987 delegate = TempFileDelegate(self.plug, session)
988 self.stats.onStarted(delegate.size, cachestats.CACHE_MISS)
989 return delegate
990
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Fri Sep 24 12:50:26 2010 | http://epydoc.sourceforge.net |