Merge branch 'release/0.12.0'

This commit is contained in:
Anders Jensen
2020-04-21 21:49:54 +02:00
6 changed files with 127 additions and 34 deletions

View File

@@ -1,7 +1,7 @@
# Streaming Plugin # Streaming Plugin
https://github.com/JohnDoee/deluge-streaming https://github.com/JohnDoee/deluge-streaming
(c)2019 by Anders Jensen <johndoee@tidalstream.org> (c)2020 by Anders Jensen <johndoee@tridentstream.org>
## Description ## Description
@@ -107,6 +107,11 @@ List of URL GET Arguments
# Version Info # Version Info
## Version 0.12.0
* Moved to reading pieces through Deluge to avoid unflushed data
* Fixed Deluge 2 / libtorrent related bug
## Version 0.11.0 ## Version 0.11.0
* Initial support for Deluge 2 / Python 3 * Initial support for Deluge 2 / Python 3
* Added support for aggressive piece prioritization when it should not be necessary. * Added support for aggressive piece prioritization when it should not be necessary.

View File

@@ -41,8 +41,8 @@ from setuptools import setup, find_packages
__plugin_name__ = "Streaming" __plugin_name__ = "Streaming"
__author__ = "Anders Jensen" __author__ = "Anders Jensen"
__author_email__ = "johndoee@tidalstream.org" __author_email__ = "johndoee@tridentstream.org"
__version__ = "0.11.0" __version__ = "0.12.0"
__url__ = "https://github.com/JohnDoee/deluge-streaming" __url__ = "https://github.com/JohnDoee/deluge-streaming"
__license__ = "GPLv3" __license__ = "GPLv3"
__description__ = "Enables streaming of files while downloading them." __description__ = "Enables streaming of files while downloading them."

View File

