diff --git a/streaming/core.py b/streaming/core.py index f41e3c6..3d1f958 100644 --- a/streaming/core.py +++ b/streaming/core.py @@ -89,7 +89,15 @@ DEFAULT_PREFS = { logger = logging.getLogger(__name__) + +def sleep(secs): + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + def get_torrent(infohash): + # Taken from newer Deluge source to allow for backward compatibility. def get_file_priorities(self): """Return the file priorities""" if not self.handle.has_metadata(): @@ -440,9 +448,11 @@ class TorrentHandler(object): self.torrents[infohash] = Torrent(self, infohash) return self.torrents[infohash] - def stream(self, infohash, path): + @defer.inlineCallbacks + def stream(self, infohash, path, wait_for_end_pieces=False): logger.debug('Trying to get path:%s from infohash:%s' % (path, infohash)) - self.get_torrent(infohash) + local_torrent = self.get_torrent(infohash) + torrent = get_torrent(infohash) filesystem = self.get_filesystem(infohash) if path: @@ -452,18 +462,65 @@ class TorrentHandler(object): logger.debug('Stream, path:%s infohash:%s stream_item:%r' % (path, infohash, stream_item)) if stream_item is None: - return None + defer.returnValue(None) stream_result = stream_item.stream() logger.debug('Streamresult, path:%s infohash:%s stream_result:%r' % (path, infohash, stream_result)) if stream_result is None: - return None + defer.returnValue(None) if hasattr(stream_result, 'get_read_items'): - self.torrents[infohash].add_fileset(stream_result.get_read_items()) + fileset = stream_result.get_read_items() else: - self.torrents[infohash].add_fileset([stream_result]) - return stream_result + fileset = [stream_result] + self.torrents[infohash].add_fileset(fileset) + + if wait_for_end_pieces: + local_torrent.ensure_started() + logger.debug('We need to wait for pieces') + first_file = fileset[0] + last_file = fileset[-1] + + status = torrent.get_status(['piece_length', 'files', 'file_progress']) + piece_length = status['piece_length'] + + wait_for_pieces = [] + for f, progress in zip(status['files'], status['file_progress']): + if progress == 1.0: + continue + + piece_count = f['size'] // piece_length + + if f['path'] == first_file.path: + piece, rest = divmod(f['offset'], piece_length) + rest = piece_length - rest + wait_for_pieces.append(piece) + + if rest < 1024 and piece_count > 2: + wait_for_pieces.append(piece + 1) + + if f['path'] == last_file.path: + piece, rest = divmod(f['offset'] + f['size'], piece_length) + wait_for_pieces.append(piece) + + if rest < 1024 and piece_count > 2: + wait_for_pieces.append(piece - 1) + + logger.debug('We want first and last piece first, these are the pieces: %r' % (wait_for_pieces, )) + for piece in wait_for_pieces: + torrent.handle.set_piece_deadline(piece, 0) + torrent.handle.piece_priority(piece, 7) + + for _ in range(220): + for piece in wait_for_pieces: + if not torrent.status.pieces[piece]: + break + else: + break + + yield sleep(0.2) + + defer.returnValue(stream_result) def cleanup(self): for infohash, torrent in self.torrents.items(): @@ -555,7 +612,7 @@ class Core(CorePluginBase): settings['prioritize_partial_pieces'] = True session.set_settings(settings) except AttributeError: - logger.warning('Unable to exclude partial pieces') + logger.warning('Unable to prioritize partial pieces') http_output_cls = OutputBase.find_plugin('http') http_output = http_output_cls(url_prefix='file') @@ -720,7 +777,7 @@ class Core(CorePluginBase): fn = filepath_or_index try: - stream_or_item = self.torrent_handler.stream(infohash, fn) + stream_or_item = yield defer.maybeDeferred(self.torrent_handler.stream, infohash, fn, wait_for_end_pieces=wait_for_end_pieces) stream_url = self.thomas_http_output.serve_item(stream_or_item) except: logger.exception('Failed to stream torrent') @@ -728,7 +785,7 @@ class Core(CorePluginBase): defer.returnValue({ 'status': 'success', - 'filename': stream_url.split('/')[-1], + 'filename': stream_or_item.id, 'use_stream_urls': self.config['use_stream_urls'], 'auto_open_stream_urls': self.config['auto_open_stream_urls'], 'url': '%s/streaming/%s' % (self.base_url, stream_url.lstrip('/'))