mirror of
https://github.com/JohnDoee/deluge-streaming/
synced 2026-07-01 07:31:17 -07:00
Added support for waiting on end pieces
This commit is contained in:
@@ -89,7 +89,15 @@ DEFAULT_PREFS = {
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def sleep(secs):
|
||||
d = defer.Deferred()
|
||||
reactor.callLater(secs, d.callback, None)
|
||||
return d
|
||||
|
||||
|
||||
def get_torrent(infohash):
|
||||
# Taken from newer Deluge source to allow for backward compatibility.
|
||||
def get_file_priorities(self):
|
||||
"""Return the file priorities"""
|
||||
if not self.handle.has_metadata():
|
||||
@@ -440,9 +448,11 @@ class TorrentHandler(object):
|
||||
self.torrents[infohash] = Torrent(self, infohash)
|
||||
return self.torrents[infohash]
|
||||
|
||||
def stream(self, infohash, path):
|
||||
@defer.inlineCallbacks
|
||||
def stream(self, infohash, path, wait_for_end_pieces=False):
|
||||
logger.debug('Trying to get path:%s from infohash:%s' % (path, infohash))
|
||||
self.get_torrent(infohash)
|
||||
local_torrent = self.get_torrent(infohash)
|
||||
torrent = get_torrent(infohash)
|
||||
|
||||
filesystem = self.get_filesystem(infohash)
|
||||
if path:
|
||||
@@ -452,18 +462,65 @@ class TorrentHandler(object):
|
||||
|
||||
logger.debug('Stream, path:%s infohash:%s stream_item:%r' % (path, infohash, stream_item))
|
||||
if stream_item is None:
|
||||
return None
|
||||
defer.returnValue(None)
|
||||
|
||||
stream_result = stream_item.stream()
|
||||
logger.debug('Streamresult, path:%s infohash:%s stream_result:%r' % (path, infohash, stream_result))
|
||||
if stream_result is None:
|
||||
return None
|
||||
defer.returnValue(None)
|
||||
|
||||
if hasattr(stream_result, 'get_read_items'):
|
||||
self.torrents[infohash].add_fileset(stream_result.get_read_items())
|
||||
fileset = stream_result.get_read_items()
|
||||
else:
|
||||
self.torrents[infohash].add_fileset([stream_result])
|
||||
return stream_result
|
||||
fileset = [stream_result]
|
||||
self.torrents[infohash].add_fileset(fileset)
|
||||
|
||||
if wait_for_end_pieces:
|
||||
local_torrent.ensure_started()
|
||||
logger.debug('We need to wait for pieces')
|
||||
first_file = fileset[0]
|
||||
last_file = fileset[-1]
|
||||
|
||||
status = torrent.get_status(['piece_length', 'files', 'file_progress'])
|
||||
piece_length = status['piece_length']
|
||||
|
||||
wait_for_pieces = []
|
||||
for f, progress in zip(status['files'], status['file_progress']):
|
||||
if progress == 1.0:
|
||||
continue
|
||||
|
||||
piece_count = f['size'] // piece_length
|
||||
|
||||
if f['path'] == first_file.path:
|
||||
piece, rest = divmod(f['offset'], piece_length)
|
||||
rest = piece_length - rest
|
||||
wait_for_pieces.append(piece)
|
||||
|
||||
if rest < 1024 and piece_count > 2:
|
||||
wait_for_pieces.append(piece + 1)
|
||||
|
||||
if f['path'] == last_file.path:
|
||||
piece, rest = divmod(f['offset'] + f['size'], piece_length)
|
||||
wait_for_pieces.append(piece)
|
||||
|
||||
if rest < 1024 and piece_count > 2:
|
||||
wait_for_pieces.append(piece - 1)
|
||||
|
||||
logger.debug('We want first and last piece first, these are the pieces: %r' % (wait_for_pieces, ))
|
||||
for piece in wait_for_pieces:
|
||||
torrent.handle.set_piece_deadline(piece, 0)
|
||||
torrent.handle.piece_priority(piece, 7)
|
||||
|
||||
for _ in range(220):
|
||||
for piece in wait_for_pieces:
|
||||
if not torrent.status.pieces[piece]:
|
||||
break
|
||||
else:
|
||||
break
|
||||
|
||||
yield sleep(0.2)
|
||||
|
||||
defer.returnValue(stream_result)
|
||||
|
||||
def cleanup(self):
|
||||
for infohash, torrent in self.torrents.items():
|
||||
@@ -555,7 +612,7 @@ class Core(CorePluginBase):
|
||||
settings['prioritize_partial_pieces'] = True
|
||||
session.set_settings(settings)
|
||||
except AttributeError:
|
||||
logger.warning('Unable to exclude partial pieces')
|
||||
logger.warning('Unable to prioritize partial pieces')
|
||||
|
||||
http_output_cls = OutputBase.find_plugin('http')
|
||||
http_output = http_output_cls(url_prefix='file')
|
||||
@@ -720,7 +777,7 @@ class Core(CorePluginBase):
|
||||
fn = filepath_or_index
|
||||
|
||||
try:
|
||||
stream_or_item = self.torrent_handler.stream(infohash, fn)
|
||||
stream_or_item = yield defer.maybeDeferred(self.torrent_handler.stream, infohash, fn, wait_for_end_pieces=wait_for_end_pieces)
|
||||
stream_url = self.thomas_http_output.serve_item(stream_or_item)
|
||||
except:
|
||||
logger.exception('Failed to stream torrent')
|
||||
@@ -728,7 +785,7 @@ class Core(CorePluginBase):
|
||||
|
||||
defer.returnValue({
|
||||
'status': 'success',
|
||||
'filename': stream_url.split('/')[-1],
|
||||
'filename': stream_or_item.id,
|
||||
'use_stream_urls': self.config['use_stream_urls'],
|
||||
'auto_open_stream_urls': self.config['auto_open_stream_urls'],
|
||||
'url': '%s/streaming/%s' % (self.base_url, stream_url.lstrip('/'))
|
||||
|
||||
Reference in New Issue
Block a user