@@ -65,7 +65,6 @@ from thomas import router, Item, OutputBase
from .resource import Resource from .resource import Resource
from .torrentfile import DelugeTorrentInput from .torrentfile import DelugeTorrentInput
defer.setDebugging(True)
router.register_handler(DelugeTorrentInput.plugin_name, DelugeTorrentInput, True, False, False) router.register_handler(DelugeTorrentInput.plugin_name, DelugeTorrentInput, True, False, False)
VIDEO_STREAMABLE_EXTENSIONS = ['mkv', 'mp4', 'iso', 'ogg', 'ogm', 'm4v'] VIDEO_STREAMABLE_EXTENSIONS = ['mkv', 'mp4', 'iso', 'ogg', 'ogm', 'm4v']
@@ -119,7 +118,7 @@ def get_torrent(infohash):
# Ensure file_priorities option is populated. # Ensure file_priorities option is populated.
self.set_file_priorities([]) self.set_file_priorities([])
return self.options["file_priorities"] return list(self.options["file_priorities"])
torrent = component.get("TorrentManager").torrents.get(infohash, None) torrent = component.get("TorrentManager").torrents.get(infohash, None)
if torrent and not hasattr(torrent, 'get_file_priorities'): if torrent and not hasattr(torrent, 'get_file_priorities'):
@@ -200,7 +199,7 @@ class Torrent(object):
self.torrent.handle.set_piece_deadline(needed_piece, 0) self.torrent.handle.set_piece_deadline(needed_piece, 0)
self.torrent.handle.piece_priority(needed_piece, MAX_PIECE_PRIORITY) self.torrent.handle.piece_priority(needed_piece, MAX_PIECE_PRIORITY)
file_priorities = self.torrent.get_file_priorities() file_priorities = list(self.torrent.get_file_priorities())
if file_priorities[f['index']] != MAX_FILE_PRIORITY: if file_priorities[f['index']] != MAX_FILE_PRIORITY:
logger.debug('Also setting file to max %r' % (f, )) logger.debug('Also setting file to max %r' % (f, ))
file_priorities[f['index']] = MAX_FILE_PRIORITY file_priorities[f['index']] = MAX_FILE_PRIORITY
@@ -224,7 +223,8 @@ class Torrent(object):
logger.debug('Calling read again to get the real number') logger.debug('Calling read again to get the real number')
return self.can_read(from_byte) return self.can_read(from_byte)
else: else:
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest logger.debug('Really last available piece is %s' % (last_available_piece, ))
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest, last_available_piece
def is_idle(self): def is_idle(self):
return not self.readers and self.last_activity + TORRENT_CLEANUP_INTERVAL < datetime.now() return not self.readers and self.last_activity + TORRENT_CLEANUP_INTERVAL < datetime.now()
@@ -275,7 +275,7 @@ class Torrent(object):
logger.debug('We had a fileset not started, must_whitelist:%r first_files:%r cannot_blacklist:%r' % (must_whitelist, first_files, cannot_blacklist)) logger.debug('We had a fileset not started, must_whitelist:%r first_files:%r cannot_blacklist:%r' % (must_whitelist, first_files, cannot_blacklist))
status = self.torrent.get_status(['files', 'file_progress']) status = self.torrent.get_status(['files', 'file_progress'])
file_priorities = self.torrent.get_file_priorities() file_priorities = list(self.torrent.get_file_priorities())
for f, progress in zip(status['files'], status['file_progress']): for f, progress in zip(status['files'], status['file_progress']):
i = f['index'] i = f['index']
if progress == 1.0: if progress == 1.0:
@@ -314,7 +314,7 @@ class Torrent(object):
else: else:
fileset_ranges[fileset_hash] = fileset['files'].index(path) fileset_ranges[fileset_hash] = fileset['files'].index(path)
file_priorities = self.torrent.get_file_priorities() file_priorities = list(self.torrent.get_file_priorities())
logger.debug('Fileset heads: %r' % (fileset_ranges, )) logger.debug('Fileset heads: %r' % (fileset_ranges, ))
for fileset_hash, first_file in fileset_ranges.items(): for fileset_hash, first_file in fileset_ranges.items():
fileset = self.filesets[fileset_hash] fileset = self.filesets[fileset_hash]
@@ -387,6 +387,14 @@ class Torrent(object):
if fileset_hash not in self.filesets: if fileset_hash not in self.filesets:
self.filesets[fileset_hash] = {'started': False, 'files': files} self.filesets[fileset_hash] = {'started': False, 'files': files}
def request_piece(self, piece):
self.torrent.handle.read_piece(piece)
def new_piece_available(self, piece, data):
logger.debug("New pice available: %s" % (piece, ))
for reader in self.readers.keys():
reader.new_piece_available(piece, data)
class TorrentHandler(object): class TorrentHandler(object):
def __init__(self, reset_priorities_on_finish, aggressive_prioritizing=False): def __init__(self, reset_priorities_on_finish, aggressive_prioritizing=False):
@@ -397,6 +405,7 @@ class TorrentHandler(object):
self.alerts = component.get("AlertManager") self.alerts = component.get("AlertManager")
self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed) self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
self.alerts.register_handler("torrent_finished_alert", self.on_alert_torrent_finished) self.alerts.register_handler("torrent_finished_alert", self.on_alert_torrent_finished)
self.alerts.register_handler("read_piece_alert", self.on_alert_read_piece)
self.cleanup_looping_call = task.LoopingCall(self.cleanup) self.cleanup_looping_call = task.LoopingCall(self.cleanup)
self.cleanup_looping_call.start(60) self.cleanup_looping_call.start(60)
@@ -427,6 +436,18 @@ class TorrentHandler(object):
if self.reset_priorities_on_finish: if self.reset_priorities_on_finish:
self.torrents[infohash].reset_priorities() self.torrents[infohash].reset_priorities()
def on_alert_read_piece(self, alert):
try:
infohash = str(alert.handle.info_hash())
except (RuntimeError, KeyError):
logger.warning('Failed to handle on read piece alert')
return
if infohash not in self.torrents:
return
self.torrents[infohash].new_piece_available(alert.piece, alert.buffer)
def shutdown(self): def shutdown(self):
for torrent in self.torrents.values(): for torrent in self.torrents.values():
if self.reset_priorities_on_finish: if self.reset_priorities_on_finish:

View File

