10 Commits

Author SHA1 Message Date
Anders Jensen
435707b379 Merge branch 'release/0.12.2' 2020-07-31 16:18:54 +02:00
Anders Jensen
034f0ef331 updated readmes 2020-07-31 16:18:42 +02:00
Anders Jensen
d0e9780d76 added support for new ssl version 2020-07-31 16:16:57 +02:00
Anders Jensen
bffef417f8 Merge tag '0.12.1' into develop
-
2020-04-22 12:34:42 +02:00
Anders Jensen
30340a25a2 Merge branch 'release/0.12.1' 2020-04-22 12:34:39 +02:00
Anders Jensen
287efadb3e fixed breaking bug 2020-04-22 12:34:25 +02:00
Anders Jensen
d6e7fc83b8 Merge tag '0.12.0' into develop
-
2020-04-21 21:49:58 +02:00
Anders Jensen
b159bc2be5 Merge branch 'release/0.12.0' 2020-04-21 21:49:54 +02:00
Anders Jensen
e8f24b34bb moved to using read piece instead of reading from disk 2020-04-21 21:49:31 +02:00
Anders Jensen
a7fff07379 Merge tag '0.11.0' into develop
Added Deluge 2 support
2019-11-03 09:47:28 +01:00
6 changed files with 149 additions and 43 deletions

View File

@@ -1,7 +1,7 @@
# Streaming Plugin
https://github.com/JohnDoee/deluge-streaming
(c)2019 by Anders Jensen <johndoee@tidalstream.org>
(c)2020 by Anders Jensen <johndoee@tridentstream.org>
## Description
@@ -107,6 +107,19 @@ List of URL GET Arguments
# Version Info
## Version 0.12.2
* Added support for TLS 1.2
## Version 0.12.1
* Fixed small breaking bug
## Version 0.12.0
* Moved to reading pieces through Deluge to avoid unflushed data
* Fixed Deluge 2 / libtorrent related bug
## Version 0.11.0
* Initial support for Deluge 2 / Python 3
* Added support for aggressive piece prioritization when it should not be necessary.

View File

@@ -41,8 +41,8 @@ from setuptools import setup, find_packages
__plugin_name__ = "Streaming"
__author__ = "Anders Jensen"
__author_email__ = "johndoee@tidalstream.org"
__version__ = "0.11.0"
__author_email__ = "johndoee@tridentstream.org"
__version__ = "0.12.2"
__url__ = "https://github.com/JohnDoee/deluge-streaming"
__license__ = "GPLv3"
__description__ = "Enables streaming of files while downloading them."

View File

