moved to using read piece instead of reading from disk

This commit is contained in:
Anders Jensen
2020-04-21 21:49:31 +02:00
parent a7fff07379
commit e8f24b34bb
6 changed files with 127 additions and 34 deletions

View File

@@ -1,7 +1,7 @@
# Streaming Plugin
https://github.com/JohnDoee/deluge-streaming
(c)2019 by Anders Jensen <johndoee@tidalstream.org>
(c)2020 by Anders Jensen <johndoee@tridentstream.org>
## Description
@@ -107,6 +107,11 @@ List of URL GET Arguments
# Version Info
## Version 0.12.0
* Moved to reading pieces through Deluge to avoid unflushed data
* Fixed Deluge 2 / libtorrent related bug
## Version 0.11.0
* Initial support for Deluge 2 / Python 3
* Added support for aggressive piece prioritization when it should not be necessary.

View File

@@ -41,8 +41,8 @@ from setuptools import setup, find_packages
__plugin_name__ = "Streaming"
__author__ = "Anders Jensen"
__author_email__ = "johndoee@tidalstream.org"
__version__ = "0.11.0"
__author_email__ = "johndoee@tridentstream.org"
__version__ = "0.12.0"
__url__ = "https://github.com/JohnDoee/deluge-streaming"
__license__ = "GPLv3"
__description__ = "Enables streaming of files while downloading them."

View File

@@ -65,7 +65,6 @@ from thomas import router, Item, OutputBase
from .resource import Resource
from .torrentfile import DelugeTorrentInput
defer.setDebugging(True)
router.register_handler(DelugeTorrentInput.plugin_name, DelugeTorrentInput, True, False, False)
VIDEO_STREAMABLE_EXTENSIONS = ['mkv', 'mp4', 'iso', 'ogg', 'ogm', 'm4v']
@@ -119,7 +118,7 @@ def get_torrent(infohash):
# Ensure file_priorities option is populated.
self.set_file_priorities([])
return self.options["file_priorities"]
return list(self.options["file_priorities"])
torrent = component.get("TorrentManager").torrents.get(infohash, None)
if torrent and not hasattr(torrent, 'get_file_priorities'):
@@ -200,7 +199,7 @@ class Torrent(object):
self.torrent.handle.set_piece_deadline(needed_piece, 0)
self.torrent.handle.piece_priority(needed_piece, MAX_PIECE_PRIORITY)
file_priorities = self.torrent.get_file_priorities()
file_priorities = list(self.torrent.get_file_priorities())
if file_priorities[f['index']] != MAX_FILE_PRIORITY:
logger.debug('Also setting file to max %r' % (f, ))
file_priorities[f['index']] = MAX_FILE_PRIORITY
@@ -224,7 +223,8 @@ class Torrent(object):
logger.debug('Calling read again to get the real number')
return self.can_read(from_byte)
else:
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest
logger.debug('Really last available piece is %s' % (last_available_piece, ))
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest, last_available_piece
def is_idle(self):
return not self.readers and self.last_activity + TORRENT_CLEANUP_INTERVAL < datetime.now()
@@ -275,7 +275,7 @@ class Torrent(object):
logger.debug('We had a fileset not started, must_whitelist:%r first_files:%r cannot_blacklist:%r' % (must_whitelist, first_files, cannot_blacklist))
status = self.torrent.get_status(['files', 'file_progress'])
file_priorities = self.torrent.get_file_priorities()
file_priorities = list(self.torrent.get_file_priorities())
for f, progress in zip(status['files'], status['file_progress']):
i = f['index']
if progress == 1.0:
@@ -314,7 +314,7 @@ class Torrent(object):
else:
fileset_ranges[fileset_hash] = fileset['files'].index(path)
file_priorities = self.torrent.get_file_priorities()
file_priorities = list(self.torrent.get_file_priorities())
logger.debug('Fileset heads: %r' % (fileset_ranges, ))
for fileset_hash, first_file in fileset_ranges.items():
fileset = self.filesets[fileset_hash]
@@ -387,6 +387,14 @@ class Torrent(object):
if fileset_hash not in self.filesets:
self.filesets[fileset_hash] = {'started': False, 'files': files}
def request_piece(self, piece):
self.torrent.handle.read_piece(piece)
def new_piece_available(self, piece, data):
logger.debug("New pice available: %s" % (piece, ))
for reader in self.readers.keys():
reader.new_piece_available(piece, data)
class TorrentHandler(object):
def __init__(self, reset_priorities_on_finish, aggressive_prioritizing=False):
@@ -397,6 +405,7 @@ class TorrentHandler(object):
self.alerts = component.get("AlertManager")
self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
self.alerts.register_handler("torrent_finished_alert", self.on_alert_torrent_finished)
self.alerts.register_handler("read_piece_alert", self.on_alert_read_piece)
self.cleanup_looping_call = task.LoopingCall(self.cleanup)
self.cleanup_looping_call.start(60)
@@ -427,6 +436,18 @@ class TorrentHandler(object):
if self.reset_priorities_on_finish:
self.torrents[infohash].reset_priorities()
def on_alert_read_piece(self, alert):
try:
infohash = str(alert.handle.info_hash())
except (RuntimeError, KeyError):
logger.warning('Failed to handle on read piece alert')
return
if infohash not in self.torrents:
return
self.torrents[infohash].new_piece_available(alert.piece, alert.buffer)
def shutdown(self):
for torrent in self.torrents.values():
if self.reset_priorities_on_finish:

