diff --git a/.gitignore b/.gitignore index 27da99e..2d7481d 100644 --- a/.gitignore +++ b/.gitignore @@ -23,8 +23,13 @@ dropin.cache _trial_temp *.komodoproject docs/_build* -apiserver/metadata/imdbhandler.py -apiserver/metadata/malhandler.py -apiserver/services/search.py -apiserver/services/control.py -apiserver/services/files.py +.env* + + +# for bundling +thomas +six.py +rarfile.py +rfc6266.py +lepl +pytz \ No newline at end of file diff --git a/README.md b/README.md index 991f158..ec730fa 100644 --- a/README.md +++ b/README.md @@ -42,8 +42,21 @@ make Deluge an abstraction layer for the [TidalStream](http://www.tidalstream.or The _allow remote_ option is to allow remote add and stream of torrents. +## Todo + +* [x] Add RAR streaming support +* [ ] Better feedback in interface about streams +* [ ] Better feedback when using API +* [x] Reverse proxy improvement (e.g. port different than bind port) + # Version Info +## Version 0.10.0 +* Rewrote large parts of the code +* Now using [thomas](https://github.com/JohnDoee/thomas) as file-reading core - this adds support for multi-rar streaming. +* Faster streaming by reading directly from disk +* Reverse proxy mode + ## Version 0.9.0 * Few bugfixes * Added support for Deluge 2 diff --git a/create-egg.sh b/create-egg.sh new file mode 100644 index 0000000..7d5d2d1 --- /dev/null +++ b/create-egg.sh @@ -0,0 +1,9 @@ +virtualenv .env-egg +.env-egg/bin/pip install -U thomas +ln -s .env-egg/lib/python2.7/site-packages/thomas . +ln -s .env-egg/lib/python2.7/site-packages/rarfile.py . +ln -s .env-egg/lib/python2.7/site-packages/six.py . +ln -s .env-egg/lib/python2.7/site-packages/rfc6266.py . +ln -s .env-egg/lib/python2.7/site-packages/lepl . +ln -s .env-egg/lib/python2.7/site-packages/pytz . +.env-egg/bin/python setup.py bdist_egg \ No newline at end of file diff --git a/setup.py b/setup.py index fff6ac2..f74eabb 100644 --- a/setup.py +++ b/setup.py @@ -37,12 +37,12 @@ # statement from all source files in the program, then also delete it here. # -from setuptools import setup +from setuptools import setup, find_packages __plugin_name__ = "Streaming" __author__ = "Anders Jensen" __author_email__ = "johndoee@tidalstream.org" -__version__ = "0.9.0" +__version__ = "0.10.0" __url__ = "https://github.com/JohnDoee/deluge-streaming" __license__ = "GPLv3" __description__ = "Enables streaming of files while downloading them." @@ -64,6 +64,18 @@ downloads ahead, this enables seeking in video files. If you want to stream from a non-local computer, e.g. your seedbox, you will need to change the IP in option to the external server ip.""" __pkg_data__ = {__plugin_name__.lower(): ["template/*", "data/*"]} +REQUIREMENTS_PACKAGES = [ + 'thomas', + 'lepl', + 'pytz', +] + +REQUIREMENTS_MODULES = [ + 'six', + 'rarfile', + 'rfc6266', +] + setup( name=__plugin_name__, version=__version__, @@ -73,8 +85,10 @@ setup( url=__url__, license=__license__, long_description=__long_description__ if __long_description__ else __description__, + # install_requires=REQUIREMENTS_PACKAGES, - packages=[__plugin_name__.lower()], + packages=[__plugin_name__.lower()] + ['%s.%s' % (x, y) for x in REQUIREMENTS_PACKAGES for y in find_packages(x)] + REQUIREMENTS_PACKAGES, + py_modules=REQUIREMENTS_MODULES, package_data = __pkg_data__, entry_points=""" diff --git a/streaming/core.py b/streaming/core.py index a1bead7..afdeddf 100644 --- a/streaming/core.py +++ b/streaming/core.py @@ -37,30 +37,39 @@ # statement from all source files in the program, then also delete it here. # -import base64 import json import logging import os -import urllib +import random +import string +import time import deluge.configmanager -from collections import defaultdict from copy import copy +from datetime import datetime, timedelta +from types import MethodType from deluge import component, configmanager from deluge._libtorrent import lt from deluge.core.rpcserver import export from deluge.plugins.pluginbase import CorePluginBase -from twisted.internet import reactor, defer -from twisted.python import randbytes -from twisted.web import server, resource, static, client +from twisted.internet import reactor, defer, task +from twisted.web import server, client + +from thomas import router, Item, OutputBase -from .filelike import FilelikeObjectResource from .resource import Resource +from .torrentfile import DelugeTorrentInput -logger = logging.getLogger(__name__) +defer.setDebugging(True) +router.register_handler(DelugeTorrentInput.plugin_name, DelugeTorrentInput, True, False, False) + +VIDEO_STREAMABLE_EXTENSIONS = ['mkv', 'mp4', 'iso', 'ogg', 'ogm', 'm4v'] +AUDIO_STREAMABLE_EXTENSIONS = ['flac', 'mp3', 'oga'] +STREAMABLE_EXTENSIONS = set(VIDEO_STREAMABLE_EXTENSIONS + AUDIO_STREAMABLE_EXTENSIONS) +TORRENT_CLEANUP_INTERVAL = timedelta(minutes=30) DEFAULT_PREFS = { 'ip': '127.0.0.1', @@ -70,23 +79,459 @@ DEFAULT_PREFS = { 'use_stream_urls': False, 'auto_open_stream_urls': False, 'use_ssl': False, - 'remote_username': 'username', - 'remote_password': 'password', + 'remote_username': 'stream', + 'remote_password': ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16)), + 'reverse_proxy_enabled': False, + 'reverse_proxy_base_url': '', 'serve_method': 'standalone', 'ssl_source': 'daemon', 'ssl_priv_key_path': '', 'ssl_cert_path': '', } -PRIORITY_INCREASE = 5 +logger = logging.getLogger(__name__) -def sleep(seconds): +def sleep(secs): d = defer.Deferred() - reactor.callLater(seconds, d.callback, seconds) + reactor.callLater(secs, d.callback, None) return d +def get_torrent(infohash): + # Taken from newer Deluge source to allow for backward compatibility. + def get_file_priorities(self): + """Return the file priorities""" + if not self.handle.has_metadata(): + return [] + + if not self.options["file_priorities"]: + # Ensure file_priorities option is populated. + self.set_file_priorities([]) + + return self.options["file_priorities"] + + torrent = component.get("TorrentManager").torrents.get(infohash, None) + if torrent and not hasattr(torrent, 'get_file_priorities'): + torrent.get_file_priorities = MethodType(get_file_priorities, torrent) + + return torrent + + +class Torrent(object): + def __init__(self, torrent_handler, infohash): + self.torrent_handler = torrent_handler + self.infohash = infohash + + self.filesets = {} + self.readers = {} + self.cycle_lock = defer.DeferredLock() + self.last_activity = datetime.now() + + self.torrent = get_torrent(infohash) + status = self.torrent.get_status(['piece_length']) + self.piece_length = status['piece_length'] + self.torrent.handle.set_sequential_download(True) + self.torrent.handle.set_priority(1) + + def ensure_started(self): + if self.torrent.status.paused: + self.torrent.resume() + + def get_file_from_offset(self, offset): + status = self.torrent.get_status(['files']) + last_file = None + for f in status['files']: + if f['offset'] > offset: + break + + last_file = f + return last_file + + def can_read(self, from_byte): + self.ensure_started() + + needed_piece, rest = divmod(from_byte, self.piece_length) + last_available_piece = None + for piece, status in enumerate(self.torrent.status.pieces[needed_piece:], needed_piece): + if not status: + break + last_available_piece = piece + + if last_available_piece is None: + logger.debug('Since we are waiting for a piece, setting priority for %s to max' % (needed_piece, )) + self.torrent.handle.set_piece_deadline(needed_piece, 0) + self.torrent.handle.piece_priority(needed_piece, 7) + + f = self.get_file_from_offset(from_byte) + logger.debug('Also setting file to max %r' % (f, )) + file_priorities = self.torrent.get_file_priorities() + file_priorities[f['index']] = 7 + self.torrent.set_file_priorities(file_priorities) + + for _ in range(300): + if self.torrent.status.pieces[needed_piece]: + break + if not reactor.running: + return + time.sleep(0.2) + + logger.debug('Calling read again to get the real number') + return self.can_read(from_byte) + else: + return ((last_available_piece - needed_piece) * self.piece_length) + rest + self.piece_length + + def is_idle(self): + return not self.readers and self.last_activity + TORRENT_CLEANUP_INTERVAL < datetime.now() + + def add_reader(self, filelike, path, from_byte, to_byte): + logger.debug('Added reader %s path:%s from_byte:%s' % (filelike, path, from_byte, )) + self.readers[filelike] = (path, from_byte, to_byte) + + self.cycle() + + def remove_reader(self, filelike): + if filelike in self.readers: + logger.debug('Removed reader %s' % (filelike, )) + del self.readers[filelike] + self.cycle() + self.last_activity = datetime.now() + + def cycle(self): + @defer.inlineCallbacks + def handle_cycle(): + yield self.cycle_lock.acquire() + try: + self._cycle() + except: + logger.exception('Failed to cycle') + self.cycle_lock.release() + reactor.callFromThread(handle_cycle) + + def _cycle(self): + logger.debug('Doing a cycle') + + found_not_started = False + cannot_blacklist = set() + must_whitelist = set() + first_files = set() + for fileset in self.filesets.values(): + logger.debug('Fileset %r' % (fileset, )) + if not fileset['started']: + found_not_started = True + must_whitelist |= set(fileset['files']) + fileset['started'] = True + cannot_blacklist |= set(fileset['files']) + first_files.add(fileset['files'][0]) + + if found_not_started: + self.ensure_started() + + logger.debug('We had a fileset not started, must_whitelist:%r first_files:%r cannot_blacklist:%r' % (must_whitelist, first_files, cannot_blacklist)) + status = self.torrent.get_status(['files', 'file_progress']) + + file_priorities = self.torrent.get_file_priorities() + for f, progress in zip(status['files'], status['file_progress']): + i = f['index'] + if progress == 1.0: + file_priorities[i] = 1 + continue + + if f['path'] in must_whitelist: + if f['path'] in first_files: + file_priorities[i] = 7 + else: + file_priorities[i] = 1 + elif f['path'] not in cannot_blacklist: + file_priorities[i] = 0 + + self.torrent.set_file_priorities(file_priorities) + + if self.readers: + status = self.torrent.get_status(['files', 'file_progress']) + file_ranges = {} + fileset_ranges = {} + for path, from_byte, to_byte in self.readers.values(): + logger.debug('Reader %s, %s, %s' % (path, from_byte, to_byte, )) + if path in file_ranges: + file_ranges[path] = min(from_byte, file_ranges[path]) + else: + file_ranges[path] = from_byte + + for fileset_hash, fileset in self.filesets.items(): + if path in fileset['files']: + if fileset_hash in fileset_ranges: + fileset_ranges[fileset_hash] = min(fileset_ranges[fileset_hash], fileset['files'].index(path)) + else: + fileset_ranges[fileset_hash] = fileset['files'].index(path) + + currently_downloading = self.get_currently_downloading() + logger.debug('File heads: %r' % (file_ranges, )) + for f, progress in zip(status['files'], status['file_progress']): + if progress == 1.0: + continue + + if f['path'] not in file_ranges: + continue + + first_piece = f['offset'] // self.piece_length + current_piece = file_ranges[path] // self.piece_length + last_piece = (f['offset'] + f['size']) // self.piece_length + logger.debug('Configuring pieces first piece %s current piece %s - all before should be blacklisted' % (first_piece, current_piece)) + + for piece, piece_status in enumerate(self.torrent.status.pieces[first_piece:last_piece], first_piece): + if piece_status or piece in currently_downloading: + continue + + priority = self.torrent.handle.piece_priority(piece) + if piece == first_piece: + if priority == 0: + self.torrent.handle.piece_priority(piece, 1) + continue + + if piece < current_piece: + self.torrent.handle.piece_priority(piece, 0) + elif piece == current_piece: + self.torrent.handle.piece_priority(piece, 7) + else: + self.torrent.handle.piece_priority(piece, 1) + + file_priorities = self.torrent.get_file_priorities() + logger.debug('Fileset heads: %r' % (fileset_ranges, )) + for fileset_hash, first_file in fileset_ranges.items(): + fileset = self.filesets[fileset_hash] + logger.debug('From index %s' % (first_file, )) + file_mapping = {f['path']: f['index'] for f in status['files']} + for i, f in enumerate(fileset['files']): + index = file_mapping[f] + if i < first_file: + file_priorities[index] = 0 + elif i == first_file: + file_priorities[index] = 7 + else: + file_priorities[index] = 1 + + self.torrent.set_file_priorities(file_priorities) + + def get_currently_downloading(self): + currently_downloading = set() + for peer in self.torrent.handle.get_peer_info(): + if peer.downloading_piece_index != -1: + currently_downloading.add(peer.downloading_piece_index) + + return currently_downloading + + def reset_priorities(self): + for piece in range(len(self.torrent.status.pieces)): + self.torrent.handle.piece_priority(piece, 1) + + self.torrent.set_file_priorities([1] * len(self.torrent.get_file_priorities())) + + def shutdown(self): + logger.debug('Shutting down torrent %r' % (self, )) + for reader in self.readers.keys(): + reactor.callInThread(reader.close) + + def add_fileset(self, fileset): + files = [f.path for f in fileset] + fileset_hash = hash(','.join(files)) + + if fileset_hash not in self.filesets: + self.filesets[fileset_hash] = {'started': False, 'files': files} + + +class TorrentHandler(object): + def __init__(self, reset_priorities_on_finish): + self.torrents = {} + self.reset_priorities_on_finish = reset_priorities_on_finish + + self.alerts = component.get("AlertManager") + self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed) + self.alerts.register_handler("torrent_finished_alert", self.on_alert_torrent_finished) + + self.cleanup_looping_call = task.LoopingCall(self.cleanup) + self.cleanup_looping_call.start(60) + + def on_alert_torrent_removed(self, alert): + try: + infohash = str(alert.handle.info_hash()) + except (RuntimeError, KeyError): + logger.warning('Failed to handle on torrent remove alert') + return + + if infohash not in self.torrents: + return + + self.torrents[infohash].shutdown() + del self.torrents[infohash] + + def on_alert_torrent_finished(self, alert): + try: + infohash = str(alert.handle.info_hash()) + except (RuntimeError, KeyError): + logger.warning('Failed to handle on torrent finished alert') + return + + if infohash not in self.torrents: + return + + if self.reset_priorities_on_finish: + self.torrents[infohash].reset_priorities() + + def shutdown(self): + for torrent in self.torrents.values(): + if self.reset_priorities_on_finish: + torrent.reset_priorities() + torrent.shutdown() + + self.cleanup_looping_call.stop() + + def get_filesystem(self, infohash): + torrent = get_torrent(infohash) + status = torrent.get_status(['piece_length', 'files', 'file_progress', 'save_path']) + self.piece_length = status['piece_length'] + save_path = status['save_path'] + + found_rar = False + path_item_mapping = {} + for f, progress in zip(status['files'], status['file_progress']): + full_path = os.path.join(save_path, f['path']) + if '/' in f['path']: + path, fn = f['path'].rsplit('/', 1) + else: + fn = f['path'] + path = '' + + item = Item(fn, attributes={'size': f['size']}) + item.readable = True + item.streamable = True + path_item_mapping.setdefault(path, []).append(item) + + if progress == 1.0: + item.add_route('file', True, False, False, kwargs={'path': full_path}) + else: + item.add_route('torrent_file', True, False, False, kwargs={ + 'torrent_handler': self, + 'infohash': infohash, + 'offset': f['offset'], + 'path': full_path, + }) + item.add_route('direct', False, False, True) + + if not found_rar and fn.split('.')[-1].lower() == 'rar': + found_rar = True + + path_mapping = {} + for path, items in path_item_mapping.items(): + combined_path = [] + for path_part in (path + '/').split('/'): + partial_path = '/'.join(combined_path) + if partial_path not in path_mapping: + item = path_mapping[partial_path] = Item(partial_path.split('/')[-1]) + item.streamable = True + item.add_route('direct', False, False, True, kwargs={'allowed_extensions': STREAMABLE_EXTENSIONS}) + if found_rar: + item.add_route('rar', False, False, True, kwargs={'lazy': True}) + + if combined_path: + parent_path = '/'.join(combined_path[:-1]) + path_mapping[parent_path].add_item(item) + combined_path.append(path_part) + + for item in items: + path_mapping[path].add_item(item) + + item = path_mapping[''].list()[0] # TODO: make not use an empty item + item.parent_item = None + return item + + def get_torrent(self, infohash): + if infohash not in self.torrents: + self.torrents[infohash] = Torrent(self, infohash) + return self.torrents[infohash] + + @defer.inlineCallbacks + def stream(self, infohash, path, wait_for_end_pieces=False): + logger.debug('Trying to get path:%s from infohash:%s' % (path, infohash)) + local_torrent = self.get_torrent(infohash) + torrent = get_torrent(infohash) + + filesystem = self.get_filesystem(infohash) + if path: + stream_item = filesystem.get_item_from_path(path) + else: + stream_item = filesystem + + logger.debug('Stream, path:%s infohash:%s stream_item:%r' % (path, infohash, stream_item)) + if stream_item is None: + defer.returnValue(None) + + stream_result = stream_item.stream() + logger.debug('Streamresult, path:%s infohash:%s stream_result:%r' % (path, infohash, stream_result)) + if stream_result is None: + defer.returnValue(None) + + if hasattr(stream_result, 'get_read_items'): + fileset = stream_result.get_read_items() + else: + fileset = [stream_result] + self.torrents[infohash].add_fileset(fileset) + + if wait_for_end_pieces: + local_torrent.ensure_started() + logger.debug('We need to wait for pieces') + first_file = fileset[0] + last_file = fileset[-1] + + status = torrent.get_status(['piece_length', 'files', 'file_progress']) + piece_length = status['piece_length'] + + wait_for_pieces = [] + for f, progress in zip(status['files'], status['file_progress']): + if progress == 1.0: + continue + + piece_count = f['size'] // piece_length + + if f['path'] == first_file.path: + piece, rest = divmod(f['offset'], piece_length) + rest = piece_length - rest + wait_for_pieces.append(piece) + + if rest < 1024 and piece_count > 2: + wait_for_pieces.append(piece + 1) + + if f['path'] == last_file.path: + piece, rest = divmod(f['offset'] + f['size'], piece_length) + wait_for_pieces.append(piece) + + if rest < 1024 and piece_count > 2: + wait_for_pieces.append(piece - 1) + + logger.debug('We want first and last piece first, these are the pieces: %r' % (wait_for_pieces, )) + for piece in wait_for_pieces: + torrent.handle.set_piece_deadline(piece, 0) + torrent.handle.piece_priority(piece, 7) + + for _ in range(220): + for piece in wait_for_pieces: + if not torrent.status.pieces[piece]: + break + else: + break + + yield sleep(0.2) + + defer.returnValue(stream_result) + + def cleanup(self): + for infohash, torrent in self.torrents.items(): + if torrent.is_idle(): + logger.debug('Torrent %s is idle, killing it' % (torrent, )) + torrent.shutdown() + del self.torrents[infohash] + + class ServerContextFactory(object): def __init__(self, cert_file, key_file): self._cert_file = cert_file @@ -106,35 +551,6 @@ class ServerContextFactory(object): return ctx -class FileServeResource(resource.Resource): - isLeaf = True - - def __init__(self): - self.file_mapping = {} - resource.Resource.__init__(self) - - def generate_secure_token(self): - return base64.urlsafe_b64encode(randbytes.RandomFactory().secureRandom(21, True)) - - def add_file(self, path): - token = self.generate_secure_token() - self.file_mapping[token] = path - - return token - - def render_GET(self, request): - key = request.postpath[0] - if key not in self.file_mapping: - return resource.NoResource().render(request) - - f = self.file_mapping[key] - if f.is_complete(): - return static.File(f.full_path).render_GET(request) - else: - tfr = f.open() - return FilelikeObjectResource(tfr, f.size).render_GET(request) - - class StreamResource(Resource): isLeaf = True @@ -185,413 +601,6 @@ class StreamResource(Resource): defer.returnValue(json.dumps(result)) -class UnknownTorrentException(Exception): - pass - - -class UnknownFileException(Exception): - pass - - -class TorrentFileReader(object): - def __init__(self, torrent_file): - self.torrent_file = torrent_file - self.size = torrent_file.size - self.position = 0 - - self.waiting_for_piece = None - self.current_piece = None - self.current_piece_data = None - - @defer.inlineCallbacks - def read(self, size=1024): - required_piece, read_position = self.torrent_file.get_piece_info(self.position) - - if self.current_piece != required_piece: - logger.debug('We are missing piece %i and it is required, requesting' % (required_piece, )) - self.waiting_for_piece = required_piece - self.current_piece_data = yield self.torrent_file.get_piece_data(required_piece) - self.current_piece = required_piece - self.waiting_for_piece = None - - logger.debug('We can read from local piece from %s size %s from position %s - size of current payload %s' % (read_position, size, self.position, len(self.current_piece_data))) - data = self.current_piece_data[read_position:read_position+size] - self.position += len(data) - - defer.returnValue(data) - - def tell(self): - return self.position - - def close(self): - self.torrent_file.close(self) - - def seek(self, offset, whence=os.SEEK_SET): - self.position = offset - - -class TorrentFile(object): # can be read from, knows about itself - def __init__(self, torrent, first_piece, last_piece, piece_size, offset, path, full_path, size, index): - self.torrent = torrent - self.first_piece = first_piece - self.last_piece = last_piece - self.piece_size = piece_size - self.offset = offset - self.path = path - self.size = size - self.full_path = full_path - self.index = index - - self.file_requested = False - self.file_requested_once = False - self.do_shutdown = False - self.first_piece_end = self.piece_size * (self.first_piece + 1) - offset - self.waiting_pieces = {} - self.current_readers = [] - self.registered_alert = False - - self.alerts = component.get("AlertManager") - - def open(self): - """ - Returns a filelike object - """ - if not self.registered_alert: - self.alerts.register_handler("read_piece_alert", self.on_alert_got_piece_data) - self.registered_alert = True - - tfr = TorrentFileReader(self) - self.current_readers.append(tfr) - self.file_requested = False - - return tfr - - def close(self, tfr): - self.current_readers.remove(tfr) - - def is_complete(self): - torrent_status = self.torrent.torrent.get_status(['file_progress', 'state']) - file_progress = torrent_status['file_progress'] - return file_progress and file_progress[self.index] == 1.0 - - def get_piece_info(self, tell): - return divmod((self.offset + tell), self.piece_size) - - def on_alert_got_piece_data(self, alert): - torrent_id = str(alert.handle.info_hash()) - if torrent_id != self.torrent.infohash: - return - - logger.debug('Got piece data for piece %s' % alert.piece) - if alert.piece not in self.waiting_pieces: - logger.debug('Got data for piece %i, but no data needed for this piece?' % alert.piece) - return - - if alert.buffer is None: - return - - piece_data = copy(alert.buffer) - cbs = self.waiting_pieces.pop(alert.piece, []) - - for cb in cbs: - cb.callback(piece_data) - - @defer.inlineCallbacks - def wait_for_end_pieces(self): - handle = self.torrent.torrent.handle - for piece in [self.first_piece, self.last_piece]: - handle.set_piece_deadline(piece, 0) - handle.piece_priority(piece, 7) - - while not handle.have_piece(self.first_piece) and not handle.have_piece(self.last_piece): - if self.do_shutdown: - raise Exception('Shutting down') - logger.debug('Did not have piece %i, waiting' % piece) - yield sleep(1) - - @defer.inlineCallbacks - def get_piece_data(self, piece): - logger.debug('Trying to get piece data for piece %s' % piece) - for reader in self.current_readers: - if reader.current_piece == piece: - defer.returnValue(reader.current_piece_data) - - if piece not in self.waiting_pieces: - created_waiting_defer = True - self.waiting_pieces[piece] = [] - else: - created_waiting_defer = False - - d = defer.Deferred() - self.waiting_pieces[piece].append(d) - - logger.debug('Waiting for %s' % piece) - while not self.torrent.torrent.handle.have_piece(piece): - if self.do_shutdown: - raise Exception('Shutting down') - logger.debug('Did not have piece %i, waiting' % piece) - yield sleep(1) - - if created_waiting_defer: - self.torrent.torrent.handle.read_piece(piece) - - data = yield d - logger.debug('Done waiting for piece %i, returning data' % piece) - defer.returnValue(data) - - def shutdown(self): - self.do_shutdown = True - - -class Torrent(object): - def __init__(self, torrent_handler, infohash): - self.infohash = infohash - self.torrent = component.get("TorrentManager").torrents.get(infohash, None) - self.torrent_handler = torrent_handler - - if not self.torrent: - raise UnknownTorrentException('%s is not a known infohash' % infohash) - - self.torrent_files = None - self.priority_increased = defaultdict(set) - self.do_shutdown = False - self.torrent_released = True # set to True if all the files are set to download - - self.populate_files() - self.file_priorities = [0] * len(self.torrent_files) - - self.last_piece = self.torrent_files[-1].last_piece - self.torrent.handle.set_sequential_download(True) - self.torrent.handle.set_priority(1) - reactor.callLater(0, self.update_piece_priority) - - def populate_files(self): - self.torrent_files = [] - - status = self.torrent.get_status(['piece_length', 'files', 'save_path']) - piece_length = status['piece_length'] - files = status['files'] - save_path = status['save_path'] - - for f in files: - first_piece = f['offset'] / piece_length - last_piece = (f['offset'] + f['size']) / piece_length - full_path = os.path.join(save_path, f['path']) - - self.torrent_files.append(TorrentFile(self, first_piece, last_piece, piece_length, f['offset'], - f['path'], full_path, f['size'], f['index'])) - - return files - - def find_file(self, file_or_index=None, includes_name=False): - best_file = None - biggest_file_size = 0 - - for i, f in enumerate(self.torrent_files): - path = f.path - if not includes_name and '/' in path: - path = '/'.join(path.split('/')[1:]) - - logger.debug('Testing file %r against %s / %r' % (file_or_index, i, path)) - if file_or_index is not None: - if i == file_or_index or path == file_or_index: - best_file = f - break - else: - if f.size > biggest_file_size: - best_file = f - biggest_file_size = f.size - - return best_file - - def get_file(self, file_or_index=None, includes_name=False): - f = self.find_file(file_or_index, includes_name) - if f is None: - raise UnknownFileException('Was unable to find %s' % file_or_index) - - return f - - def get_currently_downloading(self): - currently_downloading = set() - for peer in self.torrent.handle.get_peer_info(): - if peer.downloading_piece_index != -1: - currently_downloading.add(peer.downloading_piece_index) - - return currently_downloading - - def get_torrent_file(self, file_or_index, includes_name): - f = self.get_file(file_or_index, includes_name) - f.file_requested = True - f.file_requested_once = True - - self.torrent.resume() - - should_update_priorities = False - if self.file_priorities[f.index] == 0: - self.file_priorities[f.index] = 3 - should_update_priorities = True - - if self.torrent_released: - should_update_priorities = True - - if should_update_priorities and not f.is_complete(): # Need to do this stuff on seek too - self.torrent.set_file_priorities(self.file_priorities) - - return f - - def shutdown(self): - logger.info('Shutting down torrent %s' % (self.infohash, )) - - self.torrent.handle.set_priority(0) - - for piece, status in enumerate(self.torrent.status.pieces[0:self.last_piece+1]): - if status: - continue - - priority = self.torrent.handle.piece_priority(piece) - if priority == 0: - self.torrent.handle.piece_priority(piece, 1) - - if not self.torrent_handler.config['download_only_streamed']: - logger.debug('Resetting file priorities') - file_priorities = [(1 if fp == 0 else fp) for fp in self.file_priorities] - self.torrent.set_file_priorities(file_priorities) - - self.do_shutdown = True - self.torrent_handler.remove_torrent(self.infohash) - - for tf in self.torrent_files: - tf.shutdown() - - def update_piece_priority(self): # if file streamed has reached end, unblacklist all prior pieces - if self.do_shutdown: - return - - logger.debug('Updating piece priority for %s' % (self.infohash, )) - currently_downloading = self.get_currently_downloading() - - for f in self.torrent_files: - if not f.file_requested and not f.current_readers: # nobody wants the file and nobody is watching - continue - - logger.debug('Rescheduling file %s' % (f.path, )) - - heads = set() - if f.file_requested: # we expect a piece head to be at start - heads.add(f.first_piece) - - waiting_for_pieces = set() - - for tfr in f.current_readers: - if tfr.waiting_for_piece is not None: - waiting_for_pieces.add(tfr.waiting_for_piece) - - piece = max(tfr.waiting_for_piece, tfr.current_piece) - if piece is not None: - heads.add(piece) - - if not heads: - continue - - first_head = min(heads) - - for head_piece in heads: - priority_increased = 0 - for piece, status in enumerate(self.torrent.status.pieces[head_piece:f.last_piece+1], head_piece): - if status or piece in currently_downloading: - continue - - priority = self.torrent.handle.piece_priority(piece) - if priority_increased < PRIORITY_INCREASE: - priority_increased += 1 - - if piece in waiting_for_pieces: - if priority < 7: - logger.debug('setting priority for %s to 7 with deadline 0' % (piece, )) - - self.torrent.handle.set_piece_deadline(piece, 0) - self.torrent.handle.piece_priority(piece, 7) - elif priority < 6: - deadline = 3000 * priority_increased - logger.debug('setting priority for %s to 6 with deadline %s' % (piece, deadline, )) - self.torrent.handle.piece_priority(piece, 6) - self.torrent.handle.set_piece_deadline(piece, deadline) - - elif priority == 0: - self.torrent.handle.piece_priority(piece, 1) - - if head_piece == first_head: - if priority_increased < PRIORITY_INCREASE: - logger.debug('Everything we need has been scheduled, looking for pieces across file to unblacklist') - for piece, status in enumerate(self.torrent.status.pieces[f.first_piece:f.last_piece+1], f.first_piece): - if status: - continue - - priority = self.torrent.handle.piece_priority(piece) - if priority == 0: - self.torrent.handle.piece_priority(piece, 1) - else: - logger.debug('Looking for pieces before smallest head %s to blacklist' % (first_head, )) - for piece, status in enumerate(self.torrent.status.pieces[f.first_piece:first_head], f.first_piece): - if status or piece in currently_downloading: - continue - - if self.torrent.handle.piece_priority(piece) != 0: - logger.debug('Blacklisting %i' % (piece, )) - self.torrent.handle.piece_priority(piece, 0) - - found_requested = False - for f in self.torrent_files: - if f.file_requested_once: - found_requested = True - if not f.is_complete() or f.current_readers: - break - else: - if found_requested: - logger.debug('Nobody is currently using %s, shutting down torrent-handler' % (self.infohash, )) - self.shutdown() - - reactor.callLater(1, self.update_piece_priority) - - -class TorrentHandler(object): - def __init__(self, config): - self.torrents = {} - self.config = config - - self.alerts = component.get("AlertManager") - self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed) - - def get_stream(self, infohash, file_or_index=None, includes_name=False): - logger.info('Trying to stream infohash %s and file %s include_name %s' % (infohash, file_or_index, includes_name)) - if infohash not in self.torrents: - self.torrents[infohash] = Torrent(self, infohash) - - return self.torrents[infohash].get_torrent_file(file_or_index, includes_name) - - def on_alert_torrent_removed(self, alert): - try: - torrent_id = str(alert.handle.info_hash()) - except (RuntimeError, KeyError): - logger.warning('Failed to handle on torrent remove alert') - return - - if torrent_id not in self.torrents: - return - - self.torrents[torrent_id].shutdown() - self.remove_torrent(torrent_id) - - def remove_torrent(self, torrent_id): - del self.torrents[torrent_id] - - def shutdown(self): - logger.debug('Shutting down TorrentHandler') - self.alerts.deregister_handler(self.on_alert_torrent_removed) - for torrent in self.torrents.values(): - torrent.shutdown() - - class Core(CorePluginBase): listening = None base_url = None @@ -605,11 +614,16 @@ class Core(CorePluginBase): settings['prioritize_partial_pieces'] = True session.set_settings(settings) except AttributeError: - logger.warning('Unable to exclude partial pieces') + logger.warning('Unable to prioritize partial pieces') + + http_output_cls = OutputBase.find_plugin('http') + http_output = http_output_cls(url_prefix='file') + http_output.start() + + self.thomas_http_output = http_output - self.fsr = FileServeResource() resource = Resource() - resource.putChild('file', self.fsr) + resource.putChild('file', http_output.resource) if self.config['allow_remote']: resource.putChild('stream', StreamResource(username=self.config['remote_username'], password=self.config['remote_password'], @@ -619,7 +633,7 @@ class Core(CorePluginBase): base_resource.putChild('streaming', resource) self.site = server.Site(base_resource) - self.torrent_handler = TorrentHandler(self.config) + self.torrent_handler = TorrentHandler(self.config['download_only_streamed'] == False) plugin_manager = component.get("CorePluginManager") logger.warning('plugins %s' % (plugin_manager.get_enabled_plugins(), )) @@ -663,16 +677,22 @@ class Core(CorePluginBase): else: raise NotImplementedError() - self.base_url += '://' - if ':' in ip: - self.base_url += ip + if self.config['reverse_proxy_enabled'] and self.config['reverse_proxy_base_url']: + self.base_url = self.config['reverse_proxy_base_url'] else: - self.base_url += '%s:%s' % (ip, port) + self.base_url += '://' + if ':' in ip: + self.base_url += ip + else: + self.base_url += '%s:%s' % (ip, port) + + self.base_url = self.base_url.rstrip('/') @defer.inlineCallbacks def disable(self): self.site.stopFactory() self.torrent_handler.shutdown() + self.thomas_http_output.stop() if self.check_webui(): plugin_manager = component.get("CorePluginManager") @@ -733,9 +753,10 @@ class Core(CorePluginBase): @export @defer.inlineCallbacks def stream_torrent(self, infohash=None, url=None, filedump=None, filepath_or_index=None, includes_name=False, wait_for_end_pieces=False): - tor = component.get("TorrentManager").torrents.get(infohash, None) + logger.debug('Trying to stream infohash:%s, url:%s, filepath_or_index:%s' % (infohash, url, filepath_or_index)) + torrent = get_torrent(infohash) - if tor is None: + if torrent is None: logger.info('Did not find torrent, must add it') if not filedump and url: @@ -751,23 +772,28 @@ class Core(CorePluginBase): try: yield core.add_torrent_file('file.torrent', filedump.encode('base64'), {'add_paused': True}) except: + logger.exception('Failed to add torrent') defer.returnValue({'status': 'error', 'message': 'failed to add torrent'}) + if filepath_or_index is None: + fn = '' + elif isinstance(filepath_or_index, int): + status = torrent.get_status(['files']) + fn = status['files'][filepath_or_index]['path'] + else: + fn = filepath_or_index + try: - tf = self.torrent_handler.get_stream(infohash, filepath_or_index, includes_name) - except UnknownTorrentException: - defer.returnValue({'status': 'error', 'message': 'unable to find torrent, probably failed to add it'}) + stream_or_item = yield defer.maybeDeferred(self.torrent_handler.stream, infohash, fn, wait_for_end_pieces=wait_for_end_pieces) + stream_url = self.thomas_http_output.serve_item(stream_or_item) + except: + logger.exception('Failed to stream torrent') + defer.returnValue({'status': 'error', 'message': 'failed to stream torrent'}) - if wait_for_end_pieces: - logger.debug('Waiting for end pieces') - yield tf.wait_for_end_pieces() - - filename = os.path.basename(tf.path).encode('utf-8') defer.returnValue({ 'status': 'success', - 'filename': filename, + 'filename': stream_or_item.id, 'use_stream_urls': self.config['use_stream_urls'], 'auto_open_stream_urls': self.config['auto_open_stream_urls'], - 'url': '%s/streaming/file/%s/%s' % (self.base_url, self.fsr.add_file(tf), - urllib.quote_plus(filename)) + 'url': '%s/streaming/%s' % (self.base_url, stream_url.lstrip('/')) }) diff --git a/streaming/data/config.glade b/streaming/data/config.glade index eb51b9e..c2f8a20 100644 --- a/streaming/data/config.glade +++ b/streaming/data/config.glade @@ -26,19 +26,26 @@ False 5 - - Download only streamed files, skip the other files + True - True - False - False - True + False + 5 + + + Download only streamed files, skip the other files + True + True + False + False + True + + + False + False + 0 + + - - False - False - 0 - @@ -160,9 +167,73 @@ + + True + False + + + Enable Reverse Proxy + True + True + False + False + True + + + False + False + 0 + + + + + True + True + 2 + + + + + True + False + + + True + False + Reverse Proxy Base Url: + + + False + False + 0 + + + + + True + True + False + False + True + True + + + True + True + 1 + + + + + True + True + 3 + + + True False 5 - + True False - 20 + True @@ -364,7 +435,7 @@ False False - 3 + 5 @@ -436,7 +507,7 @@ True False 5 - + True @@ -498,7 +569,7 @@ True True - False + True True False diff --git a/streaming/data/streaming.js b/streaming/data/streaming.js index 5546602..84d8430 100644 --- a/streaming/data/streaming.js +++ b/streaming/data/streaming.js @@ -35,7 +35,10 @@ PreferencePage = Ext.extend(Ext.Panel, { title: 'Streaming', border: false, layout: 'form', + header: false, autoScroll: true, + autoHeight: true, + width: 320, _fields: {}, initComponent: function() { @@ -50,10 +53,12 @@ PreferencePage = Ext.extend(Ext.Panel, { title: 'Settings', style: 'margin-bottom: 0px; padding-bottom: 0px; padding-top: 5px', autoHeight: true, - labelWidth: 1, + labelAlign: 'top', + labelWidth: 150, + width: 300, defaultType: 'textfield', defaults: { - width: 180, + width: 280, } }); @@ -69,10 +74,12 @@ PreferencePage = Ext.extend(Ext.Panel, { title: 'File Serving Settings', style: 'margin-bottom: 0px; padding-bottom: 0px; padding-top: 5px', autoHeight: true, - labelWidth: 110, + labelAlign: 'top', + labelWidth: 150, + width: 280, defaultType: 'textfield', defaults: { - width: 180, + width: 260, } }); @@ -89,13 +96,22 @@ PreferencePage = Ext.extend(Ext.Panel, { maxValue: 99999, })); + var field = fieldset.add({ + xtype: 'togglefield', + name: 'reverse_proxy_base_url', + fieldLabel: 'Reverse Proxy Config', + }); + + om.bind('reverse_proxy_enabled', field.toggle); + om.bind('reverse_proxy_base_url', field.input); + fieldset = this.add({ xtype: 'fieldset', border: false, autoHeight: true, defaultType: 'radio', style: 'margin-bottom: 5px; margin-top: 0; padding-bottom: 5px; padding-top: 0;', - width: 240, + width: 280, labelWidth: 1 }); @@ -130,7 +146,7 @@ PreferencePage = Ext.extend(Ext.Panel, { autoHeight: true, defaultType: 'radio', style: 'margin-left: 24px; margin-bottom: 5px; margin-top: 0; padding-bottom: 5px; padding-top: 0;', - width: 240, + width: 280, labelWidth: 1 }); @@ -164,12 +180,12 @@ PreferencePage = Ext.extend(Ext.Panel, { om.bind('ssl_priv_key_path', fieldset.add({ name: 'ssl_priv_key_path', - fieldLabel: 'Private key file path' + fieldLabel: 'Private key file path', })); om.bind('ssl_cert_path', fieldset.add({ name: 'ssl_cert_path', - fieldLabel: 'Certificate and chains file path' + fieldLabel: 'Certificate and chains file path', })); fieldset = this.add({ @@ -178,10 +194,12 @@ PreferencePage = Ext.extend(Ext.Panel, { title: 'Advanced settings', style: 'margin-bottom: 0px; padding-bottom: 0px; padding-top: 5px', autoHeight: true, - labelWidth: 1, + labelAlign: 'top', + labelWidth: 150, + width: 280, defaultType: 'textfield', defaults: { - width: 180, + width: 260, } }); @@ -189,7 +207,8 @@ PreferencePage = Ext.extend(Ext.Panel, { xtype: 'checkbox', name: 'allow_remote', boxLabel: 'Allow remote control', - style: 'margin-left: 12px;' + style: 'margin-left: 12px;', + width: 150 })); fieldset = this.add({ @@ -197,22 +216,23 @@ PreferencePage = Ext.extend(Ext.Panel, { border: false, style: 'margin-bottom: 0px; padding-bottom: 0px; padding-top: 5px', autoHeight: true, - labelWidth: 110, + labelAlign: 'top', + labelWidth: 150, + width: 260, defaultType: 'textfield', defaults: { - width: 180, + width: 240, } }); - om.bind('remote_username', fieldset.add({ - xtype: 'textfield', - name: 'remote_username', - fieldLabel: 'Remote control username' - })); + // om.bind('remote_username', fieldset.add({ + // xtype: 'textfield', + // name: 'remote_username', + // fieldLabel: 'Remote control username' + // })); om.bind('remote_password', fieldset.add({ xtype: 'textfield', - inputType: 'password', name: 'remote_password', fieldLabel: 'Remote control password' })); @@ -233,7 +253,7 @@ PreferencePage = Ext.extend(Ext.Panel, { labelWidth: 1, defaultType: 'textfield', defaults: { - width: 180, + width: 200, } }); @@ -308,7 +328,7 @@ PreferencePage = Ext.extend(Ext.Panel, { var apiUrl = 'http'; if (optionsManager.get('use_ssl')) apiUrl += 's'; - apiUrl += '://' + optionsManager.get('ip') + ':' + optionsManager.get('port') + '/streaming/stream'; + apiUrl += '://' + optionsManager.get('remote_username') + ':' + optionsManager.get('remote_password') + '@' + optionsManager.get('ip') + ':' + optionsManager.get('port') + '/streaming/stream'; Ext.getCmp('remote_url').setValue(apiUrl); } }); diff --git a/streaming/gtkui.py b/streaming/gtkui.py index ee5ab74..a06b3a5 100644 --- a/streaming/gtkui.py +++ b/streaming/gtkui.py @@ -124,10 +124,11 @@ class GtkUI(GtkPluginBase): def on_apply_prefs(self): log.debug("applying prefs for Streaming") - if self.glade.get_widget("input_serve_standalone").get_active(): - serve_method = 'standalone' - elif self.glade.get_widget("input_serve_webui").get_active(): - serve_method = 'webui' + serve_method = 'standalone' + # if self.glade.get_widget("input_serve_standalone").get_active(): + # serve_method = 'standalone' + # elif self.glade.get_widget("input_serve_webui").get_active(): + # serve_method = 'webui' if self.glade.get_widget("input_ssl_cert_daemon").get_active(): ssl_source = 'daemon' @@ -141,8 +142,11 @@ class GtkUI(GtkPluginBase): "auto_open_stream_urls": self.glade.get_widget("input_auto_open_stream_urls").get_active(), "allow_remote": self.glade.get_widget("input_allow_remote").get_active(), "download_only_streamed": self.glade.get_widget("input_download_only_streamed").get_active(), + "reverse_proxy_enabled": self.glade.get_widget("input_reverse_proxy_enabled").get_active(), + # "download_in_order": self.glade.get_widget("input_download_in_order").get_active(), "use_ssl": self.glade.get_widget("input_use_ssl").get_active(), - "remote_username": self.glade.get_widget("input_remote_username").get_text(), + # "remote_username": self.glade.get_widget("input_remote_username").get_text(), + "reverse_proxy_base_url": self.glade.get_widget("input_reverse_proxy_base_url").get_text(), "remote_password": self.glade.get_widget("input_remote_password").get_text(), "ssl_priv_key_path": self.glade.get_widget("input_ssl_priv_key_path").get_text(), "ssl_cert_path": self.glade.get_widget("input_ssl_cert_path").get_text(), @@ -173,18 +177,22 @@ class GtkUI(GtkPluginBase): self.glade.get_widget("input_allow_remote").set_active(config["allow_remote"]) self.glade.get_widget("input_use_ssl").set_active(config["use_ssl"]) self.glade.get_widget("input_download_only_streamed").set_active(config["download_only_streamed"]) - self.glade.get_widget("input_remote_username").set_text(config["remote_username"]) + self.glade.get_widget("input_reverse_proxy_enabled").set_active(config["reverse_proxy_enabled"]) + # self.glade.get_widget("input_download_in_order").set_active(config["download_in_order"]) + # self.glade.get_widget("input_download_everything").set_active(not config["download_in_order"] and not config["download_only_streamed"]) + # self.glade.get_widget("input_remote_username").set_text(config["remote_username"]) + self.glade.get_widget("input_reverse_proxy_base_url").set_text(config["reverse_proxy_base_url"]) self.glade.get_widget("input_remote_password").set_text(config["remote_password"]) self.glade.get_widget("input_ssl_priv_key_path").set_text(config["ssl_priv_key_path"]) self.glade.get_widget("input_ssl_cert_path").set_text(config["ssl_cert_path"]) - self.glade.get_widget("input_serve_standalone").set_active(config["serve_method"] == "standalone") - self.glade.get_widget("input_serve_webui").set_active(config["serve_method"] == "webui") + # self.glade.get_widget("input_serve_standalone").set_active(config["serve_method"] == "standalone") + # self.glade.get_widget("input_serve_webui").set_active(config["serve_method"] == "webui") self.glade.get_widget("input_ssl_cert_daemon").set_active(config["ssl_source"] == "daemon") self.glade.get_widget("input_ssl_cert_custom").set_active(config["ssl_source"] == "custom") - api_url = 'http%s://%s:%s/streaming/stream' % (('s' if config["use_ssl"] else ''), config["ip"], config["port"]) + api_url = 'http%s://%s:%s@%s:%s/streaming/stream' % (('s' if config["use_ssl"] else ''), config["remote_username"], config["remote_password"], config["ip"], config["port"]) self.glade.get_widget("output_remote_url").set_text(api_url) def stream_ready(self, result): diff --git a/streaming/resource.py b/streaming/resource.py index 8eb9792..ed63f93 100644 --- a/streaming/resource.py +++ b/streaming/resource.py @@ -29,8 +29,6 @@ class Resource(TwistedResource): authenticated = True if not authenticated: - print auth_header - print self.username, self.password request.setResponseCode(401) return 'Unauthorized' diff --git a/streaming/torrentfile.py b/streaming/torrentfile.py new file mode 100644 index 0000000..6429612 --- /dev/null +++ b/streaming/torrentfile.py @@ -0,0 +1,65 @@ +import logging +import mimetypes +import os + +from thomas import InputBase + +logger = logging.getLogger(__name__) + + +class DelugeTorrentInput(InputBase.find_plugin('file')): + plugin_name = 'torrent_file' + protocols = [] + + can_read_to = None + + def __init__(self, item, torrent_handler, infohash, offset, path): + self.item = item + self.torrent_handler = torrent_handler + self.torrent = torrent_handler.get_torrent(infohash) + self.infohash = infohash + self.offset = offset + self.path = path + self.size, self.filename, self.content_type = self.get_info() + + def get_info(self): + logger.info('Getting info about %r' % (self.path, )) + + content_type = mimetypes.guess_type(self.path)[0] or 'bytes' + + return self.item['size'], os.path.basename(self.path), content_type + + def ensure_exists(self): + if not os.path.exists(self.path): + self.torrent.can_read(self.offset) + + def seek(self, pos): + self.ensure_exists() + super(DelugeTorrentInput, self).seek(pos) + logger.debug('Seeking at %s torrentfile_id %r' % (self.tell(), id(self))) + self.torrent.add_reader(self, self.item.path, self.offset + self.tell(), self.offset + self.size) + + def read(self, num): + self.ensure_exists() + + if not self._open_file: + self.seek(0) + + #logger.debug('Trying to read %s from %i torrentfile_id %r' % (self.path, self.tell(), id(self))) + tell = self.tell() + if self.can_read_to <= tell or self.can_read_to is None: + self.can_read_to = self.torrent.can_read(self.offset + tell) + tell + + real_num = min(num, self.can_read_to - tell) + if num != real_num: + logger.info('The real number we can read to is %s and not %s at position %s' % (real_num, num, tell)) + + if not self._open_file: # the file was closed while we waited + return b'' + + data = super(DelugeTorrentInput, self).read(real_num) + return data + + def close(self): + self.torrent.remove_reader(self) + super(DelugeTorrentInput, self).close()