@@ -56,7 +56,7 @@ from deluge._libtorrent import lt
from deluge.core.rpcserver import export
from deluge.plugins.pluginbase import CorePluginBase
from twisted.internet import reactor, defer, task
from twisted.internet import reactor, defer, task, error
from twisted.web import server, client
from twisted.web.resource import Resource as TwistedResource
@@ -65,7 +65,6 @@ from thomas import router, Item, OutputBase
from .resource import Resource
from .torrentfile import DelugeTorrentInput
defer.setDebugging(True)
router.register_handler(DelugeTorrentInput.plugin_name, DelugeTorrentInput, True, False, False)
VIDEO_STREAMABLE_EXTENSIONS = ['mkv', 'mp4', 'iso', 'ogg', 'ogm', 'm4v']
@@ -119,7 +118,7 @@ def get_torrent(infohash):
# Ensure file_priorities option is populated.
self.set_file_priorities([])
return self.options["file_priorities"]
return list(self.options["file_priorities"])
torrent = component.get("TorrentManager").torrents.get(infohash, None)
if torrent and not hasattr(torrent, 'get_file_priorities'):
@@ -200,7 +199,7 @@ class Torrent(object):
self.torrent.handle.set_piece_deadline(needed_piece, 0)
self.torrent.handle.piece_priority(needed_piece, MAX_PIECE_PRIORITY)
file_priorities = self.torrent.get_file_priorities()
file_priorities = list(self.torrent.get_file_priorities())
if file_priorities[f['index']] != MAX_FILE_PRIORITY:
logger.debug('Also setting file to max %r' % (f, ))
file_priorities[f['index']] = MAX_FILE_PRIORITY
@@ -224,7 +223,8 @@ class Torrent(object):
logger.debug('Calling read again to get the real number')
return self.can_read(from_byte)
else:
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest
logger.debug('Really last available piece is %s' % (last_available_piece, ))
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest, last_available_piece
def is_idle(self):
return not self.readers and self.last_activity + TORRENT_CLEANUP_INTERVAL < datetime.now()
@@ -275,7 +275,7 @@ class Torrent(object):
logger.debug('We had a fileset not started, must_whitelist:%r first_files:%r cannot_blacklist:%r' % (must_whitelist, first_files, cannot_blacklist))
status = self.torrent.get_status(['files', 'file_progress'])
file_priorities = self.torrent.get_file_priorities()
file_priorities = list(self.torrent.get_file_priorities())
for f, progress in zip(status['files'], status['file_progress']):
i = f['index']
if progress == 1.0:
@@ -314,7 +314,7 @@ class Torrent(object):
else:
fileset_ranges[fileset_hash] = fileset['files'].index(path)
file_priorities = self.torrent.get_file_priorities()
file_priorities = list(self.torrent.get_file_priorities())
logger.debug('Fileset heads: %r' % (fileset_ranges, ))
for fileset_hash, first_file in fileset_ranges.items():
fileset = self.filesets[fileset_hash]
@@ -387,6 +387,14 @@ class Torrent(object):
if fileset_hash not in self.filesets:
self.filesets[fileset_hash] = {'started': False, 'files': files}
def request_piece(self, piece):
self.torrent.handle.read_piece(piece)
def new_piece_available(self, piece, data):
logger.debug("New pice available: %s" % (piece, ))
for reader in self.readers.keys():
reader.new_piece_available(piece, data)
class TorrentHandler(object):
def __init__(self, reset_priorities_on_finish, aggressive_prioritizing=False):
@@ -397,6 +405,7 @@ class TorrentHandler(object):
self.alerts = component.get("AlertManager")
self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
self.alerts.register_handler("torrent_finished_alert", self.on_alert_torrent_finished)
self.alerts.register_handler("read_piece_alert", self.on_alert_read_piece)
self.cleanup_looping_call = task.LoopingCall(self.cleanup)
self.cleanup_looping_call.start(60)
@@ -427,6 +436,18 @@ class TorrentHandler(object):
if self.reset_priorities_on_finish:
self.torrents[infohash].reset_priorities()
def on_alert_read_piece(self, alert):
try:
infohash = str(alert.handle.info_hash())
except (RuntimeError, KeyError):
logger.warning('Failed to handle on read piece alert')
return
if infohash not in self.torrents:
return
self.torrents[infohash].new_piece_available(alert.piece, alert.buffer)
def shutdown(self):
for torrent in self.torrents.values():
if self.reset_priorities_on_finish:
@@ -599,9 +620,11 @@ class ServerContextFactory(object):
def getContext(self):
from OpenSSL import SSL
method = getattr(SSL, 'TLSv1_1_METHOD', None)
if method is None:
method = getattr(SSL, 'SSLv23_METHOD', None)
methods_names = ['TLSv1_2_METHOD', 'TLSv1_1_METHOD', 'SSLv23_METHOD']
for method_name in methods_names:
method = getattr(SSL, method_name, None)
if method is not None:
break
ctx = SSL.Context(method)
ctx.use_certificate_file(self._cert_file)
@@ -722,13 +745,19 @@ class Core(CorePluginBase):
try:
self.listening = reactor.listenSSL(self.config['port'], self.site, context, interface=self.config['ip'])
except:
self.listening = reactor.listenSSL(self.config['port'], self.site, context, interface='0.0.0.0')
try:
self.listening = reactor.listenSSL(self.config['port'], self.site, context, interface='0.0.0.0')
except error.CannotListenError:
logger.warning("Unable to listen to anything")
self.base_url += 's'
else:
try:
self.listening = reactor.listenTCP(self.config['port'], self.site, interface=self.config['ip'])
except:
self.listening = reactor.listenTCP(self.config['port'], self.site, interface='0.0.0.0')
try:
self.listening = reactor.listenTCP(self.config['port'], self.site, interface='0.0.0.0')
except error.CannotListenError:
logger.warning("Unable to listen to anything")
port = self.config['port']
ip = self.config['ip']
@@ -799,9 +828,6 @@ class Core(CorePluginBase):
plugin_manager = component.get("CorePluginManager")
return 'WebUi' in plugin_manager.get_enabled_plugins()
def check_config(self):
pass
@export
@defer.inlineCallbacks
def set_config(self, config):