View File

@@ -228,20 +228,18 @@
</packing>
</child>
<!-- <child>
<widget class="GtkRadioButton" id="input_serve_webui">
<object class="GtkRadioButton" id="input_serve_webui">
<property name="label" translatable="yes">Serve files via WebUI</property>
<property name="visible">False</property>
<property name="sensitive">False</property>
<property name="visible">True</property>
<property name="can_focus">True</property>
<property name="receives_default">False</property>
<property name="use_action_appearance">False</property>
<property name="active">True</property>
<property name="draw_indicator">True</property>
</widget>
</object>
<packing>
<property name="expand">True</property>
<property name="fill">True</property>
<property name="position">2</property>
<property name="position">4</property>
</packing>
</child> -->
<child>
@@ -250,17 +248,15 @@
<property name="can_focus">False</property>
<property name="spacing">5</property>
<!-- <child>
<widget class="GtkRadioButton" id="input_serve_standalone">
<object class="GtkRadioButton" id="input_serve_standalone">
<property name="label" translatable="yes">Serve files via standalone</property>
<property name="visible">False</property>
<property name="sensitive">False</property>
<property name="visible">True</property>
<property name="can_focus">True</property>
<property name="receives_default">False</property>
<property name="use_action_appearance">False</property>
<property name="active">True</property>
<property name="draw_indicator">True</property>
<property name="group">input_serve_webui</property>
</widget>
</object>
<packing>
<property name="expand">True</property>
<property name="fill">True</property>

View File

@@ -32,7 +32,7 @@ class Resource(TwistedResource):
if not authenticated:
request.setResponseCode(401)
return 'Unauthorized'
return b'Unauthorized'
m = getattr(self, 'render_' + request.method.decode('utf-8'), None)
if not m:

View File

