mirror of
https://github.com/JohnDoee/deluge-streaming/
synced 2026-07-01 07:31:17 -07:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
435707b379 | ||
|
|
034f0ef331 | ||
|
|
d0e9780d76 | ||
|
|
bffef417f8 | ||
|
|
30340a25a2 | ||
|
|
287efadb3e | ||
|
|
d6e7fc83b8 | ||
|
|
b159bc2be5 | ||
|
|
e8f24b34bb | ||
|
|
a7fff07379 |
15
README.md
15
README.md
@@ -1,7 +1,7 @@
|
||||
# Streaming Plugin
|
||||
https://github.com/JohnDoee/deluge-streaming
|
||||
|
||||
(c)2019 by Anders Jensen <johndoee@tidalstream.org>
|
||||
(c)2020 by Anders Jensen <johndoee@tridentstream.org>
|
||||
|
||||
## Description
|
||||
|
||||
@@ -107,6 +107,19 @@ List of URL GET Arguments
|
||||
|
||||
# Version Info
|
||||
|
||||
## Version 0.12.2
|
||||
|
||||
* Added support for TLS 1.2
|
||||
|
||||
## Version 0.12.1
|
||||
|
||||
* Fixed small breaking bug
|
||||
|
||||
## Version 0.12.0
|
||||
|
||||
* Moved to reading pieces through Deluge to avoid unflushed data
|
||||
* Fixed Deluge 2 / libtorrent related bug
|
||||
|
||||
## Version 0.11.0
|
||||
* Initial support for Deluge 2 / Python 3
|
||||
* Added support for aggressive piece prioritization when it should not be necessary.
|
||||
|
||||
4
setup.py
4
setup.py
@@ -41,8 +41,8 @@ from setuptools import setup, find_packages
|
||||
|
||||
__plugin_name__ = "Streaming"
|
||||
__author__ = "Anders Jensen"
|
||||
__author_email__ = "johndoee@tidalstream.org"
|
||||
__version__ = "0.11.0"
|
||||
__author_email__ = "johndoee@tridentstream.org"
|
||||
__version__ = "0.12.2"
|
||||
__url__ = "https://github.com/JohnDoee/deluge-streaming"
|
||||
__license__ = "GPLv3"
|
||||
__description__ = "Enables streaming of files while downloading them."
|
||||
|
||||
@@ -56,7 +56,7 @@ from deluge._libtorrent import lt
|
||||
from deluge.core.rpcserver import export
|
||||
from deluge.plugins.pluginbase import CorePluginBase
|
||||
|
||||
from twisted.internet import reactor, defer, task
|
||||
from twisted.internet import reactor, defer, task, error
|
||||
from twisted.web import server, client
|
||||
from twisted.web.resource import Resource as TwistedResource
|
||||
|
||||
@@ -65,7 +65,6 @@ from thomas import router, Item, OutputBase
|
||||
from .resource import Resource
|
||||
from .torrentfile import DelugeTorrentInput
|
||||
|
||||
defer.setDebugging(True)
|
||||
router.register_handler(DelugeTorrentInput.plugin_name, DelugeTorrentInput, True, False, False)
|
||||
|
||||
VIDEO_STREAMABLE_EXTENSIONS = ['mkv', 'mp4', 'iso', 'ogg', 'ogm', 'm4v']
|
||||
@@ -119,7 +118,7 @@ def get_torrent(infohash):
|
||||
# Ensure file_priorities option is populated.
|
||||
self.set_file_priorities([])
|
||||
|
||||
return self.options["file_priorities"]
|
||||
return list(self.options["file_priorities"])
|
||||
|
||||
torrent = component.get("TorrentManager").torrents.get(infohash, None)
|
||||
if torrent and not hasattr(torrent, 'get_file_priorities'):
|
||||
@@ -200,7 +199,7 @@ class Torrent(object):
|
||||
self.torrent.handle.set_piece_deadline(needed_piece, 0)
|
||||
self.torrent.handle.piece_priority(needed_piece, MAX_PIECE_PRIORITY)
|
||||
|
||||
file_priorities = self.torrent.get_file_priorities()
|
||||
file_priorities = list(self.torrent.get_file_priorities())
|
||||
if file_priorities[f['index']] != MAX_FILE_PRIORITY:
|
||||
logger.debug('Also setting file to max %r' % (f, ))
|
||||
file_priorities[f['index']] = MAX_FILE_PRIORITY
|
||||
@@ -224,7 +223,8 @@ class Torrent(object):
|
||||
logger.debug('Calling read again to get the real number')
|
||||
return self.can_read(from_byte)
|
||||
else:
|
||||
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest
|
||||
logger.debug('Really last available piece is %s' % (last_available_piece, ))
|
||||
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest, last_available_piece
|
||||
|
||||
def is_idle(self):
|
||||
return not self.readers and self.last_activity + TORRENT_CLEANUP_INTERVAL < datetime.now()
|
||||
@@ -275,7 +275,7 @@ class Torrent(object):
|
||||
logger.debug('We had a fileset not started, must_whitelist:%r first_files:%r cannot_blacklist:%r' % (must_whitelist, first_files, cannot_blacklist))
|
||||
status = self.torrent.get_status(['files', 'file_progress'])
|
||||
|
||||
file_priorities = self.torrent.get_file_priorities()
|
||||
file_priorities = list(self.torrent.get_file_priorities())
|
||||
for f, progress in zip(status['files'], status['file_progress']):
|
||||
i = f['index']
|
||||
if progress == 1.0:
|
||||
@@ -314,7 +314,7 @@ class Torrent(object):
|
||||
else:
|
||||
fileset_ranges[fileset_hash] = fileset['files'].index(path)
|
||||
|
||||
file_priorities = self.torrent.get_file_priorities()
|
||||
file_priorities = list(self.torrent.get_file_priorities())
|
||||
logger.debug('Fileset heads: %r' % (fileset_ranges, ))
|
||||
for fileset_hash, first_file in fileset_ranges.items():
|
||||
fileset = self.filesets[fileset_hash]
|
||||
@@ -387,6 +387,14 @@ class Torrent(object):
|
||||
if fileset_hash not in self.filesets:
|
||||
self.filesets[fileset_hash] = {'started': False, 'files': files}
|
||||
|
||||
def request_piece(self, piece):
|
||||
self.torrent.handle.read_piece(piece)
|
||||
|
||||
def new_piece_available(self, piece, data):
|
||||
logger.debug("New pice available: %s" % (piece, ))
|
||||
for reader in self.readers.keys():
|
||||
reader.new_piece_available(piece, data)
|
||||
|
||||
|
||||
class TorrentHandler(object):
|
||||
def __init__(self, reset_priorities_on_finish, aggressive_prioritizing=False):
|
||||
@@ -397,6 +405,7 @@ class TorrentHandler(object):
|
||||
self.alerts = component.get("AlertManager")
|
||||
self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
|
||||
self.alerts.register_handler("torrent_finished_alert", self.on_alert_torrent_finished)
|
||||
self.alerts.register_handler("read_piece_alert", self.on_alert_read_piece)
|
||||
|
||||
self.cleanup_looping_call = task.LoopingCall(self.cleanup)
|
||||
self.cleanup_looping_call.start(60)
|
||||
@@ -427,6 +436,18 @@ class TorrentHandler(object):
|
||||
if self.reset_priorities_on_finish:
|
||||
self.torrents[infohash].reset_priorities()
|
||||
|
||||
def on_alert_read_piece(self, alert):
|
||||
try:
|
||||
infohash = str(alert.handle.info_hash())
|
||||
except (RuntimeError, KeyError):
|
||||
logger.warning('Failed to handle on read piece alert')
|
||||
return
|
||||
|
||||
if infohash not in self.torrents:
|
||||
return
|
||||
|
||||
self.torrents[infohash].new_piece_available(alert.piece, alert.buffer)
|
||||
|
||||
def shutdown(self):
|
||||
for torrent in self.torrents.values():
|
||||
if self.reset_priorities_on_finish:
|
||||
@@ -599,9 +620,11 @@ class ServerContextFactory(object):
|
||||
def getContext(self):
|
||||
from OpenSSL import SSL
|
||||
|
||||
method = getattr(SSL, 'TLSv1_1_METHOD', None)
|
||||
if method is None:
|
||||
method = getattr(SSL, 'SSLv23_METHOD', None)
|
||||
methods_names = ['TLSv1_2_METHOD', 'TLSv1_1_METHOD', 'SSLv23_METHOD']
|
||||
for method_name in methods_names:
|
||||
method = getattr(SSL, method_name, None)
|
||||
if method is not None:
|
||||
break
|
||||
|
||||
ctx = SSL.Context(method)
|
||||
ctx.use_certificate_file(self._cert_file)
|
||||
@@ -722,13 +745,19 @@ class Core(CorePluginBase):
|
||||
try:
|
||||
self.listening = reactor.listenSSL(self.config['port'], self.site, context, interface=self.config['ip'])
|
||||
except:
|
||||
self.listening = reactor.listenSSL(self.config['port'], self.site, context, interface='0.0.0.0')
|
||||
try:
|
||||
self.listening = reactor.listenSSL(self.config['port'], self.site, context, interface='0.0.0.0')
|
||||
except error.CannotListenError:
|
||||
logger.warning("Unable to listen to anything")
|
||||
self.base_url += 's'
|
||||
else:
|
||||
try:
|
||||
self.listening = reactor.listenTCP(self.config['port'], self.site, interface=self.config['ip'])
|
||||
except:
|
||||
self.listening = reactor.listenTCP(self.config['port'], self.site, interface='0.0.0.0')
|
||||
try:
|
||||
self.listening = reactor.listenTCP(self.config['port'], self.site, interface='0.0.0.0')
|
||||
except error.CannotListenError:
|
||||
logger.warning("Unable to listen to anything")
|
||||
|
||||
port = self.config['port']
|
||||
ip = self.config['ip']
|
||||
@@ -799,9 +828,6 @@ class Core(CorePluginBase):
|
||||
plugin_manager = component.get("CorePluginManager")
|
||||
return 'WebUi' in plugin_manager.get_enabled_plugins()
|
||||
|
||||
def check_config(self):
|
||||
pass
|
||||
|
||||
@export
|
||||
@defer.inlineCallbacks
|
||||
def set_config(self, config):
|
||||
|
||||
@@ -228,20 +228,18 @@
|
||||
</packing>
|
||||
</child>
|
||||
<!-- <child>
|
||||
<widget class="GtkRadioButton" id="input_serve_webui">
|
||||
<object class="GtkRadioButton" id="input_serve_webui">
|
||||
<property name="label" translatable="yes">Serve files via WebUI</property>
|
||||
<property name="visible">False</property>
|
||||
<property name="sensitive">False</property>
|
||||
<property name="visible">True</property>
|
||||
<property name="can_focus">True</property>
|
||||
<property name="receives_default">False</property>
|
||||
<property name="use_action_appearance">False</property>
|
||||
<property name="active">True</property>
|
||||
<property name="draw_indicator">True</property>
|
||||
</widget>
|
||||
</object>
|
||||
<packing>
|
||||
<property name="expand">True</property>
|
||||
<property name="fill">True</property>
|
||||
<property name="position">2</property>
|
||||
<property name="position">4</property>
|
||||
</packing>
|
||||
</child> -->
|
||||
<child>
|
||||
@@ -250,17 +248,15 @@
|
||||
<property name="can_focus">False</property>
|
||||
<property name="spacing">5</property>
|
||||
<!-- <child>
|
||||
<widget class="GtkRadioButton" id="input_serve_standalone">
|
||||
<object class="GtkRadioButton" id="input_serve_standalone">
|
||||
<property name="label" translatable="yes">Serve files via standalone</property>
|
||||
<property name="visible">False</property>
|
||||
<property name="sensitive">False</property>
|
||||
<property name="visible">True</property>
|
||||
<property name="can_focus">True</property>
|
||||
<property name="receives_default">False</property>
|
||||
<property name="use_action_appearance">False</property>
|
||||
<property name="active">True</property>
|
||||
<property name="draw_indicator">True</property>
|
||||
<property name="group">input_serve_webui</property>
|
||||
</widget>
|
||||
</object>
|
||||
<packing>
|
||||
<property name="expand">True</property>
|
||||
<property name="fill">True</property>
|
||||
|
||||
@@ -32,7 +32,7 @@ class Resource(TwistedResource):
|
||||
|
||||
if not authenticated:
|
||||
request.setResponseCode(401)
|
||||
return 'Unauthorized'
|
||||
return b'Unauthorized'
|
||||
|
||||
m = getattr(self, 'render_' + request.method.decode('utf-8'), None)
|
||||
if not m:
|
||||
|
||||
@@ -1,17 +1,27 @@
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
from thomas import InputBase
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PIECE_REQUEST_HISTORY_TIME = 10
|
||||
MAX_PIECE_REQUEST_COUNT = 20
|
||||
|
||||
class DelugeTorrentInput(InputBase.find_plugin('file')):
|
||||
class DelugeTorrentInput(InputBase):
|
||||
plugin_name = 'torrent_file'
|
||||
protocols = []
|
||||
|
||||
current_piece_data = None
|
||||
can_read_to = None
|
||||
last_available_piece = None
|
||||
_pos = None
|
||||
_closed = False
|
||||
|
||||
def __init__(self, item, torrent_handler, infohash, offset, path):
|
||||
self.item = item
|
||||
@@ -20,6 +30,9 @@ class DelugeTorrentInput(InputBase.find_plugin('file')):
|
||||
self.infohash = infohash
|
||||
self.offset = offset
|
||||
self.path = path
|
||||
self.piece_buffer = {}
|
||||
self.requested_pieces = {}
|
||||
self.piece_consumption_time = []
|
||||
self.size, self.filename, self.content_type = self.get_info()
|
||||
|
||||
def get_info(self):
|
||||
@@ -33,36 +46,94 @@ class DelugeTorrentInput(InputBase.find_plugin('file')):
|
||||
if not os.path.exists(self.path):
|
||||
self.torrent.can_read(self.offset)
|
||||
|
||||
def tell(self):
|
||||
return self._pos
|
||||
|
||||
def seek(self, pos):
|
||||
self.ensure_exists()
|
||||
super(DelugeTorrentInput, self).seek(pos)
|
||||
self._pos = pos
|
||||
logger.debug('Seeking at %s torrentfile_id %r' % (self.tell(), id(self)))
|
||||
self.torrent.add_reader(self, self.item.path, self.offset + self.tell(), self.offset + self.size)
|
||||
|
||||
def _read(self, num):
|
||||
data = self.current_piece_data.read(num)
|
||||
self._pos += len(data)
|
||||
return data
|
||||
|
||||
def read(self, num):
|
||||
if self.current_piece_data:
|
||||
data = self._read(num)
|
||||
if data:
|
||||
return data
|
||||
|
||||
self.ensure_exists()
|
||||
|
||||
if not self._open_file:
|
||||
if self._pos is None:
|
||||
self.seek(0)
|
||||
|
||||
logger.debug('Trying to read %s from %i torrentfile_id %r' % (self.path, self.tell(), id(self)))
|
||||
tell = self.tell()
|
||||
if self.can_read_to is None or self.can_read_to <= tell:
|
||||
self.can_read_to = self.torrent.can_read(self.offset + tell) + tell
|
||||
can_read_result = self.torrent.can_read(self.offset + tell)
|
||||
self.last_available_piece = can_read_result[1]
|
||||
self.can_read_to = can_read_result[0] + tell
|
||||
|
||||
if self._open_file:
|
||||
self._open_file.seek(tell)
|
||||
current_piece, rest = self.current_piece
|
||||
logger.debug('Calculated last available piece is %s offset %s can_read_to %s piece_length %s' % (self.last_available_piece, self.offset, self.can_read_to, self.torrent.piece_length))
|
||||
|
||||
real_num = min(num, self.can_read_to - tell)
|
||||
if num != real_num:
|
||||
logger.info('The real number we can read to is %s and not %s at position %s' % (real_num, num, tell))
|
||||
while self.piece_consumption_time and self.piece_consumption_time[0] < time.time() - PIECE_REQUEST_HISTORY_TIME:
|
||||
self.piece_consumption_time.pop(0)
|
||||
|
||||
if not self._open_file: # the file was closed while we waited
|
||||
max_piece_count = (self.last_available_piece - current_piece) + 1
|
||||
pieces_to_request = min(min(max(2, len(self.piece_consumption_time)), max_piece_count), MAX_PIECE_REQUEST_COUNT)
|
||||
|
||||
logger.debug('New piece request status pieces_to_request: %s piece_consumption_time: %s max_piece_count: %s' % (pieces_to_request, len(self.piece_consumption_time), max_piece_count, ))
|
||||
logger.debug('Requested pieces: %r' % (self.requested_pieces.items()))
|
||||
logger.debug('Piece buffer: %r' % (self.piece_buffer.keys()))
|
||||
|
||||
for piece in range(current_piece, current_piece + pieces_to_request):
|
||||
if piece in self.requested_pieces:
|
||||
continue
|
||||
|
||||
logger.debug('Requesting piece %s' % (piece, ))
|
||||
self.requested_pieces[piece] = threading.Event()
|
||||
self.torrent.request_piece(piece)
|
||||
|
||||
for _ in range(1000):
|
||||
if self.requested_pieces[current_piece].wait(1):
|
||||
break
|
||||
if self._closed:
|
||||
return b''
|
||||
else:
|
||||
return b''
|
||||
|
||||
data = super(DelugeTorrentInput, self).read(real_num)
|
||||
return data
|
||||
for delete_piece in [p for p in self.piece_buffer.keys() if p < current_piece]:
|
||||
del self.piece_buffer[delete_piece]
|
||||
|
||||
for delete_piece in [p for p in self.requested_pieces.keys() if p < current_piece]:
|
||||
del self.requested_pieces[delete_piece]
|
||||
|
||||
self.current_piece_data = self.piece_buffer[current_piece]
|
||||
self.current_piece_data.seek(rest)
|
||||
self.piece_consumption_time.append(time.time())
|
||||
logger.debug('Returning %s bytes' % (num, ))
|
||||
return self._read(num)
|
||||
|
||||
@property
|
||||
def current_piece(self):
|
||||
from_byte = self.offset + self.tell()
|
||||
piece_length = self.torrent.piece_length
|
||||
piece, rest = divmod(from_byte, piece_length)
|
||||
return piece, rest
|
||||
|
||||
def new_piece_available(self, piece, data):
|
||||
if piece not in self.requested_pieces or self.requested_pieces[piece].is_set():
|
||||
return
|
||||
|
||||
logger.debug("Setting data for piece %s" % (piece, ))
|
||||
self.piece_buffer[piece] = BytesIO(data)
|
||||
self.requested_pieces[piece].set()
|
||||
|
||||
def close(self):
|
||||
self.torrent.remove_reader(self)
|
||||
super(DelugeTorrentInput, self).close()
|
||||
self._closed = True
|
||||
|
||||
Reference in New Issue
Block a user