View File

@@ -228,20 +228,18 @@
</packing>
</child>
<!-- <child>
<widget class="GtkRadioButton" id="input_serve_webui">
<object class="GtkRadioButton" id="input_serve_webui">
<property name="label" translatable="yes">Serve files via WebUI</property>
<property name="visible">False</property>
<property name="sensitive">False</property>
<property name="visible">True</property>
<property name="can_focus">True</property>
<property name="receives_default">False</property>
<property name="use_action_appearance">False</property>
<property name="active">True</property>
<property name="draw_indicator">True</property>
</widget>
</object>
<packing>
<property name="expand">True</property>
<property name="fill">True</property>
<property name="position">2</property>
<property name="position">4</property>
</packing>
</child> -->
<child>
@@ -250,17 +248,15 @@
<property name="can_focus">False</property>
<property name="spacing">5</property>
<!-- <child>
<widget class="GtkRadioButton" id="input_serve_standalone">
<object class="GtkRadioButton" id="input_serve_standalone">
<property name="label" translatable="yes">Serve files via standalone</property>
<property name="visible">False</property>
<property name="sensitive">False</property>
<property name="visible">True</property>
<property name="can_focus">True</property>
<property name="receives_default">False</property>
<property name="use_action_appearance">False</property>
<property name="active">True</property>
<property name="draw_indicator">True</property>
<property name="group">input_serve_webui</property>
</widget>
</object>
<packing>
<property name="expand">True</property>
<property name="fill">True</property>

View File

@@ -32,7 +32,7 @@ class Resource(TwistedResource):
if not authenticated:
request.setResponseCode(401)
return 'Unauthorized'
return b'Unauthorized'
m = getattr(self, 'render_' + request.method.decode('utf-8'), None)
if not m:

View File