@@ -1,17 +1,27 @@
import logging
import mimetypes
import os
import time
import threading
from io import BytesIO
from thomas import InputBase
logger = logging.getLogger(__name__)
PIECE_REQUEST_HISTORY_TIME = 10
MAX_PIECE_REQUEST_COUNT = 20
class DelugeTorrentInput(InputBase.find_plugin('file')):
class DelugeTorrentInput(InputBase):
plugin_name = 'torrent_file'
protocols = []
current_piece_data = None
can_read_to = None
last_available_piece = None
_pos = None
_closed = False
def __init__(self, item, torrent_handler, infohash, offset, path):
self.item = item
@@ -20,6 +30,9 @@ class DelugeTorrentInput(InputBase.find_plugin('file')):
self.infohash = infohash
self.offset = offset
self.path = path
self.piece_buffer = {}
self.requested_pieces = {}
self.piece_request_queue = []
self.size, self.filename, self.content_type = self.get_info()
def get_info(self):
@@ -33,36 +46,94 @@ class DelugeTorrentInput(InputBase.find_plugin('file')):
if not os.path.exists(self.path):
self.torrent.can_read(self.offset)
def tell(self):
return self._pos
def seek(self, pos):
self.ensure_exists()
super(DelugeTorrentInput, self).seek(pos)
self._pos = pos
logger.debug('Seeking at %s torrentfile_id %r' % (self.tell(), id(self)))
self.torrent.add_reader(self, self.item.path, self.offset + self.tell(), self.offset + self.size)
def _read(self, num):
data = self.current_piece_data.read(num)
self._pos += len(data)
return data
def read(self, num):
if self.current_piece_data:
data = self._read(num)
if data:
return data
self.ensure_exists()
if not self._open_file:
if self._pos is None:
self.seek(0)
logger.debug('Trying to read %s from %i torrentfile_id %r' % (self.path, self.tell(), id(self)))
tell = self.tell()
if self.can_read_to is None or self.can_read_to <= tell:
self.can_read_to = self.torrent.can_read(self.offset + tell) + tell
can_read_result = self.torrent.can_read(self.offset + tell)
self.last_available_piece = can_read_result[1]
self.can_read_to = can_read_result[0] + tell
if self._open_file:
self._open_file.seek(tell)
current_piece, rest = self.current_piece
logger.debug('Calculated last available piece is %s offset %s can_read_to %s piece_length %s' % (self.last_available_piece, self.offset, self.can_read_to, self.torrent.piece_length))
real_num = min(num, self.can_read_to - tell)
if num != real_num:
logger.info('The real number we can read to is %s and not %s at position %s' % (real_num, num, tell))
while self.piece_consumption_time and self.piece_consumption_time[0] < time.time() - PIECE_REQUEST_HISTORY_TIME:
self.piece_consumption_time.pop(0)
if not self._open_file: # the file was closed while we waited
max_piece_count = (self.last_available_piece - current_piece) + 1
pieces_to_request = min(min(max(2, len(self.piece_consumption_time)), max_piece_count), MAX_PIECE_REQUEST_COUNT)
logger.debug('New piece request status pieces_to_request: %s piece_consumption_time: %s max_piece_count: %s' % (pieces_to_request, len(self.piece_consumption_time), max_piece_count, ))
logger.debug('Requested pieces: %r' % (self.requested_pieces.items()))
logger.debug('Piece buffer: %r' % (self.piece_buffer.keys()))
for piece in range(current_piece, current_piece + pieces_to_request):
if piece in self.requested_pieces:
continue
logger.debug('Requesting piece %s' % (piece, ))
self.requested_pieces[piece] = threading.Event()
self.torrent.request_piece(piece)
for _ in range(1000):
if self.requested_pieces[current_piece].wait(1):
break
if self._closed:
return b''
else:
return b''
data = super(DelugeTorrentInput, self).read(real_num)
return data
for delete_piece in [p for p in self.piece_buffer.keys() if p < current_piece]:
del self.piece_buffer[delete_piece]
for delete_piece in [p for p in self.requested_pieces.keys() if p < current_piece]:
del self.requested_pieces[delete_piece]
self.current_piece_data = self.piece_buffer[current_piece]
self.current_piece_data.seek(rest)
self.piece_consumption_time.append(time.time())
logger.debug('Returning %s bytes' % (num, ))
return self._read(num)
@property
def current_piece(self):
from_byte = self.offset + self.tell()
piece_length = self.torrent.piece_length
piece, rest = divmod(from_byte, piece_length)
return piece, rest
def new_piece_available(self, piece, data):
if piece not in self.requested_pieces or self.requested_pieces[piece].is_set():
return
logger.debug("Setting data for piece %s" % (piece, ))
self.piece_buffer[piece] = BytesIO(data)
self.requested_pieces[piece].set()
def close(self):
self.torrent.remove_reader(self)
super(DelugeTorrentInput, self).close()
self._closed = True