@@ -228,20 +228,18 @@
</packing> </packing>
</child> </child>
<!-- <child> <!-- <child>
<widget class="GtkRadioButton" id="input_serve_webui"> <object class="GtkRadioButton" id="input_serve_webui">
<property name="label" translatable="yes">Serve files via WebUI</property> <property name="label" translatable="yes">Serve files via WebUI</property>
<property name="visible">False</property> <property name="visible">True</property>
<property name="sensitive">False</property>
<property name="can_focus">True</property> <property name="can_focus">True</property>
<property name="receives_default">False</property> <property name="receives_default">False</property>
<property name="use_action_appearance">False</property> <property name="use_action_appearance">False</property>
<property name="active">True</property>
<property name="draw_indicator">True</property> <property name="draw_indicator">True</property>
</widget> </object>
<packing> <packing>
<property name="expand">True</property> <property name="expand">True</property>
<property name="fill">True</property> <property name="fill">True</property>
<property name="position">2</property> <property name="position">4</property>
</packing> </packing>
</child> --> </child> -->
<child> <child>
@@ -250,17 +248,15 @@
<property name="can_focus">False</property> <property name="can_focus">False</property>
<property name="spacing">5</property> <property name="spacing">5</property>
<!-- <child> <!-- <child>
<widget class="GtkRadioButton" id="input_serve_standalone"> <object class="GtkRadioButton" id="input_serve_standalone">
<property name="label" translatable="yes">Serve files via standalone</property> <property name="label" translatable="yes">Serve files via standalone</property>
<property name="visible">False</property> <property name="visible">True</property>
<property name="sensitive">False</property>
<property name="can_focus">True</property> <property name="can_focus">True</property>
<property name="receives_default">False</property> <property name="receives_default">False</property>
<property name="use_action_appearance">False</property> <property name="use_action_appearance">False</property>
<property name="active">True</property>
<property name="draw_indicator">True</property> <property name="draw_indicator">True</property>
<property name="group">input_serve_webui</property> <property name="group">input_serve_webui</property>
</widget> </object>
<packing> <packing>
<property name="expand">True</property> <property name="expand">True</property>
<property name="fill">True</property> <property name="fill">True</property>

View File

@@ -32,7 +32,7 @@ class Resource(TwistedResource):
if not authenticated: if not authenticated:
request.setResponseCode(401) request.setResponseCode(401)
return 'Unauthorized' return b'Unauthorized'
m = getattr(self, 'render_' + request.method.decode('utf-8'), None) m = getattr(self, 'render_' + request.method.decode('utf-8'), None)
if not m: if not m:

View File