@@ -1,17 +1,27 @@
import logging
import mimetypes
import os
import time
import threading
from io import BytesIO
from thomas import InputBase
logger = logging.getLogger(__name__)
PIECE_REQUEST_HISTORY_TIME = 10
MAX_PIECE_REQUEST_COUNT = 20
class DelugeTorrentInput(InputBase.find_plugin('file')):
class DelugeTorrentInput(InputBase):
plugin_name = 'torrent_file'
protocols = []
current_piece_data = None
can_read_to = None
last_available_piece = None
_pos = None
_closed = False
def __init__(self, item, torrent_handler, infohash, offset, path):
self.item = item
@@ -20,6 +30,9 @@ class DelugeTorrentInput(InputBase.find_plugin('file')):
self.infohash = infohash
self.offset = offset
self.path = path
self.piece_buffer = {}
self.requested_pieces = {}
self.piece_consumption_time = []
self.size, self.filename, self.content_type = self.get_info()
def get_info(self):
@@ -33,36 +46,94 @@ class DelugeTorrentInput(InputBase.find_plugin('file')):
if not os.path.exists(self.path):
self.torrent.can_read(self.offset)
def tell(self):
return self._pos
def seek(self, pos):
self.ensure_exists()
super(DelugeTorrentInput, self).seek(pos)
self._pos = pos
logger.debug('Seeking at %s torrentfile_id %r' % (self.tell(), id(self)))
self.torrent.add_reader(self, self.item.path, self.offset + self.tell(), self.offset + self.size)
def _read(self, num):
data = self.current_piece_data.read(num)
self._pos += len(data)
return data
def read(self, num):
if self.current_piece_data:
data = self._read(num)
if data:
return data
self.ensure_exists()
if not self._open_file:
if self._pos is None:
self.seek(0)
logger.debug('Trying to read %s from %i torrentfile_id %r' % (self.path, self.tell(), id(self)))
tell = self.tell()
if self.can_read_to is None or self.can_read_to <= tell:
self.can_read_to = self.torrent.can_read(self.offset + tell) + tell
can_read_result = self.torrent.can_read(self.offset + tell)
self.last_available_piece = can_read_result[1]
self.can_read_to = can_read_result[0] + tell
if self._open_file:
self._open_file.seek(tell)
current_piece, rest = self.current_piece
logger.debug('Calculated last available piece is %s offset %s can_read_to %s piece_length %s' % (self.last_available_piece, self.offset, self.can_read_to, self.torrent.piece_length))
real_num = min(num, self.can_read_to - tell)
if num != real_num:
logger.info('The real number we can read to is %s and not %s at position %s' % (real_num, num, tell))
while self.piece_consumption_time and self.piece_consumption_time[0] < time.time() - PIECE_REQUEST_HISTORY_TIME:
self.piece_consumption_time.pop(0)
if not self._open_file: # the file was closed while we waited
max_piece_count = (self.last_available_piece - current_piece) + 1
pieces_to_request = min(min(max(2, len(self.piece_consumption_time)), max_piece_count), MAX_PIECE_REQUEST_COUNT)
logger.debug('New piece request status pieces_to_request: %s piece_consumption_time: %s max_piece_count: %s' % (pieces_to_request, len(self.piece_consumption_time), max_piece_count, ))
logger.debug('Requested pieces: %r' % (self.requested_pieces.items()))
logger.debug('Piece buffer: %r' % (self.piece_buffer.keys()))
for piece in range(current_piece, current_piece + pieces_to_request):
if piece in self.requested_pieces:
continue
logger.debug('Requesting piece %s' % (piece, ))
self.requested_pieces[piece] = threading.Event()
self.torrent.request_piece(piece)
for _ in range(1000):
if self.requested_pieces[current_piece].wait(1):
break
if self._closed:
return b''
else:
return b''
data = super(DelugeTorrentInput, self).read(real_num)
return data
for delete_piece in [p for p in self.piece_buffer.keys() if p < current_piece]:
del self.piece_buffer[delete_piece]
for delete_piece in [p for p in self.requested_pieces.keys() if p < current_piece]:
del self.requested_pieces[delete_piece]
self.current_piece_data = self.piece_buffer[current_piece]
self.current_piece_data.seek(rest)
self.piece_consumption_time.append(time.time())
logger.debug('Returning %s bytes' % (num, ))
return self._read(num)
@property
def current_piece(self):
from_byte = self.offset + self.tell()
piece_length = self.torrent.piece_length
piece, rest = divmod(from_byte, piece_length)
return piece, rest
def new_piece_available(self, piece, data):
if piece not in self.requested_pieces or self.requested_pieces[piece].is_set():
return
logger.debug("Setting data for piece %s" % (piece, ))
self.piece_buffer[piece] = BytesIO(data)
self.requested_pieces[piece].set()
def close(self):
self.torrent.remove_reader(self)
super(DelugeTorrentInput, self).close()
self._closed = True