@@ -1,17 +1,27 @@
import logging import logging
import mimetypes import mimetypes
import os import os
import time
import threading
from io import BytesIO
from thomas import InputBase from thomas import InputBase
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
PIECE_REQUEST_HISTORY_TIME = 10
MAX_PIECE_REQUEST_COUNT = 20
class DelugeTorrentInput(InputBase.find_plugin('file')): class DelugeTorrentInput(InputBase):
plugin_name = 'torrent_file' plugin_name = 'torrent_file'
protocols = [] protocols = []
current_piece_data = None
can_read_to = None can_read_to = None
last_available_piece = None
_pos = None
_closed = False
def __init__(self, item, torrent_handler, infohash, offset, path): def __init__(self, item, torrent_handler, infohash, offset, path):
self.item = item self.item = item
@@ -20,6 +30,9 @@ class DelugeTorrentInput(InputBase.find_plugin('file')):
self.infohash = infohash self.infohash = infohash
self.offset = offset self.offset = offset
self.path = path self.path = path
self.piece_buffer = {}
self.requested_pieces = {}
self.piece_request_queue = []
self.size, self.filename, self.content_type = self.get_info() self.size, self.filename, self.content_type = self.get_info()
def get_info(self): def get_info(self):
@@ -33,36 +46,94 @@ class DelugeTorrentInput(InputBase.find_plugin('file')):
if not os.path.exists(self.path): if not os.path.exists(self.path):
self.torrent.can_read(self.offset) self.torrent.can_read(self.offset)
def tell(self):
return self._pos
def seek(self, pos): def seek(self, pos):
self.ensure_exists() self.ensure_exists()
super(DelugeTorrentInput, self).seek(pos) self._pos = pos
logger.debug('Seeking at %s torrentfile_id %r' % (self.tell(), id(self))) logger.debug('Seeking at %s torrentfile_id %r' % (self.tell(), id(self)))
self.torrent.add_reader(self, self.item.path, self.offset + self.tell(), self.offset + self.size) self.torrent.add_reader(self, self.item.path, self.offset + self.tell(), self.offset + self.size)
def _read(self, num):
data = self.current_piece_data.read(num)
self._pos += len(data)
return data
def read(self, num): def read(self, num):
if self.current_piece_data:
data = self._read(num)
if data:
return data
self.ensure_exists() self.ensure_exists()
if not self._open_file: if self._pos is None:
self.seek(0) self.seek(0)
logger.debug('Trying to read %s from %i torrentfile_id %r' % (self.path, self.tell(), id(self))) logger.debug('Trying to read %s from %i torrentfile_id %r' % (self.path, self.tell(), id(self)))
tell = self.tell() tell = self.tell()
if self.can_read_to is None or self.can_read_to <= tell: if self.can_read_to is None or self.can_read_to <= tell:
self.can_read_to = self.torrent.can_read(self.offset + tell) + tell can_read_result = self.torrent.can_read(self.offset + tell)
self.last_available_piece = can_read_result[1]
self.can_read_to = can_read_result[0] + tell
if self._open_file: current_piece, rest = self.current_piece
self._open_file.seek(tell) logger.debug('Calculated last available piece is %s offset %s can_read_to %s piece_length %s' % (self.last_available_piece, self.offset, self.can_read_to, self.torrent.piece_length))
real_num = min(num, self.can_read_to - tell) while self.piece_consumption_time and self.piece_consumption_time[0] < time.time() - PIECE_REQUEST_HISTORY_TIME:
if num != real_num: self.piece_consumption_time.pop(0)
logger.info('The real number we can read to is %s and not %s at position %s' % (real_num, num, tell))
if not self._open_file: # the file was closed while we waited max_piece_count = (self.last_available_piece - current_piece) + 1
pieces_to_request = min(min(max(2, len(self.piece_consumption_time)), max_piece_count), MAX_PIECE_REQUEST_COUNT)
logger.debug('New piece request status pieces_to_request: %s piece_consumption_time: %s max_piece_count: %s' % (pieces_to_request, len(self.piece_consumption_time), max_piece_count, ))
logger.debug('Requested pieces: %r' % (self.requested_pieces.items()))
logger.debug('Piece buffer: %r' % (self.piece_buffer.keys()))
for piece in range(current_piece, current_piece + pieces_to_request):
if piece in self.requested_pieces:
continue
logger.debug('Requesting piece %s' % (piece, ))
self.requested_pieces[piece] = threading.Event()
self.torrent.request_piece(piece)
for _ in range(1000):
if self.requested_pieces[current_piece].wait(1):
break
if self._closed:
return b''
else:
return b'' return b''
data = super(DelugeTorrentInput, self).read(real_num) for delete_piece in [p for p in self.piece_buffer.keys() if p < current_piece]:
return data del self.piece_buffer[delete_piece]
for delete_piece in [p for p in self.requested_pieces.keys() if p < current_piece]:
del self.requested_pieces[delete_piece]
self.current_piece_data = self.piece_buffer[current_piece]
self.current_piece_data.seek(rest)
self.piece_consumption_time.append(time.time())
logger.debug('Returning %s bytes' % (num, ))
return self._read(num)
@property
def current_piece(self):
from_byte = self.offset + self.tell()
piece_length = self.torrent.piece_length
piece, rest = divmod(from_byte, piece_length)
return piece, rest
def new_piece_available(self, piece, data):
if piece not in self.requested_pieces or self.requested_pieces[piece].is_set():
return
logger.debug("Setting data for piece %s" % (piece, ))
self.piece_buffer[piece] = BytesIO(data)
self.requested_pieces[piece].set()
def close(self): def close(self):
self.torrent.remove_reader(self) self.torrent.remove_reader(self)
super(DelugeTorrentInput, self).close() self._closed = True